Spark基础编程核心思想介绍。参考自官方文档

总览

Spark程序

有一个驱动程序,会运行用户的主要功能,并且在集群上执行各种并行操作。

RDD

RDD是跨集群节点分区的、并且可以并行计算的分布式数据集合。可以通过外部文件系统或者内部集合来创建。可以在内存中持久化一个RDD,并且在并行计算中有效地重用。RDD可以从节点故障中自动恢复

共享变量

当一组任务在不同的节点上并行运行一个函数时,Spark会为函数中的每个变量发送一个副本到各个任务中去(低效)。有时,变量需要在任务与任务、任务与驱动程序间共享。Spark有两种共享变量。

  • 累加器:将工作节点中的值聚合到驱动程序中
  • 广播变量:在各个节点中cache一个只读变量

SparkContext

Spark的主要入口点。使用它可以连接到集群、创建RDD和广播变量。

RDD

RDD是Spark中最核心的概念。

  • 这是一个分布式的容忍错误的能并行操作数据集合
  • RDD是一个分布式的不可变的对象集合,可以包含任意对象。
  • 每个RDD都会被分为多个分区,这些分区运行在不同的节点上。
  • Spark会自动把RDD的数据分发到集群上,并且并行化执行相关操作。
  • 记录如何转化、计算数据的指令列表。

Spark中对数据的所有操作都是创建RDD转化已有RDD调用RDD操作进行求值

创建RDD

创建RDD有两种方式:驱动程序内部的集合,外部系统的数据集(如HDFS, HBase等)。

集合

从集合中创建RDD,会把集合中的元素复制去创建一个可以并行执行的分布式数据集。

Spark可以对这些并行集合进行分区,把这些数据切割到多个分区。Spark会为集群的每个分区运行一个Task。一般,我们需要为集群中的每个CPU分配2-4个分区。默认,Spark会根据集群尝试自动设置分区数。但我们也可以手动地设置分区数。(有的代码中也称partition为slice)

1
2
3
rdd = sc.parallelize([1, 2, 3, 4])
rdd.reduce(lambda x, y: x + y) # 求和
rdd2 = sc.parallelize(['Spark', 'Hadoop', 'ML', 'Python', 'Data'], 2) # 设置2个分区

外部数据集

Spark可以从本地文件系统、HDFS、Cassandra、HBase、Amazon S3等创建数据。支持Text、SequenceFile和任何其他Hadoop的Input Format。

Spark读取文件textFile的一些说明:

  • 本地文件使用本地路径读取文件时,该文件也得在其它的worker node的相同路径上访问到。可以把文件复制过去或者使用network-mounted的文件共享系统。
  • 支持文件 、文件夹、通配符、压缩文件(.gz)。
  • 可以设置分区数。默认,Spark为文件的每一个块创建一个分区。(HDFS的block是128MB)。可以传递一个更大的值来请求更多的分区。

RDD操作

RDD主要有2个操作。

  • 转化操作:由一个RDD生成一个新的RDD(Dataset)。惰性求值
  • 行动操作:会对RDD(Dataset)计算出一个结果或者写到外部系统。会触发实际的计算

Spark会惰性计算这些RDD,只有第一次在一个行动操作中用到时才会真正计算。

一般,Spark会在每次行动操作时重新计算转换RDD。如果想复用,则用persist把RDD持久化缓存下来。可以持久化到内存、到磁盘、在多个节点上进行复制。这样,在下次查询时,集群可以更快地访问。

Spark程序大体步骤如下。

  • 从外部数据创建输入RDD。如textFile
  • 使用转化操作得到新的RDD。如mapfilter
  • 对重用的中间结果RDD进行持久化。如persist
  • 使用行动操作来触发一次并行计算。如count, first
1
2
3
4
5
6
7
8
9
# 从外部创建一个rdd。此时并没有把数据加载到内存中。lines只是一个指向文件的指针
lines = sc.textFile("data.txt")
# 转化。没有进行真实的计算,因为惰性求值
lineLengths = lines.map(lambda s: len(s))
# 持久化
lineLengths.persist()
# 行动。Spark把计算分解为一些任务,这些任务在单独的机器上进行运算。
# 每个机器只做属于自己map的部分,并且在本地reduce。返一个结果给DriverProgram
totalLength = lineLengths.reduce(lambda a, b: a + b)

