看的是极客时间的课同时结合官网

Overview - Spark 3.3.1 Documentation (apache.org)

零基础入门 Spark (geekbang.org)

基础知识

01 Spark:从“大数据的Hello World”开始

准备工作

  1. IDEA安装Scala插件

  2. 构建Maven项目

  3. pom.xml加入spark

    1
    2
    3
    4
    5
    6
    7
    <dependencies>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.0</version>
    </dependency>
    </dependencies>

Word Count

1. 读取内容

1
2
3
4
5
6
val rootPath: String = "xxxx" //这里是文件所在目录
val file: String = s"$rootPath\\wikiOfSpark.txt"
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]") //本地运行
sparkConf.setAppName("wordCount")
val lineRDD: RDD[String] = new SparkContext(sparkConf).textFile(file)//文件中的每一行当作一个元素存入RDD

源码中关于SparkConf和SparkContext的说明

SparkConf:Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.

SparkContext:Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

我目前理解的RDD:一个分布式元素集合,里面的元素可以通过算子转换成各种类型和结构,每个RDD都被分成不同的分区,分别在集群中的不同节点上操作

2. 分词

1
2
//以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(_.split(" "))
1
2
// 过滤掉空字符串
val cleanWordRDD: RDD[String] = wordRDD.filter(!_.equals(""))

3. 分组计数

1
2
3
4
5
6
7
8
// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map((_, 1))

// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _)

// 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object WordCount {
def main(args: Array[String]): Unit = {
val rootPath: String = "xxxx"
val file: String = s"$rootPath\\wikiOfSpark.txt"

val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]") //本地运行
sparkConf.setAppName("wordCount")

val lineRDD: RDD[String] = new SparkContext(sparkConf).textFile(file)
val wordRDD: RDD[String] = lineRDD.flatMap(_.split(" "))
val cleanRDD: RDD[String] = wordRDD.filter(!_.equals(""))
val kvRDD: RDD[(String,Int)] = cleanRDD.map((_, 1))
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _)

println(wordCounts.map { case (k, v) => (v, k) }.sortByKey(false).take(5).mkString("Array(", ", ", ")"))
}
}

自测题:

  1. 独立完成WordCount代码的编写
  2. flatMap流程

02 RDD与编程模型:延迟计算是怎么回事?

RDD与数组的区别

RDD是一种抽象,是Spark对于分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体

对比项 数组 RDD
概念类型 数据结构实体 数据模型抽象
数据跨度 单机进程内 跨进程、跨计算节点
数据构成 数组元素 数据分片(Partitions)
数据定位 数组下标、索引 数据分片索引

RDD的四大属性

  • partitions:数据分片,不同节点上的数据属于不同分片
  • partitioner:分片切割规则,根据规则将数据发往不同分区
  • dependencies:RDD依赖,RDD每种数据形态都依赖上一种形态
  • compute:转换函数,RDD的转换方法

编程模型与延迟计算

每个RDD都代表着一种分布式数据形态

RDD 到 RDD 之间的转换,本质上是数据形态上的转换(Transformations)

RDD算子的一个共性:RDD转换

在 RDD 的编程模型中,一共有两种算子,Transformations 类算子Actions 类算子。开发者需要使用 Transformations 类算子,定义并描述数据形态的转换过程,然后调用 Actions 类算子,将计算结果收集起来、或是物化到磁盘。

在这样的编程模型下,Spark 在运行时的计算被划分为两个环节。

  1. 基于不同数据形态之间的转换,构建计算流图(DAG,Directed Acyclic Graph)
  2. 通过 Actions 类算子,以回溯的方式去触发执行这个计算流图。

调用的各类 Transformations 算子,并不立即执行计算,调用 Actions 算子时,之前调用的Transformations算子才会执行,这就叫作“延迟计算”(Lazy Evaluation)。

所以构建计算流图的过程不会耗费很多时间,Actions算子触发执行计算流程的过程才最耗时,这也是延迟计算的一个特点

自测题:

  1. 讲一下RDD
  2. 延迟计算是什么意思

