Spark-KV-RDD的API使用
PairRDD及其创建
键值对RDD称为PairRDD,通常用来做聚合计算
。Spark为Pair RDD提供了许多专有的操作。
1 2 3 4
|
pairs = lines.map(lambda line: (line.split(' ')[0], line))
|
转化操作
Pair RDD 的转化操作分为单个和多个RDD的转化操作。
单个Pair RDD转化
reduceByKey(func)
合并
含有相同键的值,也称作聚合
1 2 3 4 5 6
| from operator import add rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) sorted(rdd.reduceByKey(add).collect())
rdd.reduceByKey(lambda x, y: x+y).collect()
|
groupByKey
对具有相同键的值进行分组
。会生成hash分区的RDD
1 2 3 4 5
| rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) sorted(rdd.groupByKey().mapValues(len).collect())
sorted(rdd.groupByKey().mapValues(list).collect()) [('a', [1, 1]), ('b', [1])]
|
说明:如果对键进行分组以便对每个键进行聚合(如sum和average),则用reduceByKey
和aggregateByKey
性能更好
combineByKey
合并具有相同键的值,但是返回不同类型
(K, V) - (K, C)。最常用的聚合操作。
1 2 3 4
| x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) def add(a, b): return a + str(b) sorted(x.combineByKey(str, add, add).collect()) [('a', '11'), ('b', '1')]
|
下面是combineByKey的源码和参数说明
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] = { }
val scores = sc.parallelize( List(("chinese", 88.0) , ("chinese", 90.5) , ("math", 60.0), ("math", 87.0)) ) val avg = scores.combineByKey( (v) => (v, 1), (acc: (Float, Int), V) => (acc._1 + v, acc._2 + 1), (acc1: (Float, Int), acc2: (Float, Int) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) ).map{case (key, value) => (key, value._1 / value._2.toFloat)}
|
1 2 3 4 5 6 7 8 9 10 11
| nums = sc.parallelize([('c', 90), ('m', 95), ('c', 80)]) sum_count = nums.combineByKey( lambda x: (x, 1), lambda acc, x: (acc[0] + x, acc[1] + 1), lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]) )
avg_map = sum_count.mapValues(lambda (sum, count): sum/count).collectAsMap()
avg_map = sum_count.map(lambda key, s_c: (key, s_c[0]/s_c[1])).collectAsMap()
|
mapValues(f)
对每个pair RDD中的每个Value应用一个func,不改变Key。其实也是对value做map
操作。一般我们只想访问pair的值的时候,可以用mapValues
。类似于map{case (x, y): (x, func(y))}
1 2 3 4
| x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) def f(x): return len(x) x.mapValues(f).collect() [('a', 3), ('b', 1)]
|
mapPartitions(f)
是map
的一个变种,都需要传入一个函数f,去处理数据。不同点如下:
- map: f应用于每一个元素。
- mapPartitions: f应用于每一个分区。分区的内容以Iterator[T]传入f,f的输出结果是Iterator[U]。最终RDD的由所有分区经过输入函数处理后的结果得到的。
优点:我们可以为每一个分区做一些初始化操作,而不用为每一个元素做初始化。例如,初始化数据库,次数n。map时:n=元素数量,mapPartitions时:n=分区数量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| def f(iterator): print ("called f") return map(lambda x: x + 1, iterator) rdd = sc.parallelize([1, 2, 3, 4, 5], 2) rdd.mapPartitions(f).collect() """ called f called f [2, 3, 4, 5, 6] """
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2) def f(iterator): print "call f" yield sum(iterator) rdd.mapPartitions(f).collect() """ call f call f [6, 15] """
|
mapPartitionsWithIndex(f)
和mapPartitions
一样,只是多了个partition的index。
1 2 3 4 5 6 7 8 9 10 11 12 13
| rdd = sc.parallelize(["yellow","red","blue","cyan","black"], 3) def g(index, item): return "id-{}, {}".format(index, item) def f(index, iterator): print 'called f' return map(lambda x: g(index, x), iterator) rdd.mapPartitionsWithIndex(f).collect() """ called f called f called f ['id-0, yellow', 'id-1, red', 'id-1, blue', 'id-2, cyan', 'id-2, black'] """
|
repartition(n)
生成新的RDD,分区数目为n。会增加或者减少 RDD的并行度。会对分布式数据集进行shuffle
操作,效率低。如果只是想减少分区数,则使用coalesce
,不会进行shuffle操作。
1 2 3 4 5 6 7
| >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) >>> sorted(rdd.glom().collect()) [[1], [2, 3], [4, 5], [6, 7]] >>> len(rdd.repartition(2).glom().collect()) 2 >>> len(rdd.repartition(10).glom().collect()) 10
|
coalesce(n)
合并,减少分区数,默认不执行shuffle操作。
1 2 3 4
| sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
|
flatMapValues(f)
打平values,[("k", ["v1", "v2"])] -- [("k","v1"), ("k", "v2")]
1 2 3 4
| x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) def f(x): return x x.flatMapValues(f).collect()
|
keys
values
sortByKey
返回一个对键进行排序的RDD。会生成范围分区的RDD
1 2 3 4 5 6 7 8 9 10 11 12
| tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] sc.parallelize(tmp).sortByKey().first()
sc.parallelize(tmp).sortByKey(True, 1).collect()
sc.parallelize(tmp).sortByKey(True, 2).collect()
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
|
两个Pair RDD转化
substract
留下在x中却不在y中的元素
1 2 3 4
| x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) y = sc.parallelize([("a", 3), ("c", None)]) sorted(x.subtract(y).collect())
|
substractByKey
删掉X中在Y中也存在的Key所包含的所有元素
join
内连接,从x中去和y中一个一个的匹配,(k, v1), (k, v2) -- (k, (v1, v2))
1 2 3 4
| x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2), ("a", 3)]) sorted(x.join(y).collect())
|
leftOuterJoin
左边RDD的键都有,右边没有的配None
1 2 3 4
| x = sc.parallelize([('a', 1), ('b', 4)]) y = sc.parallelize([('a', 2)]) sorted(x.leftOuterJoin(y).collect())
|
rightOuterJoin
右边RDD的键都有,左边没有的配None
1 2 3 4 5 6
| x = sc.parallelize([('a', 1), ('b', 4)]) y = sc.parallelize([('a', 2)]) sorted(x.rightOuterJoin(y).collect())
sorted(y.rightOuterJoin(x).collect())
|
cogroup
将两个RDD中拥有相同键的value分组到一起,即使两个RDD的V不一样
1 2 3 4 5 6
| x = sc.parallelize([('a', 1), ('b', 4)]) y = sc.parallelize([('a', 2)]) x.cogroup(y).collect()
[(x, tuple(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
|
行动操作
countByKey
对每个键对应的元素分别计数
1 2 3
| rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)]) rdd.countByKey().items()
|
collectAsMap
返回一个map
1 2 3
| m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() m[1] - 2 m[3] - 4
|
lookup(key)
返回键的RDD中的值列表。如果RDD具有已知的分区器,则通过仅搜索key映射到的分区来高效地完成该操作。
1 2 3 4 5 6 7 8 9 10
| l = range(1000) rdd = sc.parallelize(zip(l, l), 10) rdd.lookup(42)
sorted_rdd = rdd.sortByKey() sorted_rdd.lookup(42)
rdd = sc.parallelize([('a', 'a1'), ('a', 'a2'), ('b', 'b1')]) rdd.lookup('a')[0]
|
聚合操作
当数据是键值对组织的时候,聚合具有相同键的元素是很常见的操作。基础RDD有fold()
, combine()
, reduce()
,Pair RDD有combineByKey()
最常用,reduceByKey()
, foldByKey()
等。
计算均值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| rdd = sc.parallelize([('a', 1), ('a', 3), ('b', 4)]) maprdd = rdd.mapValues(lambda x : (x, 1))
reducerdd = maprdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
reducerdd.mapValues(lambda x : x[0]/x[1]).collect()
nums = sc.parallelize([('c', 90), ('m', 95), ('c', 80)]) sum_count = nums.combineByKey( lambda x: (x, 1), lambda acc, x: (acc[0] + x, acc[1] + 1), lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]) )
avg_map = sum_count.mapValues(lambda (sum, count): sum/count).collectAsMap()
avg_map = sum_count.map(lambda key, s_c: (key, s_c[0]/s_c[1])).collectAsMap()
|
统计单词计数
1 2 3 4 5 6 7
| rdd = sc.textFile('README.md') words = rdd.flatMap(lambda x: x.split(' '))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
result.top(2)
|
数据分区
分区说明
在分布式程序中,通信的代价是很大的。因此控制数据分布以获得最少的网络传输可以极大地提升整体性能。Spark程序通过控制RDD分区方式来减少通信开销。使用partitionBy
进行分区
- 不需分区:给定RDD只需要被扫描一次
- 需要分区:数据集在多次基于键的操作中使用,比如连接操作。
partitionBy
是转化操作,生成新的RDD,为了多次计算,一般要进行持久化persist
Spark中所有的键值对RDD都可以进行分区。系统会根据一个针对键的函数对元素进行分组。Spark不能显示控制具体每个键落在哪一个工作节点上,但是Spark可以确保同一组的键出现在同一个节点上。
- Hash分区:将一个RDD分成了100个分区,hashcode(key)%100 相同的,会在同一个节点上面
- 范围分区:将key在同一个范围区间内的记录放在同一个节点上
一个简单的例子,内存中有一张很大的用户信息表 -- 即(UserId, UserInfo)组成的RDD,UserInfo包含用户订阅了的所有Topics。还有一张(UserId, LinkInfo)存放着过去5分钟用户浏览的Topic。现在要找出用户浏览了但是没有订阅的Topic数量。
1 2 3 4 5 6 7 8 9 10 11 12
| val sc = new SparkContext(...) val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist()
def processNewLog(logFileName: String) { val events = sc.sequenceFile[UserId, LinkInfo](logFileName) val joined = userData.join(events) val offTopicVisits = joined.filter{ case (UserId, (UserInfo, LinkInfo)) => !UserInfo.topics.contains(LinkInfo.topic) }.count() print ("浏览了且未订阅的数量:" + offTopicVisits) }
|
这段代码不够高效。因为每次调用processNewLog
都会用join
操作,但我们却不知道数据集是如何分区的。
连接操作,会将两个数据集的所有键的哈希值都求出来,将该哈希值相同的记录通过网络传到同一台机器上,然后在机器上对所有键相同的记录进行操作。
因为userData比events要大的多并且基本不会变化,所以有很多浪费效率的事情:每次调用时都对userData表进行计算hash值计算和跨节点数据混洗。
解决方案:在程序开始的时候,对userData表使用partitionBy()
转换操作,将这张表转换为哈希分区
1 2 3
| val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...") .partitionBy(new HashPartioner(100)) .persist()
|
events是本地变量,并且只使用一次,所以为它指定分区方式没有什么用处。
userData使用了partitionBy()
,Spark就知道该RDD是根据键的hash值来分区的。在userData.join(events)
时,Spark只会对events进行数据混洗操作。将events中特定UserId的记录发送到userData的对应分区所在的那台机器上。需要网络传输的数据就大大减少了,速度也就显著提升了。
分区相关的操作
Spark的许多操作都有将数据根据跨节点进行混洗的过程。所有这些操作都会从数据分区中获益。类似join
这样的二元操作,预先进行数据分区会导致其中至少一个RDD不发生数据混洗
。
获取好处的操作:cogroup
, groupWith
, join
, leftOuterJoin
, rightOuterJoin
, groupByKey
, reduceByKey
, combineByKey
, lookup
为结果设好分区的操作:cogroup
, groupWith
, join
, leftOuterJoin
, rightOuterJoin
, groupByKey
, reduceByKey
, combineByKey
, partitionBy
, sort
, (mapValues
, flatMapValues
, filter
如果父RDD有分区方式的话)
其他所有的操作的结果都不会存在特定的分区方式。对于二元操作,输出数据的分区方式取决于父RDD的分区方式。默认情况结果会采取hash分区。
PageRank
PageRank的python版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
|
""" PageRank算法 author = PuLiming 运行: bin/spark-submit files/pagerank.py data/mllib/pagerank_data.txt 10 """ from __future__ import print_function
import re import sys from operator import add
from pyspark import SparkConf, SparkContext
def compute_contribs(urls, rank): """ 给urls计算 Args: urls: 目标url相邻的urls集合 rank: 目标url的当前rank Returns: url: 相邻urls中的一个url rank: 当前url的新的rank """ num_urls = len(urls) for url in urls: yield (url, rank / num_urls)
def split_url(url_line): """ 把一行url切分开来 Args: url_line: 一行url,如 1 2
Returns: url, neighbor_url """ parts = re.split(r'\s+', url_line) return parts[0], parts[1]
def compute_pagerank(sc, url_data_file, iterations): """ 计算各个page的排名 Args: sc: SparkContext url_data_file: 测试数据文件 iterations: 迭代次数
Returns: status: 成功就返回0 """
lines = sc.textFile(url_data_file).map(lambda line: line.encode('utf8')) links = lines.map(lambda line : split_url(line)).distinct().groupByKey().mapValues(lambda x: list(x)).cache() ranks = lines.map(lambda line : (line[0], 1))
for i in range(iterations): contribs = links.join(ranks).flatMap( lambda url_urls_rank: compute_contribs(url_urls_rank[1][0], url_urls_rank[1][1]) ) ranks = contribs.reduceByKey(add).mapValues(lambda rank : rank * 0.85 + 0.15)
for (link, rank) in ranks.collect(): print("%s has rank %s." % (link, rank)) return 0
if __name__ == '__main__': if len(sys.argv) != 3: print("Usage: python pagerank.py <data.txt> <iterations>", file = sys.stderr) sys.exit(-1)
url_data_file = sys.argv[1] iterations = int(sys.argv[2]) conf = SparkConf().setAppName('PythonPageRank') conf.setMaster('local') sc = SparkContext(conf=conf)
ret = compute_pagerank(sc, url_data_file, iterations) sys.exit(ret)
|
PageRank的scala版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| val sc = new SparkContext(...) val links = sc.objectFile[(String, Seq[String])]("links") .partitionBy(new HashPartitioner(100)) .persist()
var ranks = links.mapValues(_ => 1.0)
for (i <- 0 until 10) { val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size)) } ranks = contributions.reduceByKey(_ + _).mapValues(0.15 + 0.85* _) } ranks.saveAsTextFile("ranks")
|
当前scala版本的PageRank算法的优点:
- links每次都会和ranks发生连接操作,所以一开始就对它进行分区
partitionBy
,就不会通过网络进行数据混洗了,节约了相当可观的网络通信开销
- 对links进行
persist
,留在内存中,每次迭代使用
- 第一次创建ranks,使用
mapValues
保留了父RDD的分区方式,第一次连接开销就会很小
reduceByKey
后已经是分区了,再使用mapValues
分区方式,再次和links进行join
就会更加高效
所以对分区后的RDD尽量使用mapValues
保留父分区方式,而不要用map
丢失分区方式。