Spark重点解析(三) => Spark SQL

一. SparkSQL 的前世今生

  • Hive => MapReduce => HDFS
  • Shark => 使用 Hive 的 SQL 解析引擎 => RDD => 通过Hive 的metadata表去操作 HDFS
  • SparkSQL => 使用自己SQL 解析引擎 => RDD => 通过Hive 的metadata表去操作 HDFS


二. SparkSession

在 Spark 的早期版本(1.x 版本)中,SparkContext 是 Spark 的主要切入点,由于 RDD 是主要的 API,我 们通过 sparkContext 来创建和操作 RDD。对于每个其他的 API,我们需要使用不同的 context。 例如:

SparkContext => 创建 RDD

StreamingContext => 创建 Streaming

SQLContext => 创建 SQL

HiveContext => 创建 Hive

从 Spark 2.0开始, 引入 SparkSession

—- 为用户提供一个统一的切入点使用 Spark 各项功能

—- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序

—- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互

—- 与 Spark 交互之时不需要显示的创建 SparkConf、SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

—- SparkSession 提供对 Hive 特征的内部支持:用 HiveQL 写 SQL 语句,访问 Hive UDFs,从 Hive 表中读取数据。


三. DataFrame

注意: 这里的操作都是基于1.x 版本, 与2.x 的区别就是2.x 统一了操作入口为 SparkSession

2.x 的操作代码见我的另一篇 bolg

从 json 文件读取为 dataframe, 使用spark 的 api 调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object DataFrameOperation {
def main(args: Array[String]): Unit = {
//create datarame
//首先创建程序入口
val conf=new SparkConf().setAppName("DataFrameOperation")
val sc=new SparkContext(conf);
//create sqlcontext
val sqlContext=new SQLContext(sc);
val df=sqlContext.read.json("hdfs://mycluster/user/ap/sparkdatas/people")
df.show();

//print schema
df.printSchema()
//name age
df.select("name").show();
//
df.select(df("name"),df("age")+1).show()
//where
df.filter(df("age") > 21).show()
//groupby
df.groupBy("age").count().show();
}
}

四. RDD 转为 DataFrame

1.使用对象 RDD 反射 Reflection 的方式

Scala 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
步骤: 
1) 构造一个封装了指定对象的RDD
2) 引入sparksession的隐式转换
3) 调用 rdd.toDF()
=====================================================================

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

case class Person(age:Int,name:String)

object RDD2DataFrameReflection {
def main(args: Array[String]): Unit = {
//create datarame
//首先创建程序入口
val conf=new SparkConf().setAppName("RDD2DataFrameReflection")
val sc=new SparkContext(conf);

//create sqlcontext
val sqlContext=new SQLContext(sc);
import sqlContext.implicits._
val personRDD= sc.textFile("hdfs://mycluster/user/ap/sparkdatas/peopletxt/people.txt", 2)
.map { line => line.split(",") }.map { p => Person(p(1).trim().toInt,p(0)) }
val personDF=personRDD.toDF()
// 或者 personDF.createOrReplaceTempView("person") / createGlobalTempView / createTempView
personDF.registerTempTable("person")
val personDataframe=sqlContext.sql("select name,age from person where age > 13 and age <= 19")
personDataframe.rdd.foreach { row => println(row.getString(0)+" "+row.getString(1)) }
personDataframe.rdd.saveAsTextFile("hdfs://hadoop1:9000/reflectionresult/")
}
}

