In the previous step, writing a Hive dynamic partitioned table with Spark Structured Streaming was implemented: 数据仓库之SparkStructuredStreaming写hive动态分区表 | 62bit的秘密基地

This step adds layering to the warehouse and decouples the pipeline from the previous step.
## Creating the databases

```sql
CREATE DATABASE ods;
CREATE DATABASE dw;
```
Later I realized that the databases can also be created remotely through Spark SQL, which requires configuring `spark.sql.warehouse.dir` (2023.4.7):
```scala
val spark = SparkSession
  .builder().config(conf)
  .config("hive.metastore.uris", "thrift://192.168.52.120:9083")
  .config("spark.sql.warehouse.dir", "hdfs://192.168.52.100:8020/user/hive/warehouse")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS ods")
```
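For completeness, the `dw` database can be created the same way and the result verified (a small sketch reusing the `spark` session above):

```scala
// Create the DW database remotely as well, then list databases to confirm
spark.sql("CREATE DATABASE IF NOT EXISTS dw")
spark.sql("SHOW DATABASES").show()   // should now include ods and dw
```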
## Kafka to ODS

The `KafkaConsumer` used here comes from an earlier post: 数据仓库之Spark Structured Streaming实时消费Kafka | 62bit的秘密基地
```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object Kafka2ODS {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Kafka2ODS")
      .setMaster("local[*]")
    val spark = SparkSession
      .builder().config(conf)
      .config("hive.metastore.uris", "thrift://192.168.52.120:9083")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val df = KafkaConsumer.readKafka2DF(spark, "bili", "earliest")

    val ws = df.writeStream
      .outputMode(OutputMode.Append())
      .format("orc")
      .option("checkpointLocation", "hdfs://192.168.52.100:8020/tmp/offset/bilibili/kafka2ODS")
      .toTable("ods.blbl")

    ws.awaitTermination()
  }
}
```
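The `KafkaConsumer` helper itself is defined in that earlier post; purely as an illustration of what `readKafka2DF` might look like, here is a minimal sketch (the broker address, JSON schema, and column list are assumptions, not the original implementation):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

object KafkaConsumer {
  // Read a Kafka topic as a streaming DataFrame and parse the JSON value.
  // The fields below mirror the columns used later in this post and are
  // an assumption; the real schema lives in the earlier article.
  def readKafka2DF(spark: SparkSession, topic: String, startingOffsets: String): DataFrame = {
    val schema = new StructType()
      .add("pubdate", StringType)
      .add("play", StringType)
      .add("review", StringType)
      .add("video_review", StringType)
      .add("favorites", StringType)
      .add("tag", StringType)

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.52.100:9092") // assumed broker address
      .option("subscribe", topic)
      .option("startingOffsets", startingOffsets)
      .load()
      .selectExpr("CAST(value AS STRING) AS json")
      .select(from_json(col("json"), schema).as("data"))
      .select("data.*")
  }
}
```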
## ODS to DW

I forgot to do the type conversions in the previous step, so they are added here:
```scala
val convertDF = df.withColumn("play", col("play").cast(IntegerType))
  .withColumn("review", col("review").cast(IntegerType))
  .withColumn("video_review", col("video_review").cast(IntegerType))
  .withColumn("favorites", col("favorites").cast(IntegerType))
  .withColumn("tag", split(col("tag"), "/").cast("array<string>"))
```
The casts can be simplified with a for loop (though readability arguably suffers):
```scala
val intCols = Array("play", "review", "video_review", "favorites")
var castDF = df
for (colName <- intCols)
  castDF = castDF.withColumn(colName, col(colName).cast(IntegerType))
castDF = castDF.withColumn("tag", split(col("tag"), "/").cast("array<string>"))
```
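For comparison, an equivalent variant using `foldLeft` instead of a `var` (just a sketch of the same casts, not from the original post):

```scala
import org.apache.spark.sql.functions.{col, split}
import org.apache.spark.sql.types.IntegerType

val intCols = Seq("play", "review", "video_review", "favorites")
// Fold the integer casts over the DataFrame, then convert the tag column
val castDF = intCols
  .foldLeft(df)((acc, c) => acc.withColumn(c, col(c).cast(IntegerType)))
  .withColumn("tag", split(col("tag"), "/").cast("array<string>"))
```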
The complete code:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, split, to_date, to_timestamp, weekofyear}
import org.apache.spark.sql.types.IntegerType

object ODS2DWD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ODS2DWD")
      .setMaster("local[*]")
    val spark = SparkSession
      .builder().config(conf)
      .config("hive.metastore.uris", "thrift://192.168.52.120:9083")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.sql("select * from ods.blbl")
    df.show(3)

    // Cast the string columns read from ODS to their proper types
    val convertDF = df.withColumn("play", col("play").cast(IntegerType))
      .withColumn("review", col("review").cast(IntegerType))
      .withColumn("video_review", col("video_review").cast(IntegerType))
      .withColumn("favorites", col("favorites").cast(IntegerType))
      .withColumn("tag", split(col("tag"), "/").cast("array<string>"))

    // Drop rows whose play count failed to parse
    val filterDF = convertDF.filter(col("play").isNotNull)

    // Split pubdate into a date part and a time part
    val dataTimeDF = filterDF
      .withColumn("date", to_date(split(col("pubdate"), " ").getItem(0), "yyyy-MM-dd"))
      .withColumn("time", to_timestamp(split(col("pubdate"), " ").getItem(1), "HH:mm:ss"))
    dataTimeDF.printSchema()

    // Derive the partition columns: year and week of year
    val yearWeekDF = dataTimeDF
      .withColumn("year", split(col("date"), "-").getItem(0).cast(IntegerType))
      .withColumn("week", weekofyear(to_date(col("date"), "yyyy-MM-dd")))
    yearWeekDF.printSchema()

    yearWeekDF.write
      .format("orc")
      .partitionBy("year", "week")
      .mode("overwrite")
      .saveAsTable("dw.blbl")
  }
}
```
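Once ODS2DWD has run, the partitioned result can be sanity-checked from the same Spark session (a small sketch against the dw.blbl table written above):

```scala
// List the partitions and count rows per (year, week) partition
spark.sql("SHOW PARTITIONS dw.blbl").show(truncate = false)
spark.sql("SELECT year, week, COUNT(*) AS cnt FROM dw.blbl GROUP BY year, week").show()
```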
## Creating a Hive external table from Spark

All tables created so far are Hive managed (internal) tables. To create an external table, simply add a `path` option when writing the table:
```scala
yearWeekDF.write
  .format("orc")
  .partitionBy("year", "week")
  .mode("overwrite")
  .option("path", "hdfs://192.168.52.100:8020/hive_external_table/blbl")
  .saveAsTable("dw.blbl")
```
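To confirm the table was indeed registered as external, its metadata can be inspected (a sketch; the exact rows in the output vary with the Spark/Hive versions):

```scala
// The "Type" row should now report EXTERNAL instead of MANAGED
spark.sql("DESCRIBE FORMATTED dw.blbl").show(100, truncate = false)
```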
After creating the table this way, it can be queried from Spark in two ways:

- Query it directly with `spark.sql("SELECT * FROM dw.blbl")`; there is no need to repair the partition information first.
- Or call `spark.catalog.recoverPartitions("dw.blbl")` to repair the partition information, and then load the table as a DataFrame with `spark.table("dw.blbl")`.

Both approaches should return the same data, but the second one also updates the partition information in the Hive metastore, which may make later queries more convenient.
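Put together, the two query paths look roughly like this (a sketch against the dw.blbl table created above):

```scala
// Option 1: query through Spark SQL directly, no partition repair needed
spark.sql("SELECT * FROM dw.blbl").show(5)

// Option 2: repair the partition metadata in the Hive metastore first,
// then load the table as a DataFrame
spark.catalog.recoverPartitions("dw.blbl")
spark.table("dw.blbl").show(5)
```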