传递函数给Spark

Spark的API很多都依赖于传递函数来在集群上面运行。有下面3种方式可以使用:

  • Lambda表达式:简单功能。不支持多语句函数、不支持没有返回值的语句。
  • 本地def函数,调用spark。
  • 模块的顶级函数。

代码较多时

1
2
3
4
def my_func(s):
words = s.split(" ")
return len(words)
len_rdd = sc.textFile("word.txt").map(my_func)

对象方法时

千万不要引用self,这样会把整个对象序列化发送过去。而我们其实只需要一个方法或者属性就可以了,我们可以copy一份局部变量传递过去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class SearchFunctions(object):
def __init__(self, query):
self.query = query
def is_match(self, s):
return self.query in s
def get_matches_func_ref(self, rdd):
"""问题: self.is_match引用了整个self
"""
return rdd.filter(self.is_match)
def get_matches_attr_ref(self, rdd):
"""问题:self.query引用了整个self
"""
return rdd.filter(lambda s: self.query in s)
def get_matches_no_ref(self, rdd):
"""正确做法:使用局部变量
"""
query = self.query
return rdd.filter(lambda s: query in s)

理解闭包

当在集群上面执行代码时,理解变量和方法的范围和生命周期是很重要并且困难的。先看一段代码。

1
2
3
4
5
6
7
8
9
10
counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!请使用Accumulator
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

执行job的时候,Spark会把处理RDD的操作分解为多个任务,每个任务会由一个执行器executor执行。执行前,Spark会计算任务的闭包。闭包其实就是一些变量和方法,为了计算RDD,它们对于执行器是可见的。Spark会把闭包序列化并且发送到每一个执行器。

发送给执行器的闭包里的变量其实是一个副本,这些执行器程序却看不到驱动器程序节点的内存中的变量(counter),只能看到自己的副本。当foreach函数引用counter的时候,它使用的不是驱动器程序中的counter,而是自己的副本。

本地执行时,有时候foreach函数会在和driver同一个JVM里面执行,那么访问的就是最初的counter,也会对其进行修改。

一般,我们可以使用累加器Accumulator,它可以安全地修改一个变量。闭包不应该修改全局变量。如果要进行全局聚合,则应该使用累加器

在本地模式,rdd.foreach(println)的时候,会打印出所有的RDD。但是在集群模式的时候,执行器会打印出它自己的那一部分,在driver中并没有。如果要在driver中打印,则需要collect().foreach(),但是只适用于数据量小的情况。因为collect会拿出所有的数据。

键值对RDD

详细的知识参见Spark的键值对RDD

Shuffle操作

shuffle说明

Shuffle是Spark中重新分布数据的机制,因此它在分区之间分组也不同。主要是复制数据到执行器和机器上,这个很复杂而且很耗费。

reduceByKey为例,一个key的所有value不一定在同一个partition甚至不在同一个machine,但是却需要把这些values放在一起进行计算。单个任务会在单个分区上执行。为了reduceByKey的reduce任务,需要获得所有的数据。Spark执行一个all-to-all操作,会在所有分区上,查找所有key的所有value,然后跨越分区汇总,去执行reduce任务。这就是shuffle。

shuffle后,分区的顺序和分区里的元素是确定的,但是分区里元素的顺序却不是确定的。可以去设置确定顺序。

性能影响

Shuffle涉及到磁盘IO、数据序列化、网络IO。组织data:一系列map任务;shuffle这些data;聚合data:一系列reduce任务。

一些map的结果会写到内存里,当太大时,会以分区排好序,然后写到单个文件里。在reduce端,task会读取相关的有序的block。

Shuffle操作会占用大量的堆内存,在传输data之前或者之后,都会使用内存中的数据结构去组织这些record。也就是说,在map端,会创建这些structures,在reduce端会生成这些structures。在内存中存不下时,就会写到磁盘中。

