Spark基础编程核心思想介绍。参考自官方文档
总览
Spark程序
有一个驱动程序,会运行用户的主要功能,并且在集群上执行各种并行操作。
RDD
RDD是跨集群节点分区
的、并且可以并行计算
的分布式数据集合。可以通过外部文件系统或者内部集合来创建。可以在内存中持久化
一个RDD,并且在并行计算中有效地重用
。RDD可以从节点故障中自动恢复
。
共享变量
当一组任务在不同的节点上并行运行一个函数时,Spark会为函数中的每个变量发送一个副本
到各个任务中去(低效)。有时,变量需要在任务与任务、任务与驱动程序间共享。Spark有两种共享变量。
- 累加器:将工作节点中的值聚合到驱动程序中
- 广播变量:在各个节点中cache一个
只读变量
SparkContext
Spark的主要入口点。使用它可以连接到集群、创建RDD和广播变量。
RDD
RDD是Spark中最核心的概念。
- 这是一个
分布式的
、容忍错误的
、能并行操作
的数据集合。 - RDD是一个分布式的不可变的对象集合,可以包含任意对象。
- 每个RDD都会被分为多个分区,这些分区运行在不同的节点上。
- Spark会自动把RDD的数据分发到集群上,并且并行化执行相关操作。
- 记录如何转化、计算数据的指令列表。
Spark中对数据的所有操作都是创建RDD、转化已有RDD、调用RDD操作进行求值。
创建RDD
创建RDD有两种方式:驱动程序内部的集合,外部系统的数据集(如HDFS, HBase等)。
集合
从集合中创建RDD,会把集合中的元素复制去创建一个可以并行执行的分布式数据集。
Spark可以对这些并行集合进行分区,把这些数据切割到多个分区。Spark会为集群的每个分区运行一个Task。一般,我们需要为集群中的每个CPU分配2-4个分区。默认,Spark会根据集群尝试自动设置分区数。但我们也可以手动地设置分区数。(有的代码中也称partition为slice)
1 | rdd = sc.parallelize([1, 2, 3, 4]) |
外部数据集
Spark可以从本地文件系统、HDFS、Cassandra、HBase、Amazon S3等创建数据。支持Text、SequenceFile和任何其他Hadoop的Input Format。
Spark读取文件textFile
的一些说明:
- 本地文件使用本地路径读取文件时,该文件也得在其它的worker node的相同路径上访问到。可以把文件复制过去或者使用network-mounted的文件共享系统。
- 支持文件 、文件夹、通配符、压缩文件(.gz)。
- 可以设置分区数。默认,Spark为文件的每一个块创建一个分区。(HDFS的block是128MB)。可以传递一个更大的值来请求更多的分区。
RDD操作
RDD主要有2个操作。
- 转化操作:由一个RDD生成一个新的RDD(Dataset)。惰性求值。
- 行动操作:会对RDD(Dataset)计算出一个结果或者写到外部系统。会触发实际的计算。
Spark会惰性计算这些RDD,只有第一次在一个行动操作中用到时才会真正计算。
一般,Spark会在每次行动操作时重新计算转换RDD。如果想复用,则用persist
把RDD持久化缓存下来。可以持久化到内存、到磁盘、在多个节点上进行复制。这样,在下次查询时,集群可以更快地访问。
Spark程序大体步骤如下。
- 从外部数据创建输入RDD。如
textFile
- 使用转化操作得到新的RDD。如
map
,filter
- 对重用的中间结果RDD进行持久化。如
persist
- 使用行动操作来触发一次并行计算。如
count
,first
1 | # 从外部创建一个rdd。此时并没有把数据加载到内存中。lines只是一个指向文件的指针 |
传递函数给Spark
Spark的API很多都依赖于传递函数来在集群上面运行。有下面3种方式可以使用:
- Lambda表达式:简单功能。不支持多语句函数、不支持没有返回值的语句。
- 本地def函数,调用spark。
- 模块的顶级函数。
代码较多时
1 | def my_func(s): |
对象方法时
千万不要引用self,这样会把整个对象序列化发送过去。而我们其实只需要一个方法或者属性就可以了,我们可以copy一份局部变量传递过去。
1 | class SearchFunctions(object): |
理解闭包
当在集群上面执行代码时,理解变量和方法的范围和生命周期是很重要并且困难的。先看一段代码。
1 | counter = 0 |
执行job的时候,Spark会把处理RDD的操作分解为多个任务,每个任务会由一个执行器executor
执行。执行前,Spark会计算任务的闭包。闭包其实就是一些变量和方法,为了计算RDD,它们对于执行器是可见的。Spark会把闭包序列化并且发送到每一个执行器。
发送给执行器的闭包里的变量其实是一个副本,这些执行器程序却看不到驱动器程序节点的内存中的变量(counter),只能看到自己的副本。当foreach函数引用counter的时候,它使用的不是驱动器程序中的counter,而是自己的副本。
本地执行时,有时候foreach函数会在和driver同一个JVM里面执行,那么访问的就是最初的counter,也会对其进行修改。
一般,我们可以使用累加器Accumulator
,它可以安全地修改一个变量。闭包不应该修改全局变量。如果要进行全局聚合,则应该使用累加器。
在本地模式,rdd.foreach(println)的时候,会打印出所有的RDD。但是在集群模式的时候,执行器会打印出它自己的那一部分,在driver中并没有。如果要在driver中打印,则需要collect().foreach(),但是只适用于数据量小的情况。因为collect会拿出所有的数据。
键值对RDD
详细的知识参见Spark的键值对RDD。
Shuffle操作
shuffle说明
Shuffle是Spark中重新分布数据的机制,因此它在分区之间分组也不同。主要是复制数据到执行器和机器上,这个很复杂而且很耗费。
以reduceByKey
为例,一个key的所有value不一定在同一个partition甚至不在同一个machine,但是却需要把这些values放在一起进行计算。单个任务会在单个分区上执行。为了reduceByKey的reduce任务,需要获得所有的数据。Spark执行一个all-to-all
操作,会在所有分区上,查找所有key的所有value,然后跨越分区汇总,去执行reduce任务。这就是shuffle。
shuffle后,分区的顺序和分区里的元素是确定的,但是分区里元素的顺序却不是确定的。可以去设置确定顺序。
性能影响
Shuffle涉及到磁盘IO、数据序列化、网络IO。组织data:一系列map任务;shuffle这些data;聚合data:一系列reduce任务。
一些map的结果会写到内存里,当太大时,会以分区排好序,然后写到单个文件里。在reduce端,task会读取相关的有序的block。
Shuffle操作会占用大量的堆内存,在传输data之前或者之后,都会使用内存中的数据结构去组织这些record。也就是说,在map端,会创建这些structures,在reduce端会生成这些structures。在内存中存不下时,就会写到磁盘中。
Shuffle操作会在磁盘上生成大量的中间文件,并且在RDD不再被使用并且被垃圾回收之前,这些文件都将被一直保留。因为lineage(血统,DAG图)要被重新计算的话,就不会再次shuffle了。如果保留RDD的引用或者垃圾回收不频繁,那么Spark会占用大量的磁盘空间。文件目录可由spark.local.dir
配置。
我们可以在Spark的配配置指南中配置各种参数。
RDD持久化
介绍
Spark一个重要的特性是可以在操作的时候持久化缓存RDD到内存中。Persist
一个RDD后,每个节点都会将这个RDD计算的所有分区存储在内存中,并且会在后续的计算中进行复用。这可以让future actions快很多(一般是10倍)。缓存是迭代算法
和快速交互使用的关键工具。
持久化RDD可以使用persist
或cache
方法。会先进行行动操作计算,然后缓存到各个节点的内存中。Spark的缓存是fault-tolerant
的,如果RDD的某些分区丢失了,它会自动使用产生这个RDD的transformation进行重新计算。
类别
出于不同的目的,持久化可以设置不同的级别。例如可以缓存到磁盘,缓存到内存(以序列化对象存储,节省空间)等,然后会复制到其他节点上。可以对persist
传递StorageLevel
对象进行设置缓存级别,而cache
方法默认的是MEMORY_ONLY,下面是几个常用的。
MEMORY_ONLY(default): RDD作为
反序列化的
Java对象存储在JVM中。如果not fit in memory,那么一些分区就不会存储,并且会在每次使用的时候重新计算。CPU时间快,但耗内存。MEMORY_ONLY_SER: RDD作为
序列化的
Java对象存储在JVM中,每个分区一个字节数组。很省内存,可以选择一个快速的序列化器。CPU计算时间多。只是Java和Scala。MEMORY_AND_DISK:
反序列化的
Java对象存在内存中。如果not fit in memory,那么把不适合在磁盘中存放的分区存放在内存中。MEMORY_AND_DISK_SER: 和MEMORY_ONLY_SER差不多,只是存不下的再存储到磁盘中,而不是再重新计算。只是Java和Scala。
名字 | 占用空间 | CPU时间 | 在内存 | 在磁盘 |
---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是 | 否 |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 |
MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 |
所有的类别都通过重新计算
丢失的数据来保证容错能力
。完整的配置见官方RDD持久化。
在Python中,我们会始终序列化要存储的数据,使用的是Pickle,所以不用担心选择serialized level。
在shuffle中,Spark会自动持久化一些中间结果,即使用户没有使用persist
。这样是因为,如果一个节点failed,可以避免重新计算整个input。如果要reuse
一个RDD的话,推荐使用persist
这个RDD。
选择
Spark的不同storage level是为了在CPU和内存的效率之间不同的权衡,按照如下去选择:
- 如果适合
MEMORY_ONLY
,那么就这个。CPU效率最高了。RDD的操作速度会很快! - 如果不适合MEMORY_ONLY,则尽量使用
MEMORY_ONLY_SER
,然后选个快速序列化库。这样更加节省空间,理论上也能够快速访问。 - 不要溢写到磁盘。只有这两种才溢写到磁盘:计算数据集非常耗费资源;会过滤掉大量的数据。
- 如果要快速故障恢复,那么使用复制的storage level。虽然有容错能力,但是复制了,却可以直接继续执行任务,而不需要等待重新计算丢失的分区。
移除数据
Spark会自动监视每个节点上的缓存使用情况,并且以LRU
最近最少使用的策略把最老的分区从内存中移除。当然也可以使用rdd.unpersist
手动移除。
- 内存策略:移除分区,再次使用的时候,就需要重新计算。
- 内存和磁盘策略:移除的分区会写入磁盘。
共享变量
一般,把一个函数f传给Spark的操作,f会在远程集群节点上执行。当函数f在节点上执行的时候,会对所有的变量复制一份副本到该节点,然后利用这些副本单独地工作。对这些副本变量的更新修改不会传回驱动程序,只是修改这些副本。如果要在任务之间支持一般读写共享的变量是很低效的。
Spark支持两种共享变量:
- 广播变量:用来高效地分发较大的只读对象
- 累加器:用来对信息进行聚合
广播变量
简介
广播变量可以让程序高效地向所有工作节点发送一个较大的只读值,供一个或多个Spark操作共同使用。
例如较大的只读查询表、机器学习中的一个很大的特征向量,使用广播变量就很方便。这会在每台机器上cache这个变量,而不是发送一个副本。
Spark的Action操作由一组stage组成,由分布式的"shuffle"操作隔离。Spark会自动广播每个stage的tasks需要的common data。这种广播的数据,是以序列化格式缓存的,并且会在每个任务运行之前反序列化。
创建广播变量只有下面两种情况有用:
- 多个stage的task需要相同的数据
- 以反序列化形式缓存数据很重要
存在的问题:
- Spark会自动把闭包中引用到的变量发送到工作节点。方便但是低效。
- 可能在并行操作中使用同一个变量,但是Spark会为每个操作都发送一次这个变量。
- 有的变量可能很大,为每个任务都发送一次代价很大。后面再用的话,则还要重新发送。
广播变量来解决:
- 其实就是一个类型为
spark.broadcast.BroadCast[T]
的变量。 - 可以在Task中进行访问。
- 广播变量只会发送到节点一次,只读。
- 一种高效地类似BitTorrent的通信机制。
使用方法
- 对于一个类型为T的对象,使用
SparkContext.broadcast
创建一个BroadCast[T]
。要可以序列化 - 通过
value
属性访问值 - 变量作为只读值会发送到各个节点一次,在自己的节点上修改不会影响到其他变量。
累加器
简介
累加器可以把工作节点中的数据聚合到驱动程序中。类似于reduce
,但是更简单。常用作对事件进行计数。累加器仅仅通过关联和交换的操作来实现累加
。可以有效地支持并行操作。Spark本身支持数值类型的累加器,我们也可以添加新的类型。
用法
- 在驱动器程序中,调用
SparkContext.accumulator(initialValue)
创建一个有初始值的累加器。返回值为org.apache.spark.Accumulator[T]
- Spark的闭包里的执行器代码可以用累加器的
+=
来累加。 - 驱动器程序中,调用累加器的
value
属性来访问累加器的值 - 工作节点上的任务不能访问累加器的值
例子
累加空行
1 | file = sc.textFile("callsign_file") |
进行错误计数
1 | # 创建用来验证呼号的累加器 |
1 | sign_prefixes = sc.broadcast(load_callsign_table()) |