Spark-KV-RDD的API使用

PairRDD及其创建

键值对RDD称为PairRDD,通常用来做聚合计算。Spark为Pair RDD提供了许多专有的操作。

1
2
3
4
# 创建pair rdd: map 或者 读取键值对格式自动转成pairrdd

# 每行的第一个单词作为key,line作为value
pairs = lines.map(lambda line: (line.split(' ')[0], line))

转化操作

Pair RDD 的转化操作分为单个和多个RDD的转化操作。

单个Pair RDD转化

reduceByKey(func)

合并含有相同键的值,也称作聚合

1
2
3
4
5
6
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())
# [('a', 2), ('b', 1)]
# 这种写法也可以
rdd.reduceByKey(lambda x, y: x+y).collect()

groupByKey

对具有相同键的值进行分组。会生成hash分区的RDD

1
2
3
4
5
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())
# [('a', 2), ('b', 1)]
sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]

说明:如果对键进行分组以便对每个键进行聚合(如sum和average),则用reduceByKeyaggregateByKey性能更好

combineByKey

合并具有相同键的值,但是返回不同类型 (K, V) - (K, C)。最常用的聚合操作。

1
2
3
4
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
def add(a, b): return a + str(b)
sorted(x.combineByKey(str, add, add).collect())
[('a', '11'), ('b', '1')]