03 RDD常用算子(一):RDD内部的数据转换

创建RDD

在Spark中,创建RDD的典型方式有两种

  • 通过SparkContext.parallelize在内部数据之上创建RDD

    1
    2
    3
    import org.apache.spark.rdd.RDD
    val words: Array[String] = Array("Spark", "is", "cool")
    val rdd: RDD[String] = sc.parallelize(words)
  • 通过SparkContext.textFile等API从外部数据创建RDD

这里的内部、外部是相对应用程序来说的。开发者在 Spark 应用中自定义的各类数据结构,如数组、列表、映射等,都属于“内部数据”;而“外部数据”指代的,是 Spark 系统之外的所有数据形式,如本地文件系统或是分布式文件系统中的数据,再比如来自其他大数据组件(Hive、Hbase、RDBMS 等)的数据。

map:以元素为粒度的数据转换

map算子的用法:给定映射函数 f,map(f) 以元素为粒度对 RDD 做数据转换。其中 f 可以是带有明确签名的带名函数,也可以是匿名函数,它的形参类型必须与 RDD 的元素类型保持一致,而输出类型则任由开发者自行决定。

正因为map是以元素为粒度做数据转换的,在某些计算场景下,这个特点会严重影响执行效率

例:对每个元素的哈希值计数

1
2
3
4
5
6
7
8
9
10
import java.security.MessageDigest

val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
// 获取MD5对象实例
val md5 = MessageDigest.getInstance("MD5")
// 使用MD5计算哈希值
val hash = md5.digest(word.getBytes).mkString
// 返回哈希值与数字1的Pair
(hash, 1)
}

要对每个元素创建一次MD5对象实例,严重影响效率

这种情况就要用到mapPartitions

mapPartitions:以数据分区为粒度的数据转换

还是同样的例子:对每个元素的哈希值计数

1
2
3
4
5
6
7
8
9
10
11
import java.security.MessageDigest 

val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {
// 注意!这里是以数据分区为粒度,获取MD5对象实例
val md5 = MessageDigest.getInstance("MD5")
val newPartition = partition.map( word => {
// 在处理每一条数据记录的时候,可以复用同一个Partition内的MD5对象
(md5.digest(word.getBytes()).mkString,1)
})
newPartition
})

同一个分区的数据,可以共享同一个MD5对象

flatMap:从元素到集合、再从集合到元素

flatMap的映射过程在逻辑上分为两步:

  1. 以元素为单位,创建集合
  2. 去掉集合“外包装”,提取集合元素

可以结合01 wordcount中的分词部分理解

例子:统计相邻单词共同出现的次数

