Spark-SQL的简略笔记
📅 发表于 2017/03/20
🔄 更新于 2017/03/20
👁️ 次访问
📝 0 字
⏳ 0 分钟
大数据
#Spark
#Spark SQL
几个基本的Spark-SQL概念。没多少东西
Spark SQL是用来处理结构化数据的模块。与基本RDD相比,Spark SQL提供了更多关于数据结构和计算方面的信息(在内部有优化效果)。通常通过SQL和Dataset API来和Spark SQL进行交互。
Dataset<Row>
表示DataFrameSparkSession
SparkSession
是Spark所有功能的入口点,用SparkSession.builder()
就可以。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
DataFrameReader
从外部系统加载数据,返回DataFrame。例如文件系统、键值对等等。通过spark.read
来获取Reader。
# 1. json 键值对
df1 = spark.read.json("python/test_support/sql/people.json")
df1.dtypes
# [('age', 'bigint'), ('name', 'string')]
df2 = sc.textFile("python/test_support/sql/people.json")
# df1.dtypes 和 df2.dtypes是一样的
# 2. text 文本文件
# 每一行就是一个Row,默认的列名是Value
df = spark.read.text("python/test_support/sql/text-test.txt")
df.collect()
# [Row(value=u'hello'), Row(value=u'this')]
# 3. load
# 从数据源中读取数据
创建DataFrames
从RDD、Hive Table、Spark data source、外部文件中都可以创建DataFrames。
通过DataFrameReader
,读取外部文件
# spark.read返回一个DataFrameReader
df = spark.read.json("examples/src/main/resources/people.json")
df.show()
df.dtypes
通过spark.createDataFrame()
,读取RDD、List或pandas.DataFrame
person_list = [('AA', 18), ('PLM', 23)]
rdd = sc.parallelize(person_list)
# 1. list
df = spark.createDataFrame(person_list) # 没有指定列名,默认为_1 _2
df = spark.createDataFrame(person_list, ['name', 'age']) # 指定了列名
df.collect() # df.show()
#[Row(name='AA', age=18), Row(name='PLM', age=23)]
# 2. RDD
rdd = sc.parallelize(person_list)
df = spark.createDataFrame(rdd, ['name', 'age'])
# 3. Row
from pyspark.sql import Row
Person = Row('name', 'age') # 指定模板属性
persons = rdd.map(lambda r: Person(*r)) # 把每一行变成Person
df = spark.createDataFrame(persons)
df.collect()
# 4. 指定类型StructType
from pyspark.sql.types import *
field_name = StructField('name', StringType(), True) # 名,类型,非空
field_age = StructField('age', IntegerType, True)
person_type = StructType([field_name, field_age])
# 通过链式add也可以
# person_type = StructType.add("name", StringType(), True).add(...)
df = spark.createDataFrame(rdd, person_type)
Row
Row
是DataFrame
中的,它可以定义一些属性,这些属性在DataFrame里面可以被访问。比如:row.key
(像属性)和row['key']
(像dict)
from pyspark import Row
# 1. 创建一个模板
Person = Row('name', 'age') # <Row(name, age)>
'name' in Person # True,有这个属性
'sex' in Person # False
# 2. 以Person为模板,创建alice, plm
alice = Person('Alice', 22) # Row(name='Alice', age=22)
plm = Person('PLM', 23)
# 访问属性
name, age = alice['name'], alice['age']
# 返回dict
plm.asDict()
# {'age': 23, 'name': 'PLM'}
# 3. 多个person创建一个DataFrame
p_list = [alice, plm]
p_df = spark.createDataFrame(p_list)
p_df.collect()
# [Row(name=u'Alice', age=22), Row(name=u'PLM', age=23)]
DataFrame的操作
在2.0中,DataFrames只是Scala和Java API中的Rows数据集。它的操作称为非类型转换,与带有强类型Scala和Java数据集的类型转换相反。
Python tips: df.age
和df['age']
都可以使用,前者在命令行里面方便,但是建议使用后者。
df.printSchema()
df.select("name").show()
df.select(df['name'], df['age'] + 1).show()
df.filter(df['age'] > 21).show()
df.groupBy("age").count().show()
DataFrame的Python Api,DataFrame的函数API
编程方式运行SQL
通过spark.sql()
执行,返回一个DataFrame
# 通过df创建一个本地临时视图,与创建这个df的SparkSession同生命周期
df.createOrReplaceTempview("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# 展示为Table
sqlDf.collect()
# [Row(age=None, name=u'Michael'), Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]
# 全局临时视图
# 在所有session中共享,直到spark application停止
df.createGlobalTempView("people")
spark.sql("SELECT * FROM global_temp.people").show()
spark.newSession().sql("SELECT * FROM global_temp.people").show()