Spark基础RDD的常用API总结

RDD基础

RDD是Spark中的核心抽象——弹性分布式数据集(Resilient Distributed Dataset)。其实RDD是分布式的元素集合,是一个不可变的分布式对象集合。每个RDD都会被分为多个分区,这些分区运行在不同的节点上。RDD可以包含任意对象。Spark会自动将这些RDD的数据分发到集群上,并将操作并行化执行

RDD当做我们通过转化操作构建出来的、记录如何计算数据的指令列表。

对数据的所有操作都是创建RDD转化已有RDD调用RDD操作进行求值

RDD主要有2个操作。

  • 转化操作:由一个RDD生成一个新的RDD。惰性求值。
  • 行动操作:会对RDD计算出一个结果或者写到外部系统。会触发实际的计算

Spark会惰性计算这些RDD,只有第一次在一个行动操作中用到时才会真正计算。

Spark的RDD会在每次行动操作时重新计算。如果想复用,则用persist把RDD持久化缓存下来。

下面是总的大体步骤

  • 从外部数据创建输入RDD。如textFile
  • 使用转化操作得到新的RDD。如mapfilter
  • 对重用的中间结果RDD进行持久化。如persist
  • 使用行动操作来触发一次并行计算。如count, first

创建RDD

创建RDD主要有两个方法:读取集合,读取外部数据。

1
2
3
4
# 读取集合
words = sc.parallelize(["hello", "spark", "good", "study"])
# 读取外部数据
lines = sc.textFile("README.md")

转化操作

RDD的转化操作会返回新的RDD,是惰性求值的。即只有真正调用这些RDD的行动操作这些RDD才会被计算。许多转化操作是针对各个元素的,即每次只会操作RDD中的一个元素。

通过转化操作,会从已有RDD派生出新的RDD。Spark会使用谱系图(lineage graph)来记录这些不同RDD之间的依赖关系。Spark会利用这些关系按需计算每个RDD,或者恢复所丢失的数据。

最常用的转化操作是map()filter()。下面说明一下常用的转化操作。

map(f)

对每个元素使用func函数,将返回值构成新的RDD。不会保留父RDD的分区。

1
2
3
4
5
6
7
rdd =  sc.parallelize(["b", "a", "c"])
rddnew = rdd.map(lambda x: (x, 1))
# [('b', 1), ('a', 1), ('c', 1)]

# 可使用sorted()进行排序
sorted(rddnew.collect())
# [('a', 1), ('b', 1), ('c', 1)]

flatMap(f)

对每个元素使用func函数,然后展平结果。通常用来切分单词

1
2
3
4
5
6
7
lines = sc.textFile("README.md")
# 104个
words = lines.flatMap(lambda line : line.split(" "))
# 568个
rdd = sc.parallelize([2, 3, 4])
rdd2 = rdd.flatMap(lambda x: range(1, x)) # range(1, x) 生成1-x的数,不包括x
# [1, 1, 2, 1, 2, 3]

filter(f)

元素满足f函数,则放到新的RDD里

1
2
3
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()
# [2, 4]

distinct

去重。开销很大,会进行数据混洗。

1
2
sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
# [1, 2, 3]

union

合并两个RDD。会包含重复的元素

intersection

求两个RDD的共同的元素。会去掉重复的元素

subtract

留下在自己却不在other里面的元素

cartesian

两个RDD的笛卡尔积

行动操作

行动操作会把最终求得的结果返回到驱动程序,或者写入外部存储系统中。

collect

返回RDD中的所有元素。只适用于数据小的情况,因为会把所有数据加载到驱动程序的内存中。通常只在单元测试中使用

count

RDD中元素的个数

countByValue

各元素在RDD中出现的次数,返回一个dictionary。在pair RDD中有countByKey方法

1
2
sc.parallelize([1, 2, 1, 2, 2]).countByValue().items()
# [(1, 2), (2, 3)]

take(num)

返回RDD中的n个元素。它会首先扫描一个分区,在这个分区里面尽量满足n个元素,不够再去查别的分区。只能用于数据量小的情况下。得到的顺序可能和你预期的不一样

takeOrdered(num, key=None)

获取n个元素,按照升序或者按照传入的key function。只适用于数据小的RDD

1
2
3
4
sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
# [1, 2, 3, 4, 5, 6]
sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
# [10, 9, 7, 6, 5, 4]

top(num, key=None)

从RDD只获取前N个元素。降序排列。只适用于数据量小的RDD

1
2
3
4
sc.parallelize([2, 3, 4, 5, 6]).top(2)
# [6, 5]
sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)
# [4, 3, 2]

reduce(f)

并行整合RDD中的所有数据,得到一个结果。接收一个f函数。目前在本地reduce partitions。返回结果类型不变。

1
2
3
4
from operator import add
sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
sc.parallelize([1, 2, 3, 4, 5]).reduce(lambda x, y: x+y)
# 15

fold(zeroValue, op)

和reduce一样,但是需要提供初始值。op(t1, t2),t1可以变,t2不能变

1
2
3
from operator import add
sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
# 15

aggregate(zeroValue, seqOp, combOp)

聚合所有分区的元素,然后得到一个结果。和reduce相似,但是通常返回不同的类型

1
2
3
4
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))			# 累加
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) # combine多个
sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
# (10, 4)

foreach(f)

对rdd中的每个元素使用f函数

1
2
3
def f(x):
print (x)
sc.parallelize([1, 2, 3, 4]).foreach(f)

glom

将分区中的元素合并到一个list

1
2
3
rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
rdd.glom().collect()
# [[1, 2], [3, 4, 5]]