如:Spark is cool –> (Spark is, 1) (is cool, 1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.apache.spark.{SparkConf, SparkContext}

object AdjacentWordsCount {
def main(args: Array[String]): Unit = {
val rootPath = "xxxx"
val file = s"$rootPath\\wikiOfSpark.txt"
val conf = new SparkConf().setMaster("local[*]").setAppName("AdjacentWordsCount")
val lineRDD = new SparkContext(conf).textFile(file)
val adjacentRDD = lineRDD.flatMap(line => {
val words = line.split(" ")
for(i <- 0 until words.length - 1) yield words(i) + "-" + words(i + 1)
})
val kvRDD = adjacentRDD.map((_,1))
val resRDD = kvRDD.reduceByKey(_ + _)
println(resRDD.map { case (k, v) => (v, k) }.sortByKey(false).take(5).mkString("Array(", ", ", ")"))

}
}

可以用一些正则表达式

1
2
//按空格、标点符号、数学符号、数字分割
val wordRDD: RDD[String] = lineRDD.flatMap(_.split("[ ]|\\pP|\\pS|\\pN"))

filter:过滤RDD

filter,顾名思义,这个算子的作用,是对 RDD 进行过滤。就像是 map 算子依赖其映射函数一样,filter 算子也需要借助一个判定函数 f,才能实现对 RDD 的过滤转换。

所谓判定函数,它指的是类型为(RDD 元素类型) => (Boolean)的函数。可以看到,判定函数 f 的形参类型,必须与 RDD 的元素类型保持一致,而 f 的返回结果,只能是 True 或者 False。在任何一个 RDD 之上调用 filter(f),其作用是保留 RDD 中满足 f(也就是 f 返回 True)的数据元素,而过滤掉不满足 f(也就是 f 返回 False)的数据元素。

还是上面的例子,这次要把像“Spark-&”之类的词对过滤掉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.apache.spark.{SparkConf, SparkContext}

object AdjacentWordsCount {
def main(args: Array[String]): Unit = {
val rootPath = "xxxx"
val file = s"$rootPath\\wikiOfSpark.txt"
val conf = new SparkConf().setMaster("local[*]").setAppName("AdjacentWordsCount")
val lineRDD = new SparkContext(conf).textFile(file)
val adjacentRDD = lineRDD.flatMap(line => {
val words = line.split(" ")
for(i <- 0 until words.length - 1) yield words(i) + "-" + words(i + 1)
})

//================filter====================
val list = List("","!","@","#","$","%","^","&","*")
def f(s: String) = {
val words = s.split("-")
val b1 = list.contains(words(0))
val b2 = list.contains(words(1))
!b1 && !b2
}
val kvRDD = adjacentRDD.filter(f).map((_,1))
//==========================================

val resRDD = kvRDD.reduceByKey(_ + _)
println(resRDD.map { case (k, v) => (v, k) }.sortByKey(false).take(5).mkString("Array(", ", ", ")"))

}
}

自测题:

  1. SparkContext.parallelize和SparkContext.textFile的区别
  2. map和mapPartitions的区别
  3. 四个算子的传入的形参各有什么特点

04 进程模型与分布式部署:分布式计算是怎么回事?

分布式计算的精髓,在于如何把抽象的计算流图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行

Spark实现分布式计算的两个关键要素:进程模型分布式部署

进程模型

Dirver与Executor

任何一个Spark应用程序的入口,都是带有SparkSession的main函数,在Spark分布式计算环境中,有且仅有一个JVM进程运行这样的main函数,称为Dirver

Dirver构建计算流图,然后将计算流图转化为分布式任务,并把分布式任务分发给集群中的执行进程Executor交付运行

Driver 除了分发任务之外,还需要定期与每个 Executor 进行沟通,及时获取他们的工作进展,从而协调整体的执行进度。

在Spark的Dirver进程中,DAGSchedulerTaskSchedulerSchedulerBackend依次完成分布式任务调度的三个核心步骤:

  1. DAGScheduler:根据用户代码构建计算流图
  2. TaskScheduler:根据计算流图拆解出分布式任务
  3. SchedulerBackend:将分布式任务分发到Executors中去

接收到任务之后,Executors 调用内部线程池,结合事先分配好的数据分片,并发地执行任务代码。对于一个完整的 RDD,每个 Executors 负责处理这个 RDD 的一个数据分片子集。

master

Spark shell命令中的master用于指定部署模式

1
spark-shell --master local[*]

分布式任务(以word count为例)

步骤:

  1. word count中用到了textFile、flatMap、filter、map、reduceByKey等算子,其中textFile、flatMap、filter和map都可以在单个Executor中独立完成,所以Dirver会先将这几个算子捏合成一个任务,打包发给每个Executor

  2. 每个Executor收到这个任务后,再将任务拆解成原本的四个步骤,分别对自己负责的数据分片按顺序执行这些步骤

  3. 每个Executor执行完任务后会向Dirver汇报自己的工作进展

  4. 在执行reduceByKey这个任务之前,会进行shuffle操作

    因为reduceByKey需要按照Key值将Value值进行统计计数,而相同的Key值很可能分布在不同的数据分片中,shuffle的过程就是把相同的Key聚合到同一个数据分片的过程

  5. 执行完shuffle操作,Driver会分发reduceByKey的任务,Executors会把最终的计算结果统一返回给Driver

分布式环境部署

Spark的两种部署模式:本地部署 和 分布式部署

Spark支持多种分布式部署模式:Standalone或YARN等

Standalone模式

Standalone 在资源调度层面,采用了一主多从的主从架构,把计算节点的角色分为 Master 和 Worker。其中,Master 有且只有一个,而 Worker 可以有一到多个。所有 Worker 节点周期性地向 Master 汇报本节点可用资源状态,Master 负责汇总、变更、管理集群中的可用资源,并对 Spark 应用程序中 Driver 的资源请求作出响应。

Standalone 在计算层面,就是用上述的Driver和Executors进程模型进行任务的执行

摘自评论区:

提问:

老师好!讲解很精彩! 为了帮助大家理解,还是要说说 standalone 模式下的 主从选举过程,三个节点怎么互相找到并选出主从。另外,standalone 模式下的 master 和 worker,与前面进程模型里说的 Driver 和 executor,二组之间的对应关系,也要讲讲。只要能简单串起来就可以了。让大家获得一个即便简单、但却完成的理解模型。

作者回复:

感谢老弟,问题提得很好~

先说说选主,这个其实比较简单,Standalone部署模式下,Master与Worker角色,这个是我们通过配置文件,事先配置好的,所以说,哪台是Master,哪台是Worker,这个配置文件里面都有。在Standalone部署下,先启动Master,然后启动Worker,由于配置中有Master的连接地址,所以Worker启动的时候,会自动去连接Master,然后双方建立心跳机制,随后集群进入ready状态。

接下来说Master、Worker与Driver、Executors的关系。首先,这4个“家伙”,都是JVM进程。不过呢,他们的定位和角色,是完全不一样的。Master、Worker用来做资源的调度与分配,你可以这样理解,这两个家伙,只负责维护集群中可用硬件资源的状态。换句话说,Worker记录着每个计算节点可用CPU cores、可用内存,等等。而Master从Worker收集并汇总所有集群中节点的可用计算资源。

Driver和Executors的角色,那就纯是Spark应用级别的进程了。这个咱们课程有介绍,就不赘述了。Driver、Executors的计算资源,全部来自于Master的调度。一般来说,Driver会占用Master所在节点的资源;而Executors一般占用Worker所在节点的计算资源。一旦Driver、Executors从Master、Worker那里申请到资源之后,Driver、Executors就不再“鸟”Master和Worker了,因为资源已经到手了,后续就是任务调度的范畴。任务调度课程中也有详细的介绍,老弟可以关注下~ 大概其就是这么些关系,不知道对老弟是否有所帮助~

Standalone分布式部署快速入门

  1. 配置ssh免密登陆

    将master的公钥追加到workers的authorized_keys文件

  2. JAVA和Spark环境搭建

    安装JAVA和Spark,配置环境变量

  3. 配置ip地址与域名对

    1
    2
    3
    4
    5
    vim /etc/hosts

    <node1 ip地址> node1
    <node2 ip地址> node2
    <node3 ip地址> node3
  4. 启动Master和Workers

    修改Master的spark-defaults.conf配置文件,设置Master URL

    1
    spark.master  spark://node1:7077

    启动Master和Workers

    1
    2
    3
    4
    #master
    sbin/start-master.sh
    #workers
    sbin/start-worker.sh node1:7077
  5. 运行spark自带的demo

    1
    MASTER=spark://node1:7077 $SPARK_HOME/bin/run-example org.apache.spark.examples.SparkPi

自测题:

  1. 分布式任务执行的步骤
  2. Worker和Master、Driver和Executor的联系与区别

05 调度系统:如何把握分布式计算的精髓

分布式计算的精髓,在于如何把抽象的计算图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行

步骤序号 调度系统关键步骤 所在进程 核心组件
1 将DAG(计算流图)拆分为不同的运行阶段,即Stages;根据Stages创建分布式任务Tasks和任务组TaskSets Driver DAGScheduler
2 获取集群内可用计算资源 Driver SchedulerBackend
3 按照调度规则决定任务优先级,完成任务调度 Driver TaskScheduler
4 依序将分布式任务分发到Executors Driver SchedulerBackend
5 并发执行接收到的分布式计算任务 Executors ExecutorBackend

SchedulerBackend可以理解为资源管理器(Standalone、YARN等),用于分配资源,分发任务

简而言之,DAGScheduler 手里有“活儿”,SchedulerBackend 手里有“人力”,TaskScheduler 的核心职能,就是把合适的“活儿”派发到合适的“人”的手里。由此可见,TaskScheduler 承担的是承上启下、上通下达的关键角色

Spark 调度系统的核心思想,是“数据不动、代码动”

任务调度分为如下 5 个步骤:

  1. DAGScheduler 以 Shuffle 为边界,将开发者设计的计算图 DAG 拆分为多个执行阶段 Stages,然后为每个 Stage 创建任务集 TaskSet。
  2. SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构。
  3. 与此同时,SchedulerBackend 根据 ExecutorDataMap 中可用资源创建 WorkerOffer,以 WorkerOffer 为粒度提供计算资源。
  4. 对于给定 WorkerOffer,TaskScheduler 结合 TaskSet 中任务的本地性倾向,按照 PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL 和 ANY 的顺序,依次对 TaskSet 中的任务进行遍历,优先调度本地性倾向要求苛刻的 Task。
  5. 被选中的 Task 由 TaskScheduler 传递给 SchedulerBackend,再由 SchedulerBackend 分发到 Executors 中的 ExecutorBackend。Executors 接收到 Task 之后,即调用本地线程池来执行分布式任务。

自测题:

  1. 说一下DAGScheduler、SchedulerBackend、TaskScheduler、ExecutorBackend
  2. 任务调度的步骤

06 Shuffle管理:为什么shuffle是性能瓶颈

Shuffle 的本意是扑克的“洗牌”,在分布式计算场景中,它被引申为集群范围内跨节点、跨进程的数据分发

Shuffle 的过程中,分布式数据集在集群内的分发,会引入大量的磁盘 I/O 与网络 I/O。在 DAG 的计算链条中,Shuffle 环节的执行性能是最差的。

shuffle是map和reduce的中间阶段,将不同的数据分发到不同的节点

shuffle中间文件

Map 阶段与 Reduce 阶段,通过生产消费 Shuffle 中间文件的方式,来完成集群范围内的数据交换

Shuffle 中间文件是统称,它包含两类文件,一个是记录(Key,Value)键值对的 data 文件,另一个是记录键值对所属 Reduce Task 的 index 文件。计算图 DAG 中的 Map 阶段与 Reduce 阶段,正是通过中间文件来完成数据的交换。

Shuffle 中间文件的生成过程,分为如下几个步骤:

  1. 对于数据分区中的数据记录,逐一计算(哈希取模)其目标分区,然后填充内存数据结构;
  2. 当数据结构填满后,如果分区中还有未处理的数据记录,就对结构中的数据记录按(目标分区 ID,Key)排序,将所有数据溢出到临时文件,同时清空数据结构;
  3. 重复前 2 个步骤,直到分区中所有的数据记录都被处理为止;
  4. 对所有临时文件和内存数据结构中剩余的数据记录做归并排序,生成数据文件(data)和索引文件(index)。

最后,在 Reduce 阶段,Reduce Task 通过 index 文件来“定位”属于自己的数据内容,并通过网络从不同节点的 data 文件中下载属于自己的数据记录。

自测题:

  1. 什么是shuffle中间文件
  2. shuffle中间文件的生成过程

07 RDD常用算子(二):Spark如何实现数据聚合

  • 算子类型:Transformations
  • 适用范围:Paired RDD(kvRDD)
  • 算子用途:RDD内数据聚合
  • 算子集合:groupByKey、sortByKey、reduceByKey、aggregateByKey
  • 特点:会引入繁重的Shuffle计算

groupByKey:分组收集

RDD[(Key, Value)] —> RDD[(Key, Value集合)]

使用场景较少

reduceByKey:分组聚合

reduceByKey函数需要传入一个聚合函数f

需要强调的是,给定 RDD[(Key 类型,Value 类型)],聚合函数 f 的类型,必须是(Value 类型,Value 类型) => (Value 类型)。换句话说,函数 f 的形参,必须是两个数值,且数值的类型必须与 Value 的类型相同,而 f 的返回值,也必须是 Value 类型的数值。

1
某个kvRDD.reduceByKey((Value 类型,Value 类型) => (Value 类型))

练习:把 Word Count 的计算逻辑,改为随机赋值、提取同一个 Key 的最大值。也就是在 kvRDD 的生成过程中,我们不再使用映射函数 word => (word, 1),而是改为 word => (word, 随机数),然后再使用 reduceByKey 算子来计算同一个 word 当中最大的那个随机数。

1
2
3
4
5
import scala.util.Random._

val kvRDD = cleanWordRDD.map((_, nextInt(100)))

val wordCounts = kvRDD.reduceByKey(math.max)
  • Map端聚合:在每个节点内部先聚合
  • Reduce端聚合:数据经由网络分发之后,在 Reduce 阶段完成的聚合

reduceByKey 算子的局限性,在于其 Map 阶段与 Reduce 阶段的计算逻辑必须保持一致,这个计算逻辑统一由聚合函数 f 定义。当一种计算场景需要在两个阶段执行不同计算逻辑的时候,reduceByKey 就爱莫能助了。

aggregateByKey:更加灵活的聚合算子

aggregateByKey算子需要三个参数,分别对应初始值、Map端聚合函数和Reduce端聚合函数,可以实现在两个阶段执行不同的计算逻辑

就这 3 个参数来说,比较伤脑筋的,是它们之间的类型需要保持一致,具体来说:

  • 初始值类型,必须与 f2 的结果类型保持一致;
  • f1 的形参类型,必须与 Paired RDD 的 Value 类型保持一致;
  • f2 的形参类型,必须与 f1 的结果类型保持一致。

练习:map端求和,reduce端求最值

1
val wordCounts = kvRDD.aggregateByKey(0)(_ + _, math.max)

与 reduceByKey 一样,aggregateByKey 也可以通过 Map 端的初步聚合来大幅削减数据量,在降低磁盘与网络开销的同时,提升 Shuffle 环节的执行性能。

摘自评论区:

reduceByKey和aggregateByKey的联系和区别:

reduceByKey和aggregateByKey底层实现完全相同,都是combineByKeyWithClassTag,只不过reduceByKey调用 combineByKeyWithClassTag的入参mergeValue和mergeCombiners是相等的,aggregateByKey是用户指定可以不等的,也就是说reduceByKey是一种特殊的aggregateByKey。

sortByKey:排序

1
2
3
4
5
//按照Key升序排序
rdd.sortByKey()
rdd.sortByKey(true)
//降序
rdd.sortByKey(false)

08 内存管理:Spark如何使用内存

Spark内存区域划分
Execution Memory <—相互转化—>Storage Memory
User Memory
Reserved Memory (300MB)
  • Reserved Memory:固定300MB,是Spark预留的、用来存储各种Spark内部对象的内存区域
  • User Memory:用于存储开发者自定义的数据结构
  • Execution Memory:用来执行分布式任务
  • Storage Memory:缓存分布式数据集

RDD Cache

当同一个 RDD 被引用多次时,就可以考虑对其进行 Cache,从而提升作业的执行效率。

例如:调用两次wordCounts(RDD)

1
2
3
4
5
6
7
8
9
10
11
12
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

wordCounts.cache// 使用cache算子告知Spark对wordCounts加缓存
wordCounts.count// 触发wordCounts的计算,并将wordCounts缓存到内存

// 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

// 将分组计数结果落盘到文件
val targetPath: String = _
wordCounts.saveAsTextFile(targetPath)

下面两句等价

1
2
wordCounts.cache
wordCounts.persist(MEMORY_ONLY)

persist()括号里填写存储级别

Spark支持丰富的存储级别,每一种存储级别都包含3个最基本的要素

  • 存储介质:数据缓存到内存还是磁盘,或是两者都有
  • 存储形式:数据内容是对象值还是字节数组,带SER字样的表示以序列化方式存储,不带SER则表示采用对象值
  • 副本数量:存储级别名字最后的数字代表拷贝数量,没有数字默认为1份副本

09 RDD常用算子(三):数据的准备、重分布与持久化

数据加载 数据准备 数据预处理 数据处理 接过结果收集
parallelize
textFile
union
sample
coalesce
repartition
flatMap
map
filter
sortByKey
reduceByKey
aggregateByKey
take
first
collect
saveAsTextFile

数据准备

union

将两个类型一致的RDD合并

1
2
3
val rdd = rdd1.union(rdd2)
//或者
val rdd = rdd1 union rdd2

sample

RDD 的 sample 算子用于对 RDD 做随机采样,从而把一个较大的数据集变为一份“小数据”

三个参数:

  • withReplacement(Boolean类型):采样是否有放回
  • fraction(Double类型,值域为0到1):采样比例
  • seed(Long类型,可选参数):根据seed随机抽取,相同的seed采样结果是相同的

数据预处理

并行度:RDD的数据分区数量,对应RDD的partitions属性

repartition

用来调整RDD并行度

1
val rdd1 = rdd.repartition(2)

RDD 的并行度,很大程度上决定了分布式系统中 CPU 的使用效率,进而还会影响分布式系统并行计算的执行效率。并行度过高或是过低,都会降低 CPU 利用率,从而白白浪费掉宝贵的分布式计算资源,因此,合理有效地设置 RDD 并行度,至关重要。

repartition算子会引入shuffle

coalesce

1
val rdd1 = rdd.coalesce(2)

coalesce算子不会引入shuffle

  • 增加并行度只能用reparation
  • 降低并行度用coalesce

摘自官网:

coalesce:Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

reparation: Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

结果收集

first、take和collect

摘自官网:

first:Return the first element of the dataset (similar to take(1)).

take:Return an array with the first n elements of the dataset.

collect:Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

collect 算子有两处性能隐患,一个是拉取数据过程中引入的网络开销,另一个 Driver 的 OOM(内存溢出,Out of Memory)。

saveAsTextFile

摘自官网:

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

以 saveAsTextFile 为代表的算子,直接通过 Executors 将 RDD 数据分区物化到文件系统,这个过程并不涉及与 Driver 端的任何交互。

由于数据的持久化与 Driver 无关,因此这类算子天然地避开了 collect 算子带来的两个性能隐患。

10 广播变量 & 累加器:共享变量是用来做什么的?

按照创建与使用方式的不同,Spark 提供了两类共享变量,分别是广播变量(Broadcast variables)和累加器(Accumulators)。

广播变量(Broadcast variables)

在 Driver 与 Executors 之间,普通变量的分发与存储,是以 Task 为粒度的,因此,它所引入的网络与内存开销,会成为作业执行性能的一大隐患。在使用广播变量的情况下,数据内容的分发粒度变为以 Executors 为单位。相比前者,广播变量的优势高下立判,它可以大幅度消除前者引入的网络与内存开销,进而在整体上提升作业的执行效率。

广播变量的创建与使用

1
2
3
4
5
6
7
8
9
10
11
12
val list: List[String] = List("Apache", "Spark")

// sc为SparkContext实例
val bc = sc.broadcast(list)

// 读取广播变量内容
bc.value
// List[String] = List(Apache, Spark)

// 直接读取列表内容
list
// List[String] = List(Apache, Spark)

累加器(Accumulators)

累加器,顾名思义,它的主要作用是全局计数(Global counter)。与单机系统不同,在分布式系统中,我们不能依赖简单的普通变量来完成全局计数,而是必须依赖像累加器这种特殊的数据结构才能达到目的。

累加器的分类

  • longAccumulator
  • doubleAccumulator
  • collectionAccumulator

累加器的创建与使用

1
2
3
4
val ac = sc.longAccumulator("Empty string")

ac.add(1)//累加
ac.value//获取累加结果