几个基本的Spark-SQL概念。没多少东西

基础

概念

Spark SQL是用来处理结构化数据的模块。与基本RDD相比,Spark SQL提供了更多关于数据结构和计算方面的信息(在内部有优化效果)。通常通过SQL和Dataset API来和Spark SQL进行交互。

  • SQL: 进行SQL查询,从各种结构化数据源(Json, Hive, Parquet)读取数据。返回Dataset/DataFrame。
  • Dataset: 分布式的数据集合。
  • DataFrame
  • 是一个组织有列名的Dataset。类似于关系型数据库中的表。
  • 可以使用结构化数据文件、Hive表、外部数据库、RDD等创建。
  • 在Scala和Java中,DataFrame由Rows和Dataset组成。在Scala中,DataFrame只是Dataset[Row]的类型别名。在Java中,用Dataset表示DataFrame

开始

SparkSession

SparkSession是Spark所有功能的入口点,用SparkSession.builder()就可以。

1
2
3
4
5
6
7
from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

DataFrameReader

从外部系统加载数据,返回DataFrame。例如文件系统、键值对等等。通过spark.read来获取Reader。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 1. json 键值对
df1 = spark.read.json("python/test_support/sql/people.json")
df1.dtypes
# [('age', 'bigint'), ('name', 'string')]
df2 = sc.textFile("python/test_support/sql/people.json")
# df1.dtypes 和 df2.dtypes是一样的

# 2. text 文本文件
# 每一行就是一个Row,默认的列名是Value
df = spark.read.text("python/test_support/sql/text-test.txt")
df.collect()
# [Row(value=u'hello'), Row(value=u'this')]

# 3. load
# 从数据源中读取数据

创建DataFrames

从RDD、Hive Table、Spark data source、外部文件中都可以创建DataFrames。

通过DataFrameReader,读取外部文件

1
2
3
4
# spark.read返回一个DataFrameReader
df = spark.read.json("examples/src/main/resources/people.json")
df.show()
df.dtypes

通过spark.createDataFrame(),读取RDD、List或pandas.DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
person_list = [('AA', 18), ('PLM', 23)]
rdd = sc.parallelize(person_list)

# 1. list
df = spark.createDataFrame(person_list) # 没有指定列名,默认为_1 _2
df = spark.createDataFrame(person_list, ['name', 'age']) # 指定了列名
df.collect() # df.show()
#[Row(name='AA', age=18), Row(name='PLM', age=23)]

# 2. RDD
rdd = sc.parallelize(person_list)
df = spark.createDataFrame(rdd, ['name', 'age'])

# 3. Row
from pyspark.sql import Row
Person = Row('name', 'age') # 指定模板属性
persons = rdd.map(lambda r: Person(*r)) # 把每一行变成Person
df = spark.createDataFrame(persons)
df.collect()

# 4. 指定类型StructType
from pyspark.sql.types import *
field_name = StructField('name', StringType(), True) # 名,类型,非空
field_age = StructField('age', IntegerType, True)
person_type = StructType([field_name, field_age])
# 通过链式add也可以
# person_type = StructType.add("name", StringType(), True).add(...)
df = spark.createDataFrame(rdd, person_type)

Row

RowDataFrame中的,它可以定义一些属性,这些属性在DataFrame里面可以被访问。比如:row.key(像属性)和row['key'](像dict)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pyspark import Row
# 1. 创建一个模板
Person = Row('name', 'age') # <Row(name, age)>
'name' in Person # True,有这个属性
'sex' in Person # False

# 2. 以Person为模板,创建alice, plm
alice = Person('Alice', 22) # Row(name='Alice', age=22)
plm = Person('PLM', 23)
# 访问属性
name, age = alice['name'], alice['age']
# 返回dict
plm.asDict()
# {'age': 23, 'name': 'PLM'}

# 3. 多个person创建一个DataFrame
p_list = [alice, plm]
p_df = spark.createDataFrame(p_list)
p_df.collect()
# [Row(name=u'Alice', age=22), Row(name=u'PLM', age=23)]

DataFrame的操作

在2.0中,DataFrames只是Scala和Java API中的Rows数据集。它的操作称为非类型转换,与带有强类型Scala和Java数据集的类型转换相反。

Python tips: df.agedf['age']都可以使用,前者在命令行里面方便,但是建议使用后者。

1
2
3
4
5
df.printSchema()
df.select("name").show()
df.select(df['name'], df['age'] + 1).show()
df.filter(df['age'] > 21).show()
df.groupBy("age").count().show()

DataFrame的Python ApiDataFrame的函数API

编程方式运行SQL

通过spark.sql()执行,返回一个DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
# 通过df创建一个本地临时视图,与创建这个df的SparkSession同生命周期
df.createOrReplaceTempview("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# 展示为Table
sqlDf.collect()
# [Row(age=None, name=u'Michael'), Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]

# 全局临时视图
# 在所有session中共享,直到spark application停止
df.createGlobalTempView("people")
spark.sql("SELECT * FROM global_temp.people").show()
spark.newSession().sql("SELECT * FROM global_temp.people").show()