几个基本的Spark-SQL概念。没多少东西
基础
概念
Spark SQL是用来处理结构化数据的模块。与基本RDD相比,Spark SQL提供了更多关于数据结构和计算方面的信息(在内部有优化效果)。通常通过SQL和Dataset API来和Spark SQL进行交互。
- SQL: 进行SQL查询,从各种结构化数据源(Json, Hive, Parquet)读取数据。返回Dataset/DataFrame。
- Dataset: 分布式的数据集合。
- DataFrame
- 是一个组织有列名的Dataset。类似于关系型数据库中的表。
- 可以使用结构化数据文件、Hive表、外部数据库、RDD等创建。
- 在Scala和Java中,DataFrame由Rows和Dataset组成。在Scala中,DataFrame只是Dataset[Row]的类型别名。在Java中,用Dataset
表示DataFrame
开始
SparkSession
SparkSession
是Spark所有功能的入口点,用SparkSession.builder()
就可以。
1 2 3 4 5 6 7
| from pyspark.sql import SparkSession
spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()
|
DataFrameReader
从外部系统加载数据,返回DataFrame。例如文件系统、键值对等等。通过spark.read
来获取Reader。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| df1 = spark.read.json("python/test_support/sql/people.json") df1.dtypes
df2 = sc.textFile("python/test_support/sql/people.json")
df = spark.read.text("python/test_support/sql/text-test.txt") df.collect()
|
创建DataFrames
从RDD、Hive Table、Spark data source、外部文件中都可以创建DataFrames。
通过DataFrameReader
,读取外部文件
1 2 3 4
| df = spark.read.json("examples/src/main/resources/people.json") df.show() df.dtypes
|
通过spark.createDataFrame()
,读取RDD、List或pandas.DataFrame
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| person_list = [('AA', 18), ('PLM', 23)] rdd = sc.parallelize(person_list)
df = spark.createDataFrame(person_list) df = spark.createDataFrame(person_list, ['name', 'age']) df.collect()
rdd = sc.parallelize(person_list) df = spark.createDataFrame(rdd, ['name', 'age'])
from pyspark.sql import Row Person = Row('name', 'age') persons = rdd.map(lambda r: Person(*r)) df = spark.createDataFrame(persons) df.collect()
from pyspark.sql.types import * field_name = StructField('name', StringType(), True) field_age = StructField('age', IntegerType, True) person_type = StructType([field_name, field_age])
df = spark.createDataFrame(rdd, person_type)
|
Row
Row
是DataFrame
中的,它可以定义一些属性,这些属性在DataFrame里面可以被访问。比如:row.key
(像属性)和row['key']
(像dict)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| from pyspark import Row
Person = Row('name', 'age') 'name' in Person 'sex' in Person
alice = Person('Alice', 22) plm = Person('PLM', 23)
name, age = alice['name'], alice['age']
plm.asDict()
p_list = [alice, plm] p_df = spark.createDataFrame(p_list) p_df.collect()
|
DataFrame的操作
在2.0中,DataFrames只是Scala和Java API中的Rows数据集。它的操作称为非类型转换,与带有强类型Scala和Java数据集的类型转换相反。
Python tips: df.age
和df['age']
都可以使用,前者在命令行里面方便,但是建议使用后者。
1 2 3 4 5
| df.printSchema() df.select("name").show() df.select(df['name'], df['age'] + 1).show() df.filter(df['age'] > 21).show() df.groupBy("age").count().show()
|
DataFrame的Python Api,DataFrame的函数API
编程方式运行SQL
通过spark.sql()
执行,返回一个DataFrame
1 2 3 4 5 6 7 8 9 10 11 12 13
| df.createOrReplaceTempview("people") sqlDF = spark.sql("SELECT * FROM people") sqlDF.show()
sqlDf.collect()
df.createGlobalTempView("people") spark.sql("SELECT * FROM global_temp.people").show() spark.newSession().sql("SELECT * FROM global_temp.people").show()
|