下面是combineByKey的源码和参数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def combineByKey[C](
createCombiner: V => C, // V => C的转变 / 初始值 / 创建one-element的list
mergeValue: (C, V) => C, // 将原V累加到新的C
mergeCombiners: (C, C) => C, // 两个C合并成一个
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = {
//实现略
}
// 求平均值
val scores = sc.parallelize(
List(("chinese", 88.0) , ("chinese", 90.5) , ("math", 60.0), ("math", 87.0))
)
val avg = scores.combineByKey(
(v) => (v, 1),
(acc: (Float, Int), V) => (acc._1 + v, acc._2 + 1),
(acc1: (Float, Int), acc2: (Float, Int) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
).map{case (key, value) => (key, value._1 / value._2.toFloat)}
1
2
3
4
5
6
7
8
9
10
11
# 求平均值
nums = sc.parallelize([('c', 90), ('m', 95), ('c', 80)])
sum_count = nums.combineByKey(
lambda x: (x, 1),
lambda acc, x: (acc[0] + x, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
)
# [('c', (170, 2)), ('m', (95, 1))]
avg_map = sum_count.mapValues(lambda (sum, count): sum/count).collectAsMap()
# {'c': 85, 'm': 95}
avg_map = sum_count.map(lambda key, s_c: (key, s_c[0]/s_c[1])).collectAsMap()

mapValues(f)

对每个pair RDD中的每个Value应用一个func,不改变Key。其实也是对value做map操作。一般我们只想访问pair的值的时候,可以用mapValues。类似于map{case (x, y): (x, func(y))}

1
2
3
4
x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
x.mapValues(f).collect()
[('a', 3), ('b', 1)]

mapPartitions(f)

map的一个变种,都需要传入一个函数f,去处理数据。不同点如下:

  • map: f应用于每一个元素。
  • mapPartitions: f应用于每一个分区。分区的内容以Iterator[T]传入f,f的输出结果是Iterator[U]。最终RDD的由所有分区经过输入函数处理后的结果得到的。

优点:我们可以为每一个分区做一些初始化操作,而不用为每一个元素做初始化。例如,初始化数据库,次数n。map时:n=元素数量,mapPartitions时:n=分区数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 1. 每个元素加1
def f(iterator):
print ("called f")
return map(lambda x: x + 1, iterator)
rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
rdd.mapPartitions(f).collect() # 只调用2次f
"""
called f
called f
[2, 3, 4, 5, 6]
"""

# 2. 分区求和
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2)
def f(iterator):
print "call f"
yield sum(iterator)
rdd.mapPartitions(f).collect() # 调用2次f,分区求和
"""
call f
call f
[6, 15]
"""

mapPartitionsWithIndex(f)

mapPartitions一样,只是多了个partition的index。

1
2
3
4
5
6
7
8
9
10
11
12
13
rdd = sc.parallelize(["yellow","red","blue","cyan","black"], 3)
def g(index, item):
return "id-{}, {}".format(index, item)
def f(index, iterator):
print 'called f'
return map(lambda x: g(index, x), iterator)
rdd.mapPartitionsWithIndex(f).collect()
"""
called f
called f
called f
['id-0, yellow', 'id-1, red', 'id-1, blue', 'id-2, cyan', 'id-2, black']
"""

repartition(n)

生成新的RDD,分区数目为n。会增加或者减少 RDD的并行度。会对分布式数据集进行shuffle操作,效率低。如果只是想减少分区数,则使用coalesce,不会进行shuffle操作。

1
2
3
4
5
6
7
>>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
>>> sorted(rdd.glom().collect())
[[1], [2, 3], [4, 5], [6, 7]]
>>> len(rdd.repartition(2).glom().collect())
2
>>> len(rdd.repartition(10).glom().collect())
10

coalesce(n)

合并,减少分区数,默认不执行shuffle操作。

1
2
3
4
sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
# [[1], [2, 3], [4, 5]]
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
# [[1, 2, 3, 4, 5]]

flatMapValues(f)

打平values,[("k", ["v1", "v2"])] -- [("k","v1"), ("k", "v2")]

1
2
3
4
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
def f(x): return x
x.flatMapValues(f).collect()
# [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

keys

values

sortByKey

返回一个对键进行排序的RDD。会生成范围分区的RDD

1
2
3
4
5
6
7
8
9
10
11
12
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortByKey().first()
# ('1', 3)
sc.parallelize(tmp).sortByKey(True, 1).collect()
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
sc.parallelize(tmp).sortByKey(True, 2).collect()
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
# [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]

两个Pair RDD转化

substract

留下在x中却不在y中的元素

1
2
3
4
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
sorted(x.subtract(y).collect())
#[('b', 4), ('b', 5)]

substractByKey

删掉X中在Y中也存在的Key所包含的所有元素

join

内连接,从x中去和y中一个一个的匹配,(k, v1), (k, v2) -- (k, (v1, v2))

1
2
3
4
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())
# [('a', (1, 2)), ('a', (1, 3))]

leftOuterJoin

左边RDD的键都有,右边没有的配None

1
2
3
4
x = sc.parallelize([('a', 1), ('b', 4)])
y = sc.parallelize([('a', 2)])
sorted(x.leftOuterJoin(y).collect())
# [('a', (1, 2)), ('b', (4, None))]

rightOuterJoin

右边RDD的键都有,左边没有的配None

1
2
3
4
5
6
x = sc.parallelize([('a', 1), ('b', 4)])
y = sc.parallelize([('a', 2)])
sorted(x.rightOuterJoin(y).collect())
# [('a', (1, 2))]
sorted(y.rightOuterJoin(x).collect())
# [('a', (2, 1)), ('b', (None, 4))]

cogroup

将两个RDD中拥有相同键的value分组到一起,即使两个RDD的V不一样

1
2
3
4
5
6
x = sc.parallelize([('a', 1), ('b', 4)])
y = sc.parallelize([('a', 2)])
x.cogroup(y).collect()
# 上面显示的是16进制
[(x, tuple(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
# [('a', ([1], [2])), ('b', ([4], []))]

行动操作

countByKey

对每个键对应的元素分别计数

1
2
3
rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
rdd.countByKey().items() # 转换成一个dict,再取所有元素
# [('a', 2), ('b', 1)]

collectAsMap

返回一个map

1
2
3
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
m[1] - 2 # 当做map操作即可
m[3] - 4

lookup(key)

返回键的RDD中的值列表。如果RDD具有已知的分区器,则通过仅搜索key映射到的分区来高效地完成该操作。

1
2
3
4
5
6
7
8
9
10
l = range(1000) # 1,2,3,...,1000
rdd = sc.parallelize(zip(l, l), 10) # 键和值一样,10个数据分片,10个并行度,10个task
rdd.lookup(42) # slow
# [42]
sorted_rdd = rdd.sortByKey()
sorted_rdd.lookup(42) # fast
# [42]
rdd = sc.parallelize([('a', 'a1'), ('a', 'a2'), ('b', 'b1')])
rdd.lookup('a')[0]
# 'a1'

聚合操作

当数据是键值对组织的时候,聚合具有相同键的元素是很常见的操作。基础RDD有fold(), combine(), reduce(),Pair RDD有combineByKey()最常用,reduceByKey(), foldByKey()等。

计算均值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
## 方法一:mapValues和reduceByKey
rdd = sc.parallelize([('a', 1), ('a', 3), ('b', 4)])
maprdd = rdd.mapValues(lambda x : (x, 1))
# [('a', (1, 1)), ('a', (3, 1)), ('b', (4, 1))]
reducerdd = maprdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
# [('a', (4, 2)), ('b', (4, 1))]
reducerdd.mapValues(lambda x : x[0]/x[1]).collect()
# [('a', 2), ('b', 4)]

## 方法二 combineByKey 最常用的
nums = sc.parallelize([('c', 90), ('m', 95), ('c', 80)])
sum_count = nums.combineByKey(
lambda x: (x, 1),
lambda acc, x: (acc[0] + x, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
)
# [('c', (170, 2)), ('m', (95, 1))]
avg_map = sum_count.mapValues(lambda (sum, count): sum/count).collectAsMap()
# {'c': 85, 'm': 95}
avg_map = sum_count.map(lambda key, s_c: (key, s_c[0]/s_c[1])).collectAsMap()

统计单词计数

1
2
3
4
5
6
7
rdd = sc.textFile('README.md')
words = rdd.flatMap(lambda x: x.split(' '))
# 568个
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
# 289
result.top(2)
# [('your', 1), ('you', 4)]

数据分区

分区说明

在分布式程序中,通信的代价是很大的。因此控制数据分布以获得最少的网络传输可以极大地提升整体性能。Spark程序通过控制RDD分区方式来减少通信开销。使用partitionBy进行分区

  • 不需分区:给定RDD只需要被扫描一次
  • 需要分区:数据集在多次基于键的操作中使用,比如连接操作。partitionBy转化操作,生成新的RDD,为了多次计算,一般要进行持久化persist

Spark中所有的键值对RDD都可以进行分区。系统会根据一个针对键的函数对元素进行分组。Spark不能显示控制具体每个键落在哪一个工作节点上,但是Spark可以确保同一组的键出现在同一个节点上。

  • Hash分区:将一个RDD分成了100个分区,hashcode(key)%100 相同的,会在同一个节点上面
  • 范围分区:将key在同一个范围区间内的记录放在同一个节点上

一个简单的例子,内存中有一张很大的用户信息表 -- 即(UserId, UserInfo)组成的RDD,UserInfo包含用户订阅了的所有Topics。还有一张(UserId, LinkInfo)存放着过去5分钟用户浏览的Topic。现在要找出用户浏览了但是没有订阅的Topic数量。

1
2
3
4
5
6
7
8
9
10
11
12
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist()

def processNewLog(logFileName: String) {
val events = sc.sequenceFile[UserId, LinkInfo](logFileName)
val joined = userData.join(events) // (UserId, (UserInfo, LinkInfo))
val offTopicVisits = joined.filter{
case (UserId, (UserInfo, LinkInfo)) =>
!UserInfo.topics.contains(LinkInfo.topic)
}.count()
print ("浏览了且未订阅的数量:" + offTopicVisits)
}

这段代码不够高效。因为每次调用processNewLog都会用join操作,但我们却不知道数据集是如何分区的。

连接操作,会将两个数据集的所有键的哈希值都求出来,将该哈希值相同的记录通过网络传到同一台机器上,然后在机器上对所有键相同的记录进行操作。

因为userData比events要大的多并且基本不会变化,所以有很多浪费效率的事情:每次调用时都对userData表进行计算hash值计算和跨节点数据混洗。

解决方案:在程序开始的时候,对userData表使用partitionBy()转换操作,将这张表转换为哈希分区

1
2
3
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...")
.partitionBy(new HashPartioner(100)) // 构造100个分区
.persist() // 持久化当前结果

events是本地变量,并且只使用一次,所以为它指定分区方式没有什么用处。

userData使用了partitionBy(),Spark就知道该RDD是根据键的hash值来分区的。在userData.join(events)时,Spark只会对events进行数据混洗操作。将events中特定UserId的记录发送到userData的对应分区所在的那台机器上。需要网络传输的数据就大大减少了,速度也就显著提升了。

分区相关的操作

Spark的许多操作都有将数据根据跨节点进行混洗的过程。所有这些操作都会从数据分区中获益。类似join这样的二元操作,预先进行数据分区会导致其中至少一个RDD不发生数据混洗

获取好处的操作:cogroup, groupWith, join, leftOuterJoin , rightOuterJoin, groupByKey, reduceByKey , combineByKey, lookup

为结果设好分区的操作:cogroup, groupWith, join, leftOuterJoin , rightOuterJoin, groupByKey, reduceByKey , combineByKey, partitionBy, sort, (mapValues, flatMapValues, filter 如果父RDD有分区方式的话)

其他所有的操作的结果都不会存在特定的分区方式。对于二元操作,输出数据的分区方式取决于父RDD的分区方式。默认情况结果会采取hash分区。

PageRank

PageRank的python版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python
# -*- coding: utf-8 -*-

""" PageRank算法
author = PuLiming
运行: bin/spark-submit files/pagerank.py data/mllib/pagerank_data.txt 10
"""
from __future__ import print_function

import re
import sys
from operator import add

from pyspark import SparkConf, SparkContext

def compute_contribs(urls, rank):
""" 给urls计算
Args:
urls: 目标url相邻的urls集合
rank: 目标url的当前rank

Returns:
url: 相邻urls中的一个url
rank: 当前url的新的rank
"""
num_urls = len(urls)
for url in urls:
yield (url, rank / num_urls)


def split_url(url_line):
""" 把一行url切分开来
Args:
url_line: 一行url,如 1 2

Returns:
url, neighbor_url
"""
parts = re.split(r'\s+', url_line) # 正则
return parts[0], parts[1]


def compute_pagerank(sc, url_data_file, iterations):
""" 计算各个page的排名
Args:
sc: SparkContext
url_data_file: 测试数据文件
iterations: 迭代次数

Returns:
status: 成功就返回0
"""

# 读取url文件 ['1 2', '1 3', '2 1', '3 1']
lines = sc.textFile(url_data_file).map(lambda line: line.encode('utf8'))

# 建立Pair RDD (url, neighbor_urls) [(1,[2,3]), (2,[1]), (3, [1])]
links = lines.map(lambda line : split_url(line)).distinct().groupByKey().mapValues(lambda x: list(x)).cache()
# 初始化所有url的rank为1 [(1, 1), (2, 1), (3, 1)]
ranks = lines.map(lambda line : (line[0], 1))

for i in range(iterations):
# (url, [(neighbor_urls), rank]) join neighbor_urls and rank
# 把当前url的rank分别contribute到其他相邻的url (url, rank)
contribs = links.join(ranks).flatMap(
lambda url_urls_rank: compute_contribs(url_urls_rank[1][0], url_urls_rank[1][1])
)
# 把url的所有rank加起来,再赋值新的
ranks = contribs.reduceByKey(add).mapValues(lambda rank : rank * 0.85 + 0.15)

for (link, rank) in ranks.collect():
print("%s has rank %s." % (link, rank))

return 0

if __name__ == '__main__':

if len(sys.argv) != 3:
print("Usage: python pagerank.py <data.txt> <iterations>", file = sys.stderr)
sys.exit(-1)

# 数据文件和迭代次数
url_data_file = sys.argv[1]
iterations = int(sys.argv[2])

# 配置 SparkContext
conf = SparkConf().setAppName('PythonPageRank')
conf.setMaster('local')
sc = SparkContext(conf=conf)

ret = compute_pagerank(sc, url_data_file, iterations)
sys.exit(ret)

PageRank的scala版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val sc = new SparkContext(...)
val links = sc.objectFile[(String, Seq[String])]("links")
.partitionBy(new HashPartitioner(100))
.persist()

var ranks = links.mapValues(_ => 1.0)

// 迭代10次
for (i <- 0 until 10) {
val contributions = links.join(ranks).flatMap {
case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size))
}
ranks = contributions.reduceByKey(_ + _).mapValues(0.15 + 0.85* _)
}
ranks.saveAsTextFile("ranks")

当前scala版本的PageRank算法的优点:

  • links每次都会和ranks发生连接操作,所以一开始就对它进行分区partitionBy,就不会通过网络进行数据混洗了,节约了相当可观的网络通信开销
  • 对links进行persist,留在内存中,每次迭代使用
  • 第一次创建ranks,使用mapValues保留了父RDD的分区方式,第一次连接开销就会很小
  • reduceByKey后已经是分区了,再使用mapValues分区方式,再次和links进行join就会更加高效

所以对分区后的RDD尽量使用mapValues保留父分区方式,而不要用map丢失分区方式。