The previous step implemented writing to a Hive dynamic partitioned table with Spark Structured Streaming: 数据仓库之SparkStructuredStreaming写hive动态分区表 | 62bit的秘密基地

This step introduces warehouse layers and decouples the pipeline from the previous step.

Creating the Databases

CREATE DATABASE ods;
CREATE DATABASE dw;

Later I realized the databases can also be created remotely with Spark SQL, which requires configuring spark.sql.warehouse.dir (2023.4.7):

val spark = SparkSession.builder().config(conf)
  .config("hive.metastore.uris", "thrift://192.168.52.120:9083")
  .config("spark.sql.warehouse.dir", "hdfs://192.168.52.100:8020/user/hive/warehouse") // where Spark creates databases/tables on HDFS
  .enableHiveSupport()
  .getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS ods")
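The dw database can be created the same way:

spark.sql("CREATE DATABASE IF NOT EXISTS dw")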

Kafka to ODS

The KafkaConsumer helper used below comes from an earlier post: 数据仓库之Spark Structured Streaming实时消费Kafka | 62bit的秘密基地
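For context, a minimal sketch of what such a helper could look like. This is not the implementation from that post: the broker address is an assumption, and the real readKafka2DF presumably also parses the Kafka value into the business columns used later (play, review, video_review, favorites, tag, pubdate).

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical stand-in for the helper described in the earlier post
object KafkaConsumer {
  def readKafka2DF(spark: SparkSession, topic: String, startingOffsets: String): DataFrame = {
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.52.100:9092") // assumed broker address
      .option("subscribe", topic)
      .option("startingOffsets", startingOffsets)
      .load()
      .selectExpr("CAST(value AS STRING) AS value") // raw value only; column parsing omitted here
  }
}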

import java.util.concurrent.TimeUnit
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object Kafka2ODS {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("Kafka2ODS")
      .setMaster("local[*]")
    val spark = SparkSession.builder().config(conf)
      .config("hive.metastore.uris", "thrift://192.168.52.120:9083")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // Read the Kafka topic as a streaming DataFrame
    val df = KafkaConsumer.readKafka2DF(spark, "bili", "earliest")

    // Append the stream to the ODS table as ORC files
    val ws = df.writeStream
      .outputMode(OutputMode.Append())
      .format("orc")
      //.trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
      .option("checkpointLocation", "hdfs://192.168.52.100:8020/tmp/offset/bilibili/kafka2ODS")
      .toTable("ods.blbl")
    ws.awaitTermination()
  }

}
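Once the stream has been running for a while, a quick sanity check that data is landing in the ODS table, run from a separate session with the same metastore configuration (the queries below are my own check, not from the original pipeline):

spark.sql("SELECT COUNT(*) FROM ods.blbl").show()
spark.sql("SELECT * FROM ods.blbl LIMIT 5").show(false)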

ODS to DW

The previous step forgot to do the type conversion; this step makes up for it:

val convertDF = df.withColumn("play", col("play").cast(IntegerType))
  .withColumn("review", col("review").cast(IntegerType))
  .withColumn("video_review", col("video_review").cast(IntegerType))
  .withColumn("favorites", col("favorites").cast(IntegerType))
  .withColumn("tag", split(col("tag"), "/").cast("array<string>"))

This can be shortened with a for loop (though the readability arguably suffers):

val intCols = Array("play", "review", "video_review", "favorites")
var castDF = df
for(colName <- intCols) castDF = castDF.withColumn(colName, col(colName).cast(IntegerType))
castDF = castDF.withColumn("tag", split(col("tag"), "/").cast("array<string>"))
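A more functional alternative is foldLeft over the column names, which avoids the mutable var. This is just a sketch of an equivalent formulation, not what the pipeline actually uses:

val intCols = Array("play", "review", "video_review", "favorites")
val castDF = intCols
  .foldLeft(df)((acc, colName) => acc.withColumn(colName, col(colName).cast(IntegerType)))
  .withColumn("tag", split(col("tag"), "/").cast("array<string>"))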

Complete code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, split, to_date, to_timestamp, weekofyear}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType

object ODS2DWD {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("ODS2DWD")
      .setMaster("local[*]")
    val spark = SparkSession.builder().config(conf)
      .config("hive.metastore.uris", "thrift://192.168.52.120:9083")
      .enableHiveSupport()
      .getOrCreate()
    val df = spark.sql("select * from ods.blbl")

    df.show(3)

    // Type conversion
    val castDF = df.withColumn("play", col("play").cast(IntegerType))
      .withColumn("review", col("review").cast(IntegerType))
      .withColumn("video_review", col("video_review").cast(IntegerType))
      .withColumn("favorites", col("favorites").cast(IntegerType))
      .withColumn("tag", split(col("tag"), "/").cast("array<string>"))

    // Check for misaligned rows: if play is null after the cast, the row is malformed, so filter it out
    val filterDF = castDF.filter(col("play").isNotNull)

    // Split pubdate into date and time
    val dataTimeDF = filterDF.withColumn("date", to_date(split(col("pubdate"), " ").getItem(0), "yyyy-MM-dd"))
      .withColumn("time", to_timestamp(split(col("pubdate"), " ").getItem(1), "HH:mm:ss"))
    dataTimeDF.printSchema()

    // Derive the year and week partition columns from the date
    val yearWeekDF = dataTimeDF.withColumn("year", split(col("date"), "-").getItem(0).cast(IntegerType))
      .withColumn("week", weekofyear(to_date(col("date"), "yyyy-MM-dd")))
    yearWeekDF.printSchema()

    yearWeekDF.write
      .format("orc")               // output file format
      .partitionBy("year", "week") // partition columns
      .mode("overwrite")
      .saveAsTable("dw.blbl")      // write into the Hive table
  }

}
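To confirm the partitions were created as expected, the partitions of the new table can be listed afterwards (a quick check of my own, assuming the same session configuration):

spark.sql("SHOW PARTITIONS dw.blbl").show(false)
spark.table("dw.blbl").groupBy("year", "week").count().show()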


Creating a Hive External Table from Spark

The tables created so far are all Hive managed (internal) tables. To create an external table instead, just add a path option when creating the table:

yearWeekDF.write
  .format("orc")
  .partitionBy("year", "week")
  .mode("overwrite")
  .option("path", "hdfs://192.168.52.100:8020/hive_external_table/blbl") // external table location on HDFS
  .saveAsTable("dw.blbl")

After the table is created this way, it can be queried from Spark in two ways:

  • You can query the data directly with spark.sql("SELECT * FROM dw.blbl"), with no need to repair the partition information.
  • You can also repair the partition information with spark.catalog.recoverPartitions("dw.blbl") and then load the table as a DataFrame with spark.table("dw.blbl").

The two approaches should give the same result, but the second one updates the partition information in the Hive metastore, which may make later queries more convenient.
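For reference, the two approaches side by side, as a minimal sketch assuming the same Spark session as above:

// Option 1: query directly with Spark SQL; no partition repair needed
spark.sql("SELECT * FROM dw.blbl").show(5)

// Option 2: repair the partition metadata in the Hive metastore, then load the table as a DataFrame
spark.catalog.recoverPartitions("dw.blbl")
spark.table("dw.blbl").show(5)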