Spark-KV-RDD的API使用
PairRDD及其创建
键值对RDD称为PairRDD,通常用来做聚合计算
。Spark为Pair RDD提供了许多专有的操作。
# 创建pair rdd: map 或者 读取键值对格式自动转成pairrdd
# 每行的第一个单词作为key,line作为value
pairs = lines.map(lambda line: (line.split(' ')[0], line))
转化操作
Pair RDD 的转化操作分为单个和多个RDD的转化操作。
单个Pair RDD转化
reduceByKey(func)
合并
含有相同键的值,也称作聚合
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())
# [('a', 2), ('b', 1)]
# 这种写法也可以
rdd.reduceByKey(lambda x, y: x+y).collect()
groupByKey
对具有相同键的值进行分组
。会生成hash分区的RDD
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())
# [('a', 2), ('b', 1)]
sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
说明:如果对键进行分组以便对每个键进行聚合(如sum和average),则用reduceByKey
和aggregateByKey
性能更好
combineByKey
合并具有相同键的值,但是返回不同类型
(K, V) - (K, C)。最常用的聚合操作。
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
def add(a, b): return a + str(b)
sorted(x.combineByKey(str, add, add).collect())
[('a', '11'), ('b', '1')]
下面是combineByKey的源码和参数说明
def combineByKey[C](
createCombiner: V => C, // V => C的转变 / 初始值 / 创建one-element的list
mergeValue: (C, V) => C, // 将原V累加到新的C
mergeCombiners: (C, C) => C, // 两个C合并成一个
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = {
//实现略
}
// 求平均值
val scores = sc.parallelize(
List(("chinese", 88.0) , ("chinese", 90.5) , ("math", 60.0), ("math", 87.0))
)
val avg = scores.combineByKey(
(v) => (v, 1),
(acc: (Float, Int), V) => (acc._1 + v, acc._2 + 1),
(acc1: (Float, Int), acc2: (Float, Int) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
).map{case (key, value) => (key, value._1 / value._2.toFloat)}
# 求平均值
nums = sc.parallelize([('c', 90), ('m', 95), ('c', 80)])
sum_count = nums.combineByKey(
lambda x: (x, 1),
lambda acc, x: (acc[0] + x, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
)
# [('c', (170, 2)), ('m', (95, 1))]
avg_map = sum_count.mapValues(lambda (sum, count): sum/count).collectAsMap()
# {'c': 85, 'm': 95}
avg_map = sum_count.map(lambda key, s_c: (key, s_c[0]/s_c[1])).collectAsMap()
mapValues(f)
对每个pair RDD中的每个Value应用一个func,不改变Key。其实也是对value做map
操作。一般我们只想访问pair的值的时候,可以用mapValues
。类似于map{case (x, y): (x, func(y))}
x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
x.mapValues(f).collect()
[('a', 3), ('b', 1)]
mapPartitions(f)
是map
的一个变种,都需要传入一个函数f,去处理数据。不同点如下:
- map: f应用于每一个元素。
- mapPartitions: f应用于每一个分区。分区的内容以Iterator[T]传入f,f的输出结果是Iterator[U]。最终RDD的由所有分区经过输入函数处理后的结果得到的。
优点:我们可以为每一个分区做一些初始化操作,而不用为每一个元素做初始化。例如,初始化数据库,次数n。map时:n=元素数量,mapPartitions时:n=分区数量。
# 1. 每个元素加1
def f(iterator):
print ("called f")
return map(lambda x: x + 1, iterator)
rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
rdd.mapPartitions(f).collect() # 只调用2次f
"""
called f
called f
[2, 3, 4, 5, 6]
"""
# 2. 分区求和
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2)
def f(iterator):
print "call f"
yield sum(iterator)
rdd.mapPartitions(f).collect() # 调用2次f,分区求和
"""
call f
call f
[6, 15]
"""
mapPartitionsWithIndex(f)
和mapPartitions
一样,只是多了个partition的index。
rdd = sc.parallelize(["yellow","red","blue","cyan","black"], 3)
def g(index, item):
return "id-{}, {}".format(index, item)
def f(index, iterator):
print 'called f'
return map(lambda x: g(index, x), iterator)
rdd.mapPartitionsWithIndex(f).collect()
"""
called f
called f
called f
['id-0, yellow', 'id-1, red', 'id-1, blue', 'id-2, cyan', 'id-2, black']
"""
repartition(n)
生成新的RDD,分区数目为n。会增加或者减少 RDD的并行度。会对分布式数据集进行shuffle
操作,效率低。如果只是想减少分区数,则使用coalesce
,不会进行shuffle操作。
>>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
>>> sorted(rdd.glom().collect())
[[1], [2, 3], [4, 5], [6, 7]]
>>> len(rdd.repartition(2).glom().collect())
2
>>> len(rdd.repartition(10).glom().collect())
10
coalesce(n)
合并,减少分区数,默认不执行shuffle操作。
sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
# [[1], [2, 3], [4, 5]]
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
# [[1, 2, 3, 4, 5]]
flatMapValues(f)
打平values,[("k", ["v1", "v2"])] -- [("k","v1"), ("k", "v2")]
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
def f(x): return x
x.flatMapValues(f).collect()
# [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
keys
values
sortByKey
返回一个对键进行排序的RDD。会生成范围分区的RDD
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortByKey().first()
# ('1', 3)
sc.parallelize(tmp).sortByKey(True, 1).collect()
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
sc.parallelize(tmp).sortByKey(True, 2).collect()
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
# [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
两个Pair RDD转化
substract
留下在x中却不在y中的元素
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
sorted(x.subtract(y).collect())
#[('b', 4), ('b', 5)]
substractByKey
删掉X中在Y中也存在的Key所包含的所有元素
join
内连接,从x中去和y中一个一个的匹配,(k, v1), (k, v2) -- (k, (v1, v2))
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())
# [('a', (1, 2)), ('a', (1, 3))]
leftOuterJoin
左边RDD的键都有,右边没有的配None
x = sc.parallelize([('a', 1), ('b', 4)])
y = sc.parallelize([('a', 2)])
sorted(x.leftOuterJoin(y).collect())
# [('a', (1, 2)), ('b', (4, None))]
rightOuterJoin
右边RDD的键都有,左边没有的配None
x = sc.parallelize([('a', 1), ('b', 4)])
y = sc.parallelize([('a', 2)])
sorted(x.rightOuterJoin(y).collect())
# [('a', (1, 2))]
sorted(y.rightOuterJoin(x).collect())
# [('a', (2, 1)), ('b', (None, 4))]
cogroup
将两个RDD中拥有相同键的value分组到一起,即使两个RDD的V不一样
x = sc.parallelize([('a', 1), ('b', 4)])
y = sc.parallelize([('a', 2)])
x.cogroup(y).collect()
# 上面显示的是16进制
[(x, tuple(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
# [('a', ([1], [2])), ('b', ([4], []))]
行动操作
countByKey
对每个键对应的元素分别计数
rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
rdd.countByKey().items() # 转换成一个dict,再取所有元素
# [('a', 2), ('b', 1)]
collectAsMap
返回一个map
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
m[1] - 2 # 当做map操作即可
m[3] - 4
lookup(key)
返回键的RDD中的值列表。如果RDD具有已知的分区器,则通过仅搜索key映射到的分区来高效地完成该操作。
l = range(1000) # 1,2,3,...,1000
rdd = sc.parallelize(zip(l, l), 10) # 键和值一样,10个数据分片,10个并行度,10个task
rdd.lookup(42) # slow
# [42]
sorted_rdd = rdd.sortByKey()
sorted_rdd.lookup(42) # fast
# [42]
rdd = sc.parallelize([('a', 'a1'), ('a', 'a2'), ('b', 'b1')])
rdd.lookup('a')[0]
# 'a1'
聚合操作
当数据是键值对组织的时候,聚合具有相同键的元素是很常见的操作。基础RDD有fold()
, combine()
, reduce()
,Pair RDD有combineByKey()
最常用,reduceByKey()
, foldByKey()
等。
计算均值
## 方法一:mapValues和reduceByKey
rdd = sc.parallelize([('a', 1), ('a', 3), ('b', 4)])
maprdd = rdd.mapValues(lambda x : (x, 1))
# [('a', (1, 1)), ('a', (3, 1)), ('b', (4, 1))]
reducerdd = maprdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
# [('a', (4, 2)), ('b', (4, 1))]
reducerdd.mapValues(lambda x : x[0]/x[1]).collect()
# [('a', 2), ('b', 4)]
## 方法二 combineByKey 最常用的
nums = sc.parallelize([('c', 90), ('m', 95), ('c', 80)])
sum_count = nums.combineByKey(
lambda x: (x, 1),
lambda acc, x: (acc[0] + x, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
)
# [('c', (170, 2)), ('m', (95, 1))]
avg_map = sum_count.mapValues(lambda (sum, count): sum/count).collectAsMap()
# {'c': 85, 'm': 95}
avg_map = sum_count.map(lambda key, s_c: (key, s_c[0]/s_c[1])).collectAsMap()
统计单词计数
rdd = sc.textFile('README.md')
words = rdd.flatMap(lambda x: x.split(' '))
# 568个
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
# 289
result.top(2)
# [('your', 1), ('you', 4)]
数据分区
分区说明
在分布式程序中,通信的代价是很大的。因此控制数据分布以获得最少的网络传输可以极大地提升整体性能。Spark程序通过控制RDD分区方式来减少通信开销。使用partitionBy
进行分区
- 不需分区:给定RDD只需要被扫描一次
- 需要分区:数据集在多次基于键的操作中使用,比如连接操作。
partitionBy
是转化操作,生成新的RDD,为了多次计算,一般要进行持久化persist
Spark中所有的键值对RDD都可以进行分区。系统会根据一个针对键的函数对元素进行分组。Spark不能显示控制具体每个键落在哪一个工作节点上,但是Spark可以确保同一组的键出现在同一个节点上。
- Hash分区:将一个RDD分成了100个分区,hashcode(key)%100 相同的,会在同一个节点上面
- 范围分区:将key在同一个范围区间内的记录放在同一个节点上
一个简单的例子,内存中有一张很大的用户信息表 -- 即(UserId, UserInfo)组成的RDD,UserInfo包含用户订阅了的所有Topics。还有一张(UserId, LinkInfo)存放着过去5分钟用户浏览的Topic。现在要找出用户浏览了但是没有订阅的Topic数量。
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist()
def processNewLog(logFileName: String) {
val events = sc.sequenceFile[UserId, LinkInfo](logFileName)
val joined = userData.join(events) // (UserId, (UserInfo, LinkInfo))
val offTopicVisits = joined.filter{
case (UserId, (UserInfo, LinkInfo)) =>
!UserInfo.topics.contains(LinkInfo.topic)
}.count()
print ("浏览了且未订阅的数量:" + offTopicVisits)
}
这段代码不够高效。因为每次调用processNewLog
都会用join
操作,但我们却不知道数据集是如何分区的。
连接操作,会将两个数据集的所有键的哈希值都求出来,将该哈希值相同的记录通过网络传到同一台机器上,然后在机器上对所有键相同的记录进行操作。
因为userData比events要大的多并且基本不会变化,所以有很多浪费效率的事情:每次调用时都对userData表进行计算hash值计算和跨节点数据混洗。
解决方案:在程序开始的时候,对userData表使用partitionBy()
转换操作,将这张表转换为哈希分区
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...")
.partitionBy(new HashPartioner(100)) // 构造100个分区
.persist() // 持久化当前结果
events是本地变量,并且只使用一次,所以为它指定分区方式没有什么用处。
userData使用了partitionBy()
,Spark就知道该RDD是根据键的hash值来分区的。在userData.join(events)
时,Spark只会对events进行数据混洗操作。将events中特定UserId的记录发送到userData的对应分区所在的那台机器上。需要网络传输的数据就大大减少了,速度也就显著提升了。
分区相关的操作
Spark的许多操作都有将数据根据跨节点进行混洗的过程。所有这些操作都会从数据分区中获益。类似join
这样的二元操作,预先进行数据分区会导致其中至少一个RDD不发生数据混洗
。
获取好处的操作:
cogroup
,groupWith
,join
,leftOuterJoin
,rightOuterJoin
,groupByKey
,reduceByKey
,combineByKey
,lookup
为结果设好分区的操作:
cogroup
,groupWith
,join
,leftOuterJoin
,rightOuterJoin
,groupByKey
,reduceByKey
,combineByKey
,partitionBy
,sort
, (mapValues
,flatMapValues
,filter
如果父RDD有分区方式的话)
其他所有的操作的结果都不会存在特定的分区方式。对于二元操作,输出数据的分区方式取决于父RDD的分区方式。默认情况结果会采取hash分区。
PageRank
PageRank的python版本
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" PageRank算法
author = PuLiming
运行: bin/spark-submit files/pagerank.py data/mllib/pagerank_data.txt 10
"""
from __future__ import print_function
import re
import sys
from operator import add
from pyspark import SparkConf, SparkContext
def compute_contribs(urls, rank):
""" 给urls计算
Args:
urls: 目标url相邻的urls集合
rank: 目标url的当前rank
Returns:
url: 相邻urls中的一个url
rank: 当前url的新的rank
"""
num_urls = len(urls)
for url in urls:
yield (url, rank / num_urls)
def split_url(url_line):
""" 把一行url切分开来
Args:
url_line: 一行url,如 1 2
Returns:
url, neighbor_url
"""
parts = re.split(r'\s+', url_line) # 正则
return parts[0], parts[1]
def compute_pagerank(sc, url_data_file, iterations):
""" 计算各个page的排名
Args:
sc: SparkContext
url_data_file: 测试数据文件
iterations: 迭代次数
Returns:
status: 成功就返回0
"""
# 读取url文件 ['1 2', '1 3', '2 1', '3 1']
lines = sc.textFile(url_data_file).map(lambda line: line.encode('utf8'))
# 建立Pair RDD (url, neighbor_urls) [(1,[2,3]), (2,[1]), (3, [1])]
links = lines.map(lambda line : split_url(line)).distinct().groupByKey().mapValues(lambda x: list(x)).cache()
# 初始化所有url的rank为1 [(1, 1), (2, 1), (3, 1)]
ranks = lines.map(lambda line : (line[0], 1))
for i in range(iterations):
# (url, [(neighbor_urls), rank]) join neighbor_urls and rank
# 把当前url的rank分别contribute到其他相邻的url (url, rank)
contribs = links.join(ranks).flatMap(
lambda url_urls_rank: compute_contribs(url_urls_rank[1][0], url_urls_rank[1][1])
)
# 把url的所有rank加起来,再赋值新的
ranks = contribs.reduceByKey(add).mapValues(lambda rank : rank * 0.85 + 0.15)
for (link, rank) in ranks.collect():
print("%s has rank %s." % (link, rank))
return 0
if __name__ == '__main__':
if len(sys.argv) != 3:
print("Usage: python pagerank.py <data.txt> <iterations>", file = sys.stderr)
sys.exit(-1)
# 数据文件和迭代次数
url_data_file = sys.argv[1]
iterations = int(sys.argv[2])
# 配置 SparkContext
conf = SparkConf().setAppName('PythonPageRank')
conf.setMaster('local')
sc = SparkContext(conf=conf)
ret = compute_pagerank(sc, url_data_file, iterations)
sys.exit(ret)
PageRank的scala版本
val sc = new SparkContext(...)
val links = sc.objectFile[(String, Seq[String])]("links")
.partitionBy(new HashPartitioner(100))
.persist()
var ranks = links.mapValues(_ => 1.0)
// 迭代10次
for (i <- 0 until 10) {
val contributions = links.join(ranks).flatMap {
case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size))
}
ranks = contributions.reduceByKey(_ + _).mapValues(0.15 + 0.85* _)
}
ranks.saveAsTextFile("ranks")
当前scala版本的PageRank算法的优点:
- links每次都会和ranks发生连接操作,所以一开始就对它进行分区
partitionBy
,就不会通过网络进行数据混洗了,节约了相当可观的网络通信开销 - 对links进行
persist
,留在内存中,每次迭代使用 - 第一次创建ranks,使用
mapValues
保留了父RDD的分区方式,第一次连接开销就会很小 reduceByKey
后已经是分区了,再使用mapValues
分区方式,再次和links进行join
就会更加高效
所以对分区后的RDD尽量使用mapValues
保留父分区方式,而不要用map
丢失分区方式。