Spark学习笔记
看的是极客时间的课同时结合官网
Overview - Spark 3.3.1 Documentation (apache.org)
基础知识
01 Spark:从“大数据的Hello World”开始
准备工作
IDEA安装Scala插件
构建Maven项目
pom.xml加入spark
1
2
3
4
5
6
7<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.0</version>
</dependency>
</dependencies>
Word Count
1. 读取内容
1 | val rootPath: String = "xxxx" //这里是文件所在目录 |
源码中关于SparkConf和SparkContext的说明
SparkConf:Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
SparkContext:Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
我目前理解的RDD:一个分布式元素集合,里面的元素可以通过算子转换成各种类型和结构,每个RDD都被分成不同的分区,分别在集群中的不同节点上操作
2. 分词
1 | //以行为单位做分词 |
graph LR a["第1行,第2行,...,第n行"] b["[第1行第1个词,第1行第2个词,...]<br/> [第2行第1个词,第2行第2个词,...]<br/> [...]<br/> [第n行第1个词,第n行第2个词,...]"] c["第1行第1个词,第1行第2个词,...,第n行第n个词"] subgraph "lineRDD:RDD[String]" a end subgraph "RDD[Array[String]]" b end a --映射--> b subgraph "wordRDD:RDD[String]" c end b --展平--> c
1 | // 过滤掉空字符串 |
3. 分组计数
1 | // 把RDD元素转换为(Key,Value)的形式 |
完整代码
1 | import org.apache.spark.{SparkConf, SparkContext} |
自测题:
- 独立完成WordCount代码的编写
- flatMap流程
02 RDD与编程模型:延迟计算是怎么回事?
RDD与数组的区别
RDD是一种抽象,是Spark对于分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体
对比项 | 数组 | RDD |
---|---|---|
概念类型 | 数据结构实体 | 数据模型抽象 |
数据跨度 | 单机进程内 | 跨进程、跨计算节点 |
数据构成 | 数组元素 | 数据分片(Partitions) |
数据定位 | 数组下标、索引 | 数据分片索引 |
RDD的四大属性
- partitions:数据分片,不同节点上的数据属于不同分片
- partitioner:分片切割规则,根据规则将数据发往不同分区
- dependencies:RDD依赖,RDD每种数据形态都依赖上一种形态
- compute:转换函数,RDD的转换方法
编程模型与延迟计算
每个RDD都代表着一种分布式数据形态
RDD 到 RDD 之间的转换,本质上是数据形态上的转换(Transformations)
RDD算子的一个共性:RDD转换
在 RDD 的编程模型中,一共有两种算子,Transformations 类算子和 Actions 类算子。开发者需要使用 Transformations 类算子,定义并描述数据形态的转换过程,然后调用 Actions 类算子,将计算结果收集起来、或是物化到磁盘。
在这样的编程模型下,Spark 在运行时的计算被划分为两个环节。
- 基于不同数据形态之间的转换,构建计算流图(DAG,Directed Acyclic Graph);
- 通过 Actions 类算子,以回溯的方式去触发执行这个计算流图。
调用的各类 Transformations 算子,并不立即执行计算,调用 Actions 算子时,之前调用的Transformations算子才会执行,这就叫作“延迟计算”(Lazy Evaluation)。
所以构建计算流图的过程不会耗费很多时间,Actions算子触发执行计算流程的过程才最耗时,这也是延迟计算的一个特点
自测题:
- 讲一下RDD
- 延迟计算是什么意思
03 RDD常用算子(一):RDD内部的数据转换
创建RDD
在Spark中,创建RDD的典型方式有两种
通过SparkContext.parallelize在内部数据之上创建RDD
1
2
3import org.apache.spark.rdd.RDD
val words: Array[String] = Array("Spark", "is", "cool")
val rdd: RDD[String] = sc.parallelize(words)通过SparkContext.textFile等API从外部数据创建RDD
这里的内部、外部是相对应用程序来说的。开发者在 Spark 应用中自定义的各类数据结构,如数组、列表、映射等,都属于“内部数据”;而“外部数据”指代的,是 Spark 系统之外的所有数据形式,如本地文件系统或是分布式文件系统中的数据,再比如来自其他大数据组件(Hive、Hbase、RDBMS 等)的数据。
map:以元素为粒度的数据转换
map算子的用法:给定映射函数 f,map(f) 以元素为粒度对 RDD 做数据转换。其中 f 可以是带有明确签名的带名函数,也可以是匿名函数,它的形参类型必须与 RDD 的元素类型保持一致,而输出类型则任由开发者自行决定。
正因为map是以元素为粒度做数据转换的,在某些计算场景下,这个特点会严重影响执行效率
例:对每个元素的哈希值计数
1 | import java.security.MessageDigest |
要对每个元素创建一次MD5对象实例,严重影响效率
这种情况就要用到mapPartitions
mapPartitions:以数据分区为粒度的数据转换
还是同样的例子:对每个元素的哈希值计数
1 | import java.security.MessageDigest |
同一个分区的数据,可以共享同一个MD5对象
flatMap:从元素到集合、再从集合到元素
flatMap的映射过程在逻辑上分为两步:
- 以元素为单位,创建集合
- 去掉集合“外包装”,提取集合元素
可以结合01 wordcount中的分词部分理解
例子:统计相邻单词共同出现的次数
如:Spark is cool –> (Spark is, 1) (is cool, 1)
1 | import org.apache.spark.{SparkConf, SparkContext} |
可以用一些正则表达式
1 | //按空格、标点符号、数学符号、数字分割 |
filter:过滤RDD
filter,顾名思义,这个算子的作用,是对 RDD 进行过滤。就像是 map 算子依赖其映射函数一样,filter 算子也需要借助一个判定函数 f,才能实现对 RDD 的过滤转换。
所谓判定函数,它指的是类型为(RDD 元素类型) => (Boolean)的函数。可以看到,判定函数 f 的形参类型,必须与 RDD 的元素类型保持一致,而 f 的返回结果,只能是 True 或者 False。在任何一个 RDD 之上调用 filter(f),其作用是保留 RDD 中满足 f(也就是 f 返回 True)的数据元素,而过滤掉不满足 f(也就是 f 返回 False)的数据元素。
还是上面的例子,这次要把像“Spark-&”之类的词对过滤掉
1 | import org.apache.spark.{SparkConf, SparkContext} |
自测题:
- SparkContext.parallelize和SparkContext.textFile的区别
- map和mapPartitions的区别
- 四个算子的传入的形参各有什么特点
04 进程模型与分布式部署:分布式计算是怎么回事?
分布式计算的精髓,在于如何把抽象的计算流图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行
Spark实现分布式计算的两个关键要素:进程模型和分布式部署
进程模型
Dirver与Executor
任何一个Spark应用程序的入口,都是带有SparkSession的main函数,在Spark分布式计算环境中,有且仅有一个JVM进程运行这样的main函数,称为Dirver
Dirver构建计算流图,然后将计算流图转化为分布式任务,并把分布式任务分发给集群中的执行进程Executor交付运行
Driver 除了分发任务之外,还需要定期与每个 Executor 进行沟通,及时获取他们的工作进展,从而协调整体的执行进度。
在Spark的Dirver进程中,DAGScheduler、TaskScheduler和SchedulerBackend依次完成分布式任务调度的三个核心步骤:
- DAGScheduler:根据用户代码构建计算流图
- TaskScheduler:根据计算流图拆解出分布式任务
- SchedulerBackend:将分布式任务分发到Executors中去
接收到任务之后,Executors 调用内部线程池,结合事先分配好的数据分片,并发地执行任务代码。对于一个完整的 RDD,每个 Executors 负责处理这个 RDD 的一个数据分片子集。
master
Spark shell命令中的master用于指定部署模式
1 | spark-shell --master local[*] |
分布式任务(以word count为例)
步骤:
word count中用到了textFile、flatMap、filter、map、reduceByKey等算子,其中textFile、flatMap、filter和map都可以在单个Executor中独立完成,所以Dirver会先将这几个算子捏合成一个任务,打包发给每个Executor
每个Executor收到这个任务后,再将任务拆解成原本的四个步骤,分别对自己负责的数据分片按顺序执行这些步骤
每个Executor执行完任务后会向Dirver汇报自己的工作进展
在执行reduceByKey这个任务之前,会进行shuffle操作
因为reduceByKey需要按照Key值将Value值进行统计计数,而相同的Key值很可能分布在不同的数据分片中,shuffle的过程就是把相同的Key聚合到同一个数据分片的过程
执行完shuffle操作,Driver会分发reduceByKey的任务,Executors会把最终的计算结果统一返回给Driver
分布式环境部署
Spark的两种部署模式:本地部署 和 分布式部署
Spark支持多种分布式部署模式:Standalone或YARN等
Standalone模式
Standalone 在资源调度层面,采用了一主多从的主从架构,把计算节点的角色分为 Master 和 Worker。其中,Master 有且只有一个,而 Worker 可以有一到多个。所有 Worker 节点周期性地向 Master 汇报本节点可用资源状态,Master 负责汇总、变更、管理集群中的可用资源,并对 Spark 应用程序中 Driver 的资源请求作出响应。
Standalone 在计算层面,就是用上述的Driver和Executors进程模型进行任务的执行
摘自评论区:
提问:
老师好!讲解很精彩! 为了帮助大家理解,还是要说说 standalone 模式下的 主从选举过程,三个节点怎么互相找到并选出主从。另外,standalone 模式下的 master 和 worker,与前面进程模型里说的 Driver 和 executor,二组之间的对应关系,也要讲讲。只要能简单串起来就可以了。让大家获得一个即便简单、但却完成的理解模型。
作者回复:
感谢老弟,问题提得很好~
先说说选主,这个其实比较简单,Standalone部署模式下,Master与Worker角色,这个是我们通过配置文件,事先配置好的,所以说,哪台是Master,哪台是Worker,这个配置文件里面都有。在Standalone部署下,先启动Master,然后启动Worker,由于配置中有Master的连接地址,所以Worker启动的时候,会自动去连接Master,然后双方建立心跳机制,随后集群进入ready状态。
接下来说Master、Worker与Driver、Executors的关系。首先,这4个“家伙”,都是JVM进程。不过呢,他们的定位和角色,是完全不一样的。Master、Worker用来做资源的调度与分配,你可以这样理解,这两个家伙,只负责维护集群中可用硬件资源的状态。换句话说,Worker记录着每个计算节点可用CPU cores、可用内存,等等。而Master从Worker收集并汇总所有集群中节点的可用计算资源。
Driver和Executors的角色,那就纯是Spark应用级别的进程了。这个咱们课程有介绍,就不赘述了。Driver、Executors的计算资源,全部来自于Master的调度。一般来说,Driver会占用Master所在节点的资源;而Executors一般占用Worker所在节点的计算资源。一旦Driver、Executors从Master、Worker那里申请到资源之后,Driver、Executors就不再“鸟”Master和Worker了,因为资源已经到手了,后续就是任务调度的范畴。任务调度课程中也有详细的介绍,老弟可以关注下~ 大概其就是这么些关系,不知道对老弟是否有所帮助~
Standalone分布式部署快速入门
配置ssh免密登陆
将master的公钥追加到workers的authorized_keys文件
JAVA和Spark环境搭建
安装JAVA和Spark,配置环境变量
配置ip地址与域名对
1
2
3
4
5vim /etc/hosts
<node1 ip地址> node1
<node2 ip地址> node2
<node3 ip地址> node3启动Master和Workers
修改Master的spark-defaults.conf配置文件,设置Master URL
1
spark.master spark://node1:7077
启动Master和Workers
1
2
3
4master
sbin/start-master.sh
workers
sbin/start-worker.sh node1:7077运行spark自带的demo
1
MASTER=spark://node1:7077 $SPARK_HOME/bin/run-example org.apache.spark.examples.SparkPi
自测题:
- 分布式任务执行的步骤
- Worker和Master、Driver和Executor的联系与区别
05 调度系统:如何把握分布式计算的精髓
分布式计算的精髓,在于如何把抽象的计算图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行
步骤序号 | 调度系统关键步骤 | 所在进程 | 核心组件 |
---|---|---|---|
1 | 将DAG(计算流图)拆分为不同的运行阶段,即Stages;根据Stages创建分布式任务Tasks和任务组TaskSets | Driver | DAGScheduler |
2 | 获取集群内可用计算资源 | Driver | SchedulerBackend |
3 | 按照调度规则决定任务优先级,完成任务调度 | Driver | TaskScheduler |
4 | 依序将分布式任务分发到Executors | Driver | SchedulerBackend |
5 | 并发执行接收到的分布式计算任务 | Executors | ExecutorBackend |
SchedulerBackend可以理解为资源管理器(Standalone、YARN等),用于分配资源,分发任务
简而言之,DAGScheduler 手里有“活儿”,SchedulerBackend 手里有“人力”,TaskScheduler 的核心职能,就是把合适的“活儿”派发到合适的“人”的手里。由此可见,TaskScheduler 承担的是承上启下、上通下达的关键角色
Spark 调度系统的核心思想,是“数据不动、代码动”
任务调度分为如下 5 个步骤:
- DAGScheduler 以 Shuffle 为边界,将开发者设计的计算图 DAG 拆分为多个执行阶段 Stages,然后为每个 Stage 创建任务集 TaskSet。
- SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构。
- 与此同时,SchedulerBackend 根据 ExecutorDataMap 中可用资源创建 WorkerOffer,以 WorkerOffer 为粒度提供计算资源。
- 对于给定 WorkerOffer,TaskScheduler 结合 TaskSet 中任务的本地性倾向,按照 PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL 和 ANY 的顺序,依次对 TaskSet 中的任务进行遍历,优先调度本地性倾向要求苛刻的 Task。
- 被选中的 Task 由 TaskScheduler 传递给 SchedulerBackend,再由 SchedulerBackend 分发到 Executors 中的 ExecutorBackend。Executors 接收到 Task 之后,即调用本地线程池来执行分布式任务。
自测题:
- 说一下DAGScheduler、SchedulerBackend、TaskScheduler、ExecutorBackend
- 任务调度的步骤
06 Shuffle管理:为什么shuffle是性能瓶颈
Shuffle 的本意是扑克的“洗牌”,在分布式计算场景中,它被引申为集群范围内跨节点、跨进程的数据分发
Shuffle 的过程中,分布式数据集在集群内的分发,会引入大量的磁盘 I/O 与网络 I/O。在 DAG 的计算链条中,Shuffle 环节的执行性能是最差的。
graph LR Map-->Shuffle-->Reduce
shuffle是map和reduce的中间阶段,将不同的数据分发到不同的节点
shuffle中间文件
Map 阶段与 Reduce 阶段,通过生产与消费 Shuffle 中间文件的方式,来完成集群范围内的数据交换
Shuffle 中间文件是统称,它包含两类文件,一个是记录(Key,Value)键值对的 data 文件,另一个是记录键值对所属 Reduce Task 的 index 文件。计算图 DAG 中的 Map 阶段与 Reduce 阶段,正是通过中间文件来完成数据的交换。
Shuffle 中间文件的生成过程,分为如下几个步骤:
- 对于数据分区中的数据记录,逐一计算(哈希取模)其目标分区,然后填充内存数据结构;
- 当数据结构填满后,如果分区中还有未处理的数据记录,就对结构中的数据记录按(目标分区 ID,Key)排序,将所有数据溢出到临时文件,同时清空数据结构;
- 重复前 2 个步骤,直到分区中所有的数据记录都被处理为止;
- 对所有临时文件和内存数据结构中剩余的数据记录做归并排序,生成数据文件(data)和索引文件(index)。
最后,在 Reduce 阶段,Reduce Task 通过 index 文件来“定位”属于自己的数据内容,并通过网络从不同节点的 data 文件中下载属于自己的数据记录。
自测题:
- 什么是shuffle中间文件
- shuffle中间文件的生成过程
07 RDD常用算子(二):Spark如何实现数据聚合
- 算子类型:Transformations
- 适用范围:Paired RDD(kvRDD)
- 算子用途:RDD内数据聚合
- 算子集合:groupByKey、sortByKey、reduceByKey、aggregateByKey
- 特点:会引入繁重的Shuffle计算
groupByKey:分组收集
RDD[(Key, Value)] —> RDD[(Key, Value集合)]
使用场景较少
reduceByKey:分组聚合
reduceByKey函数需要传入一个聚合函数f
需要强调的是,给定 RDD[(Key 类型,Value 类型)],聚合函数 f 的类型,必须是(Value 类型,Value 类型) => (Value 类型)。换句话说,函数 f 的形参,必须是两个数值,且数值的类型必须与 Value 的类型相同,而 f 的返回值,也必须是 Value 类型的数值。
1 | 某个kvRDD.reduceByKey((Value 类型,Value 类型) => (Value 类型)) |
练习:把 Word Count 的计算逻辑,改为随机赋值、提取同一个 Key 的最大值。也就是在 kvRDD 的生成过程中,我们不再使用映射函数 word => (word, 1),而是改为 word => (word, 随机数),然后再使用 reduceByKey 算子来计算同一个 word 当中最大的那个随机数。
1 | import scala.util.Random._ |
- Map端聚合:在每个节点内部先聚合
- Reduce端聚合:数据经由网络分发之后,在 Reduce 阶段完成的聚合
reduceByKey 算子的局限性,在于其 Map 阶段与 Reduce 阶段的计算逻辑必须保持一致,这个计算逻辑统一由聚合函数 f 定义。当一种计算场景需要在两个阶段执行不同计算逻辑的时候,reduceByKey 就爱莫能助了。
aggregateByKey:更加灵活的聚合算子
aggregateByKey算子需要三个参数,分别对应初始值、Map端聚合函数和Reduce端聚合函数,可以实现在两个阶段执行不同的计算逻辑
就这 3 个参数来说,比较伤脑筋的,是它们之间的类型需要保持一致,具体来说:
- 初始值类型,必须与 f2 的结果类型保持一致;
- f1 的形参类型,必须与 Paired RDD 的 Value 类型保持一致;
- f2 的形参类型,必须与 f1 的结果类型保持一致。
练习:map端求和,reduce端求最值
1 | val wordCounts = kvRDD.aggregateByKey(0)(_ + _, math.max) |
与 reduceByKey 一样,aggregateByKey 也可以通过 Map 端的初步聚合来大幅削减数据量,在降低磁盘与网络开销的同时,提升 Shuffle 环节的执行性能。
摘自评论区:
reduceByKey和aggregateByKey的联系和区别:
reduceByKey和aggregateByKey底层实现完全相同,都是combineByKeyWithClassTag,只不过reduceByKey调用 combineByKeyWithClassTag的入参mergeValue和mergeCombiners是相等的,aggregateByKey是用户指定可以不等的,也就是说reduceByKey是一种特殊的aggregateByKey。
sortByKey:排序
1 | //按照Key升序排序 |
08 内存管理:Spark如何使用内存
Spark内存区域划分 |
---|
Execution Memory <—相互转化—>Storage Memory |
User Memory |
Reserved Memory (300MB) |
- Reserved Memory:固定300MB,是Spark预留的、用来存储各种Spark内部对象的内存区域
- User Memory:用于存储开发者自定义的数据结构
- Execution Memory:用来执行分布式任务
- Storage Memory:缓存分布式数据集
RDD Cache
当同一个 RDD 被引用多次时,就可以考虑对其进行 Cache,从而提升作业的执行效率。
例如:调用两次wordCounts(RDD)
1 | // 按照单词做分组计数 |
下面两句等价
1 | wordCounts.cache |
persist()括号里填写存储级别
Spark支持丰富的存储级别,每一种存储级别都包含3个最基本的要素
- 存储介质:数据缓存到内存还是磁盘,或是两者都有
- 存储形式:数据内容是对象值还是字节数组,带SER字样的表示以序列化方式存储,不带SER则表示采用对象值
- 副本数量:存储级别名字最后的数字代表拷贝数量,没有数字默认为1份副本
09 RDD常用算子(三):数据的准备、重分布与持久化
数据加载 | 数据准备 | 数据预处理 | 数据处理 | 接过结果收集 |
---|---|---|---|---|
parallelize textFile |
union sample |
coalesce repartition |
flatMap map filter sortByKey reduceByKey aggregateByKey |
take first collect saveAsTextFile |
数据准备
union
将两个类型一致的RDD合并
1 | val rdd = rdd1.union(rdd2) |
sample
RDD 的 sample 算子用于对 RDD 做随机采样,从而把一个较大的数据集变为一份“小数据”
三个参数:
- withReplacement(Boolean类型):采样是否有放回
- fraction(Double类型,值域为0到1):采样比例
- seed(Long类型,可选参数):根据seed随机抽取,相同的seed采样结果是相同的
数据预处理
并行度:RDD的数据分区数量,对应RDD的partitions属性
repartition
用来调整RDD并行度
1 | val rdd1 = rdd.repartition(2) |
RDD 的并行度,很大程度上决定了分布式系统中 CPU 的使用效率,进而还会影响分布式系统并行计算的执行效率。并行度过高或是过低,都会降低 CPU 利用率,从而白白浪费掉宝贵的分布式计算资源,因此,合理有效地设置 RDD 并行度,至关重要。
repartition算子会引入shuffle
coalesce
1 | val rdd1 = rdd.coalesce(2) |
coalesce算子不会引入shuffle
- 增加并行度只能用reparation
- 降低并行度用coalesce
摘自官网:
coalesce:Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
reparation: Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
结果收集
first、take和collect
摘自官网:
first:Return the first element of the dataset (similar to take(1)).
take:Return an array with the first n elements of the dataset.
collect:Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
collect 算子有两处性能隐患,一个是拉取数据过程中引入的网络开销,另一个 Driver 的 OOM(内存溢出,Out of Memory)。
saveAsTextFile
摘自官网:
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
以 saveAsTextFile 为代表的算子,直接通过 Executors 将 RDD 数据分区物化到文件系统,这个过程并不涉及与 Driver 端的任何交互。
由于数据的持久化与 Driver 无关,因此这类算子天然地避开了 collect 算子带来的两个性能隐患。
10 广播变量 & 累加器:共享变量是用来做什么的?
按照创建与使用方式的不同,Spark 提供了两类共享变量,分别是广播变量(Broadcast variables)和累加器(Accumulators)。
广播变量(Broadcast variables)
在 Driver 与 Executors 之间,普通变量的分发与存储,是以 Task 为粒度的,因此,它所引入的网络与内存开销,会成为作业执行性能的一大隐患。在使用广播变量的情况下,数据内容的分发粒度变为以 Executors 为单位。相比前者,广播变量的优势高下立判,它可以大幅度消除前者引入的网络与内存开销,进而在整体上提升作业的执行效率。
广播变量的创建与使用
1 | val list: List[String] = List("Apache", "Spark") |
累加器(Accumulators)
累加器,顾名思义,它的主要作用是全局计数(Global counter)。与单机系统不同,在分布式系统中,我们不能依赖简单的普通变量来完成全局计数,而是必须依赖像累加器这种特殊的数据结构才能达到目的。
累加器的分类
- longAccumulator
- doubleAccumulator
- collectionAccumulator
累加器的创建与使用
1 | val ac = sc.longAccumulator("Empty string") |