Java 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class RDD2DataFrameReflection {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("RDD2DataFrameReflection");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<Person> PersonRDD = sc.textFile("hdfs://hadoop1:9000/examples/src/main/resources/people.txt")
.map(new Function<String, Person>() {

public Person call(String line) throws Exception {
String[] strs = line.split(",");
String name=strs[0];
int age=Integer.parseInt(strs[1].trim());
Person person=new Person(age,name);
return person;
}
});
DataFrame personDF = sqlContext.createDataFrame(PersonRDD, Person.class);
personDF.registerTempTable("person");

DataFrame resultperson = sqlContext.sql("select name,age from person where age > 13 and age <= 19");
resultperson.javaRDD().foreach(new VoidFunction<Row>() {

private static final long serialVersionUID = 1L;

public void call(Row row) throws Exception {
//把每一条数据都看成是一个row row(0)=name row(1)=age
System.out.println("name"+row.getString(0));
System.out.println("age"+row.getInt(1));
}
});
resultperson.javaRDD().saveAsTextFile("hdfs://hadoop1:9000/reflectionresult");
}
}

2. 使用构造StructType方式

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
要点: 
1) 构造 StructType
2) 构造 rowRDD
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt, p(4).trim))
3) 构造 DataFrame
df = sparksession.createDataFrame(rowRDD, schema)


import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row