Shuffle操作会在磁盘上生成大量的中间文件,并且在RDD不再被使用并且被垃圾回收之前,这些文件都将被一直保留。因为lineage(血统,DAG图)要被重新计算的话,就不会再次shuffle了。如果保留RDD的引用或者垃圾回收不频繁,那么Spark会占用大量的磁盘空间。文件目录可由spark.local.dir配置。

我们可以在Spark的配配置指南中配置各种参数。

RDD持久化

介绍

Spark一个重要的特性是可以在操作的时候持久化缓存RDD到内存中。Persist一个RDD后,每个节点都会将这个RDD计算的所有分区存储在内存中,并且会在后续的计算中进行复用。这可以让future actions快很多(一般是10倍)。缓存迭代算法和快速交互使用的关键工具。

持久化RDD可以使用persistcache方法。会先进行行动操作计算,然后缓存到各个节点的内存中。Spark的缓存是fault-tolerant的,如果RDD的某些分区丢失了,它会自动使用产生这个RDD的transformation进行重新计算。

类别

出于不同的目的,持久化可以设置不同的级别。例如可以缓存到磁盘,缓存到内存(以序列化对象存储,节省空间)等,然后会复制到其他节点上。可以对persist传递StorageLevel对象进行设置缓存级别,而cache方法默认的是MEMORY_ONLY,下面是几个常用的。

  • MEMORY_ONLY(default): RDD作为反序列化的Java对象存储在JVM中。如果not fit in memory,那么一些分区就不会存储,并且会在每次使用的时候重新计算CPU时间快,但耗内存

  • MEMORY_ONLY_SER: RDD作为序列化的Java对象存储在JVM中,每个分区一个字节数组。很省内存,可以选择一个快速的序列化器。CPU计算时间多。只是Java和Scala。

  • MEMORY_AND_DISK: 反序列化的Java对象存在内存中。如果not fit in memory,那么把不适合在磁盘中存放的分区存放在内存中。

  • MEMORY_AND_DISK_SER: 和MEMORY_ONLY_SER差不多,只是存不下的再存储到磁盘中,而不是再重新计算。只是Java和Scala。

名字 占用空间 CPU时间 在内存 在磁盘
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK 中等 部分 部分
MEMORY_AND_DISK_SER 部分 部分

所有的类别都通过重新计算丢失的数据来保证容错能力。完整的配置见官方RDD持久化

在Python中,我们会始终序列化要存储的数据,使用的是Pickle,所以不用担心选择serialized level。

在shuffle中,Spark会自动持久化一些中间结果,即使用户没有使用persist。这样是因为,如果一个节点failed,可以避免重新计算整个input。如果要reuse一个RDD的话,推荐使用persist这个RDD。

选择

Spark的不同storage level是为了在CPU和内存的效率之间不同的权衡,按照如下去选择:

  • 如果适合MEMORY_ONLY,那么就这个。CPU效率最高了。RDD的操作速度会很快
  • 如果不适合MEMORY_ONLY,则尽量使用MEMORY_ONLY_SER,然后选个快速序列化库。这样更加节省空间,理论上也能够快速访问。
  • 不要溢写到磁盘。只有这两种才溢写到磁盘:计算数据集非常耗费资源;会过滤掉大量的数据。
  • 如果要快速故障恢复,那么使用复制的storage level。虽然有容错能力,但是复制了,却可以直接继续执行任务,而不需要等待重新计算丢失的分区。

移除数据

Spark会自动监视每个节点上的缓存使用情况,并且以LRU最近最少使用的策略把最老的分区从内存中移除。当然也可以使用rdd.unpersist手动移除。

  • 内存策略:移除分区,再次使用的时候,就需要重新计算。
  • 内存和磁盘策略:移除的分区会写入磁盘。

共享变量

一般,把一个函数f传给Spark的操作,f会在远程集群节点上执行。当函数f在节点上执行的时候,会对所有的变量复制一份副本到该节点,然后利用这些副本单独地工作。对这些副本变量的更新修改不会传回驱动程序,只是修改这些副本。如果要在任务之间支持一般读写共享的变量是很低效的。

