The previous step implemented the warehouse layering: 数据仓库之数仓分层

This step covers dimensional modeling.

The data currently sits in a single wide table. To improve query efficiency and reduce storage, that big table can be split into several smaller ones.

The table currently has the following fields: title (video title), author (uploader), play (play count), partition (section), subpartition (subsection), tag, pic (cover URL), pubdate (publish date and time), arcurl (video URL), review (comment count), video_review (danmaku count), favorites (favorite count), year (publish year), week (publish week number), time (publish time), date (publish date).

To make querying easier, first give every record an id.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val df = spark.sql("select * from dw.blbl")
spark.sparkContext.setLogLevel("ERROR")

// Add a sequential row-number column to a DataFrame.
// Ordering the window by a constant makes the numbering independent of any real column.
def indexer(df: DataFrame, columnName: String): DataFrame = {
  val w = Window.orderBy(lit("A"))
  df.withColumn(columnName, row_number().over(w))
}

val idDF = indexer(df, "id")
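One note on this design: ordering the window by a constant means Spark pulls every row into a single partition to compute row_number, so this step is not distributed. If strictly consecutive ids are not required, a possible alternative (a sketch, not what the run above uses; idDF2 is just an illustrative name) is monotonically_increasing_id:

import org.apache.spark.sql.functions.monotonically_increasing_id

// Ids are unique but not consecutive; the computation stays distributed because
// each Spark partition generates its own id range.
val idDF2 = df.withColumn("id", monotonically_increasing_id())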

I split the full table into the following tables:

  • dim (dimension table): the id, partition, subpartition, year, and week fields
  • video_data (video metrics table): the id, play, review, video_review, and favorites fields
  • video_info (video information table): the id, title, author, tag, pic, arcurl, and date fields
val dimTableDF = idDF.select("id", "partition", "subpartition", "year", "week")
val videoDataDF = idDF.select("id", "play", "review", "video_review", "favorites")
val videoInfoDF = idDF.select("id", "title", "author", "tag", "pic", "arcurl", "date")

Looking at the data, the values of the partition and subpartition fields repeat heavily, so I replace both fields with numeric ids to reduce the data size.

First, turn each of the two fields into a mapping table of the form [id, value].

val partitionDF = indexer(idDF.select("partition").distinct, "partitionId")
val subpartitionDF = indexer(idDF.select("subpartition").distinct, "subpartitionId")
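For a quick look at the resulting mapping tables (just an inspection step; the rows shown depend on the data):

// Each distinct partition / subpartition string is paired with a numeric id.
partitionDF.show(5, truncate = false)
subpartitionDF.show(5, truncate = false)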

Then replace the corresponding fields in the dim table with their ids.

val transformDimTableDF = dimTableDF
  .join(partitionDF, Seq("partition"))
  .join(subpartitionDF, Seq("subpartition"))
  .drop("partition", "subpartition")

This shrinks the original dim table.
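Both joins here default to inner joins, so it is worth a quick sanity check that no rows were dropped along the way; a minimal sketch:

// Every partition / subpartition value should find a match in its mapping table,
// so the dim table's row count must be unchanged after the joins.
assert(transformDimTableDF.count() == dimTableDF.count())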

Finally, write all of the tables into Hive.

val dimWriter = transformDimTableDF.write
  .format("orc")
  .partitionBy("year", "week", "partitionId", "subpartitionId")
  .mode("overwrite")
dimWriter.saveAsTable("dw.dim")

val partitionWriter = partitionDF.write
  .format("orc")
  .mode("overwrite")
partitionWriter.saveAsTable("dw.partition_id")

val subpartitionWriter = subpartitionDF.write
  .format("orc")
  .mode("overwrite")
subpartitionWriter.saveAsTable("dw.subpartition_id")

val videoDataWriter = videoDataDF.write
  .format("orc")
  .mode("overwrite")
videoDataWriter.saveAsTable("dw.video_data")

val videoInfoWriter = videoInfoDF.write
  .format("orc")
  .mode("overwrite")
videoInfoWriter.saveAsTable("dw.video_info")
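To confirm from the same Spark session that the tables exist, you can simply list the dw database; a minimal check:

// List all tables in the dw database.
spark.sql("show tables in dw").show(false)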

You can see that all of the tables were created successfully.

(screenshot: pp4vmz8.png)

Query video_info with Spark SQL.
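For example, something along these lines (the columns picked here are only for illustration):

// Peek at a few rows of the new video_info table.
spark.sql("select id, title, author, date from dw.video_info limit 10").show(false)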

(screenshot: pp4vdL4.png)

Check the size of the Hive tables.

The content below was updated on 2023-04-07, after I re-ran the whole pipeline (in that run the warehouse database is bilibili_dw).

hadoop fs -du -s -h hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/bilibili_data
hadoop fs -du -s -h hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/dim
hadoop fs -du -s -h hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/partition_id
hadoop fs -du -s -h hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/subpartition_id
hadoop fs -du -s -h hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/video_data
hadoop fs -du -s -h hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/video_info

Output (the first column is the file size, the second the space consumed including replicas):

4.3 M    13.0 M   hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/bilibili_data
49.1 K   147.4 K  hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/dim
511      1.5 K    hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/partition_id
1.3 K    4.0 K    hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/subpartition_id
198.7 K  596.2 K  hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/video_data
3.7 M    11.1 M   hdfs://bigdata01:8020/user/hive/warehouse/bilibili_dw.db/video_info

As you can see, the data size did shrink: the original bilibili_data table takes 4.3 M, while the five split tables together take roughly 3.9 M.