object RDD2DataFrameProgrammatically {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("RDD2DataFrameProgrammatically")
val sc=new SparkContext(conf);

//create sqlcontext
val sqlContext=new SQLContext(sc);
import sqlContext.implicits._
val personRDD= sc.textFile("hdfs://hadoop1:9000/examples/src/main/resources/people.txt", 1)

////////////////////////////////////////////////////////////////////////
//create schema
val schemaString="name age";
val schema=StructType(
schemaString.split("\t").map { fieldsName => StructField(fieldsName,StringType,true) }
)
/** 参数
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty)
构造方式:
val schema = StructType(
List(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("sex", StringType, true),
StructField("age", IntegerType, true),
StructField("department", StringType, true)
)
)
*/
//create rowrdd
val rowRDD=personRDD.map { line => line.split(",") }.map { p => Row(p(0),p(1)) }
val personDF=sqlContext.createDataFrame(rowRDD, schema)

////////////////////////////////////////////////////////////////////////
personDF.registerTempTable("person");
val personDataframe=sqlContext.sql("select name,age from person where age > 13 and age <= 19")

personDataframe.rdd.foreach { row => println(row.getString(0)+"=> "+row.getString(1)) }
personDataframe.rdd.saveAsTextFile("hdfs://hadoop1:9000/RDD2DataFrameProgrammatically/")
}
}

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class RDD2DataFrameProgrammactically {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("RDD2DataFrameProgrammactically");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> personRDD = sc.textFile("hdfs://hadoop1:9000/examples/src/main/resources/people.txt");
/**
* 这里的schemaString 是从数据库里面动态获取从来的
* 在实际的开发中我们需要写另外的代码去获取
*/
String schemaString="name age";
//create schema
ArrayList<StructField> list = new ArrayList<StructField>();
for(String str:schemaString.split("\t")){
list.add(DataTypes.createStructField(str, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(list);
/**
* 需要将RDD转换为一个JavaRDD《Row》
*/
JavaRDD<Row> rowRDD = personRDD.map(new Function<String, Row>() {

public Row call(String line) throws Exception {
String[] fields = line.split(",");

return RowFactory.create(fields[0],fields[1]);
}
});
DataFrame personDF = sqlContext.createDataFrame(rowRDD, schema);
personDF.registerTempTable("person");

DataFrame resultperson = sqlContext.sql("select name,age from person where age > 13 and age <= 19");
resultperson.javaRDD().foreach(new VoidFunction<Row>() {

private static final long serialVersionUID = 1L;

public void call(Row row) throws Exception {
//把每一条数据都看成是一个row row(0)=name row(1)=age
System.out.println("name"+row.getString(0));
System.out.println("age"+row.getInt(1));
}
});
resultperson.javaRDD().saveAsTextFile("hdfs://hadoop1:9000/reflectionresult");
}
}

3.DataSet & DataFrame & RDD

RDD 仅表示数据集,RDD 没有元数据,也就是说没有字段语义定义

DataFrame = RDD+Schema = SchemaRDD

Schema 是就是元数据,是语义描述信息。

image-20180727164726341

DataFrame 是一种特殊类型的 Dataset,DataSet[Row] = DataFrame

DataFrame 的数据类型统一是 Row, 缺点:

  • Row 不能直接操作 domain 对象
  • 函数风格编程,没有面向对象风格的 API

所以,Spark SQL 引入了 Dataset,扩展了 DataFrame API,提供了编译时类型检查,面向对象风格的 API。

  • Dataset 可以和 DataFrame、RDD 相互转换。DataFrame=Dataset[Row]
  • 可见 DataFrame 是一种特殊的 Dataset。

既然 Spark SQL 提供了 SQL 访问方式,那为什么还需要 DataFrame 和 Dataset 的 API 呢?

  • SQL 的表达能力却是有限的
  • DataFrame 和 Dataset 可以采用更加通用的语言(Scala 或 Python)来表达用户的 查询请求。
  • DataFrame / Dataset 的面向对象语法, 可以更快捕捉错误,因为 SQL 是运行时捕获异常,而 Dataset 是 编译时检查错误。

DataFrame & Dataset 的部分 api

image-20180727165823013

总的来讲

1
2
3
DataFream=DataSet[row]  //弱类型
DataFream=Untyped Dataset
DataSet=Untyped Dataset +typed Dataset

举个🌰

SparkCore: RDD

1
2
3
4
5
6
7
8
9
scala> case class Person(name: String, age: Long)
defined class Person

scala> val rdd = sc.textFile("/user/ap/sparkdatas/peopletxt/people.txt").map(line => line.split(",")).map(x => Person(x(0),x(1).trim.toLong))
rdd: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[14] at map at <console>:26
// 这里得到的是 RDD[Person] 类型

scala> rdd.collect
res11: Array[Person] = Array(Person(Michael,29), Person(Andy,30), Person(Justin,19))

SparkSQL: DataFrame

1
2
3
4
scala> val df = spark.read.json("/user/ap/sparkdatas/people")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

// Row =>相当于 数据库的表里面的一行数据 DataFrame=DataSet[Row]

SparkSQL: DataSet

1
2
3
4
5
6
7
8
// as[Person] 把 Json 读出来的 DataFrame 直接反射到 Person类上, 转为 DataSet  !! 还有这种骚操作??
scala> val personDS = spark.read.json("/user/ap/sparkdatas/people").as[Person]
//
personDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

// 再把 dataSet 转为 dataFrame
val personDS2 = personDS.toDF
res13: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

来张图!

image-20180727225640182


五. 数据源之load 和 Save

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.hdfs.server.namenode.SafeMode
import org.apache.spark.sql.SaveMode

object DatasourceLoadAndSave {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("RDD2DataFrameReflection")
val sc=new SparkContext(conf);
val sqlContext=new SQLContext(sc);
// sqlContext.read.json("person.json");
//sparksql默认支持的是parquet文件格式
val df= sqlContext.read.load("examples/src/main/resources/users.parquet");
// sqlContext.read.format("json").load("person.json")
// sqlContext.read.format("parquet").load("users.parquet")
//df保存的时候,如果不指定保存的文件格式,默认就是parquet
df.select("name", "age").write.save("namesAndAges.parquet");
// df.select("name", "age").write.json("user.json")
// df.select("name", "age").write.format("json").save("user.json")
df.select("name", "age").write.format("parquet").save("user.parquet")

df.select("name", "age").write.mode(SaveMode.ErrorIfExists).format("json").save("hdfs://hadoop1:9000/user.json");

/**
* 保存结果文件的时候的策略
* if data/table already exists,
Append, // contents of the DataFrame are expected to be appended to existing data
Overwrite, // existing data is expected to be overwritten
ErrorIfExists, // an exception is expected to be thrown
*/
}
}

六. 数据源之parquet files 合并

1
2
3
4
5
6
7
8
9
10
11
12
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("/user/ap/test_table/key=1")

val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("/user/ap/test_table/key=2")

//合并两个dataframe, 我们期望的就是合并出来应该是三个属性分别是single double triple
//实际上最后还会有一个分区就是keys
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("/user/ap/test_table")

df3.printSchema()
df3.show()

七. 数据源之 MySQL

在 idea 中访问 mysql 的, 看这里

注意: 启动 Spark Shell,必须指定 mysql 连接驱动 jar 包

启动本机的单进程 Shell

1
2
3
spark-shell \
--jars $SPARK_HOME/jars/mysql-connector-java-5.1.40-bin.jar \
--driver-class-path $SPARK_HOME/jars/mysql-connector-java-5.1.40-bin.jar

连接 Spark 集群的 shell

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
spark-shell \
--driver-class-path $SPARK_HOME/jars/mysql-connector-java-5.1.40-bin.jar \
--master yarn

// 查询
scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://cs2:3306/test").option("dbtable", "student").option("user", "root").option("password", "123").option("driver","com.mysql.jdbc.Driver").load();

jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> jdbcDF.show
+---+------------+--------+-----+
| id| name| course|score|
+---+------------+--------+-----+
| 1| huangbo| math| 81|
| 2| huangbo| englist| 87|
| 3| huangbo|computer| 67|
| 4| xuzheng| math| 89|
| 5| xuzheng| english| 92|
| 6| xuzheng|computer| 83|
| 7|wangbaoqiang| math| 78|
| 8|wangbaoqiang| english| 88|
| 9|wangbaoqiang|computer| 90|
| 10| dengchao| math| 88|
+---+------------+--------+-----+

通过 Spark-submit 的方式

编写代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//通过读取文件创建 RDD
val studentRDD = sc.textFile(args(0)).map(_.split(","))

//通过 StructType 直接指定每个字段的 schema
val schema = StructType(
List(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("sex", StringType, true),
StructField("age", IntegerType, true),
StructField("department", StringType, true)
)
)

//将 RDD 映射到 rowRDD
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt,
p(4).trim))