Spark支持两种共享变量:

  • 广播变量:用来高效地分发较大的只读对象
  • 累加器:用来对信息进行聚合

广播变量

简介

广播变量可以让程序高效地向所有工作节点发送一个较大的只读值,供一个或多个Spark操作共同使用。

例如较大的只读查询表、机器学习中的一个很大的特征向量,使用广播变量就很方便。这会在每台机器上cache这个变量,而不是发送一个副本。

Spark的Action操作由一组stage组成,由分布式的"shuffle"操作隔离。Spark会自动广播每个stage的tasks需要的common data。这种广播的数据,是以序列化格式缓存的,并且会在每个任务运行之前反序列化

创建广播变量只有下面两种情况有用

  • 多个stage的task需要相同的数据
  • 以反序列化形式缓存数据很重要

存在的问题:

  • Spark会自动把闭包中引用到的变量发送到工作节点。方便但是低效
  • 可能在并行操作中使用同一个变量,但是Spark会为每个操作都发送一次这个变量。
  • 有的变量可能很大,为每个任务都发送一次代价很大。后面再用的话,则还要重新发送

广播变量来解决:

  • 其实就是一个类型为spark.broadcast.BroadCast[T]的变量。
  • 可以在Task中进行访问
  • 广播变量只会发送到节点一次,只读。
  • 一种高效地类似BitTorrent的通信机制。

使用方法

  • 对于一个类型为T的对象,使用SparkContext.broadcast创建一个BroadCast[T]。要可以序列化
  • 通过value属性访问值
  • 变量作为只读值会发送到各个节点一次,在自己的节点上修改不会影响到其他变量。

累加器

简介

累加器可以把工作节点中的数据聚合到驱动程序中。类似于reduce,但是更简单。常用作对事件进行计数。累加器仅仅通过关联和交换的操作来实现累加。可以有效地支持并行操作。Spark本身支持数值类型的累加器,我们也可以添加新的类型。

用法

  • 在驱动器程序中,调用SparkContext.accumulator(initialValue)创建一个有初始值的累加器。返回值为org.apache.spark.Accumulator[T]
  • Spark的闭包里的执行器代码可以用累加器的+=来累加。
  • 驱动器程序中,调用累加器的value属性来访问累加器的值
  • 工作节点上的任务不能访问累加器的值

例子

累加空行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
file = sc.textFile("callsign_file")
# 创建累加器Accumulator[Int]并且赋初值0
blank_line_count = sc.accumulator(0)

def extract_callsigns(line):
"""提取callsigns"""
global blank_line_count # 访问全局变量
if line == "":
blank_line_count += 1 # 累加
return line.split(" ")

callsigns = file.flatMap(extract_callsigns)
callsigns.saveAsTextFile(output_dir + "/callsigns")
# 读取累加器的值 由于惰性求值,只有callsigns的action发生后,才能读取到值
print "Blank lines count: %d" % blank_line_count.value

进行错误计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 创建用来验证呼号的累加器
valid_signcount = sc.accumulator(0)
invalid_signcount = sc.accumulator(0)

def valid_datesign(sign):
global valid_signcount, invalid_sign_count
if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1, 3}\Z", sign):
valid_signcount += 1
return True
else:
invalid_signcount += 1
return False

# 对每个呼号的联系次数进行计数
valid_signs = callsigns.filter(valid_datesign)
contact_count = valid_signs.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x+y)

# 强制求值计算计数
contact_count.count()
if invalid_signcount.value < 0.1 * valid_signcount.value:
contact_count.saveAsTextFile(output_dir + "/contactcount")
else:
print "Too many errors: %d in %d" % (invalid_signcount.value, valid_signcount.value)
1
2
3
4
5
6
7
8
sign_prefixes = sc.broadcast(load_callsign_table())

def process_sign_count(sign_count, sign_prefixes):
country = lookup_country(sign_count[0], sign_prefixes.value)
count = sign_count[1]
return (country, count)

country_contack_counts =