Spark基础RDD的常用API总结
RDD基础
RDD是Spark中的核心抽象——弹性分布式数据集(Resilient Distributed Dataset)。其实RDD是分布式的元素集合,是一个不可变的分布式对象集合。每个RDD都会被分为多个分区,这些分区运行在不同的节点上。RDD可以包含任意对象。Spark会自动将这些RDD的数据分发到集群上,并将操作并行化执行。
RDD当做我们通过转化操作构建出来的、记录如何计算数据的指令列表。
对数据的所有操作都是创建RDD、转化已有RDD、调用RDD操作进行求值。
RDD主要有2个操作。
- 转化操作:由一个RDD生成一个新的RDD。惰性求值。
- 行动操作:会对RDD计算出一个结果或者写到外部系统。会触发实际的计算。
Spark会惰性计算这些RDD,只有第一次在一个行动操作中用到时才会真正计算。
Spark的RDD会在每次行动操作时重新计算。如果想复用,则用persist
把RDD持久化缓存下来。
下面是总的大体步骤
- 从外部数据创建输入RDD。如
textFile
- 使用转化操作得到新的RDD。如
map
,filter
- 对重用的中间结果RDD进行持久化。如
persist
- 使用行动操作来触发一次并行计算。如
count
,first
创建RDD
创建RDD主要有两个方法:读取集合,读取外部数据。
1 | # 读取集合 |
转化操作
RDD的转化操作会返回新的RDD,是惰性求值的。即只有真正调用这些RDD的行动操作这些RDD才会被计算。许多转化操作是针对各个元素的,即每次只会操作RDD中的一个元素。
通过转化操作,会从已有RDD派生出新的RDD。Spark会使用谱系图
(lineage graph)来记录这些不同RDD之间的依赖关系。Spark会利用这些关系按需计算每个RDD,或者恢复所丢失的数据。
最常用的转化操作是map()
和filter()
。下面说明一下常用的转化操作。
map(f)
对每个元素使用func函数,将返回值构成新的RDD。不会保留父RDD的分区。
1 | rdd = sc.parallelize(["b", "a", "c"]) |
flatMap(f)
对每个元素使用func函数,然后展平结果。通常用来切分单词
1 | lines = sc.textFile("README.md") |
filter(f)
元素满足f函数,则放到新的RDD里
1 | rdd = sc.parallelize([1, 2, 3, 4, 5]) |
distinct
去重。开销很大,会进行数据混洗。
1 | sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) |
union
合并两个RDD。会包含重复的元素
intersection
求两个RDD的共同的元素。会去掉重复的元素
subtract
留下在自己却不在other里面的元素
cartesian
两个RDD的笛卡尔积
行动操作
行动操作会把最终求得的结果返回到驱动程序,或者写入外部存储系统中。
collect
返回RDD中的所有元素。只适用于数据小的情况,因为会把所有数据加载到驱动程序的内存中。通常只在单元测试中使用
count
RDD中元素的个数
countByValue
各元素在RDD中出现的次数,返回一个dictionary
。在pair RDD中有countByKey
方法
1 | sc.parallelize([1, 2, 1, 2, 2]).countByValue().items() |
take(num)
返回RDD中的n个元素。它会首先扫描一个分区,在这个分区里面尽量满足n个元素,不够再去查别的分区。只能用于数据量小的情况下。得到的顺序可能和你预期的不一样
takeOrdered(num, key=None)
获取n个元素,按照升序或者按照传入的key function。只适用于数据小的RDD
1 | sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) |
top(num, key=None)
从RDD只获取前N个元素。降序排列。只适用于数据量小的RDD
1 | sc.parallelize([2, 3, 4, 5, 6]).top(2) |
reduce(f)
并行整合RDD中的所有数据,得到一个结果。接收一个f函数。目前在本地reduce partitions。返回结果类型不变。
1 | from operator import add |
fold(zeroValue, op)
和reduce一样,但是需要提供初始值。op(t1, t2),t1可以变,t2不能变
1 | from operator import add |
aggregate(zeroValue, seqOp, combOp)
聚合所有分区的元素,然后得到一个结果。和reduce
相似,但是通常返回不同的类型。
1 | seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) # 累加 |
foreach(f)
对rdd中的每个元素使用f函数
1 | def f(x): |
glom
将分区中的元素合并到一个list
1 | rdd = sc.parallelize([1, 2, 3, 4, 5], 2) |