//将 schema 信息应用到 rowRDD 上
val studentDataFrame = sqlContext.createDataFrame(rowRDD, schema)

//创建 Properties 存储数据库相关属性
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")

//将数据追加到数据库
studentDataFrame.write.mode("append").jdbc("jdbc:mysql://hadoop02:3306/spider",
"student", prop)

//停止 SparkContext
sc.stop()

准备数据:student.txt 存储在 HDFS 上的/student 目录中

给项目打成 jar 包,上传到客户端

提交任务给 Spark 集群:

1
2
3
4
5
6
$SPARK_HOME/bin/spark-submit \
--class com.mazh.spark.sql.SparkSQL_JDBC \
--master yarn \
--driver-class-path $SPARK_HOME/jars/mysql-connector-java-5.1.40-bin.jar \
/home/hadoop/Spark_WordCount-1.0-SNAPSHOT.jar \
hdfs://mycluster/user/ap/sparkdatas/student.txt

八. 数据源之json

1
2
3
4
5
// 方式 1
val df1 = sparkSession.read.json("file://..")

// 方式 2
val df2 = sparkSession.read.format("json").load("hdfs://mycluster/...")

九. 数据源之 Hive

其它可以参考我的另一篇文章

与之前的自己构造数据, 或者读json数据等,得到 DataFrame , 然后把此 df.createTempView(“tmpView”)

然后再对此 tmpView 使用 sql 语句查询不一样

直接用 sparkSession.sql(“...”) 操作的对象默认就是 hive

==> 访问 hive 的元数据库(mysql), 把 hive 底层的执行引擎 MapReduce 换成了SparkCore

十. 数据源之 HBase

如果帮到你, 可以给我赞助杯咖啡☕️
0%