一. SparkSQL 的前世今生
- Hive => MapReduce => HDFS
- Shark => 使用 Hive 的 SQL 解析引擎 => RDD => 通过Hive 的metadata表去操作 HDFS
- SparkSQL => 使用自己SQL 解析引擎 => RDD => 通过Hive 的metadata表去操作 HDFS
二. SparkSession
在 Spark 的早期版本(1.x 版本)中,SparkContext 是 Spark 的主要切入点,由于 RDD 是主要的 API,我 们通过 sparkContext 来创建和操作 RDD。对于每个其他的 API,我们需要使用不同的 context。 例如:
SparkContext => 创建 RDD
StreamingContext => 创建 Streaming
SQLContext => 创建 SQL
HiveContext => 创建 Hive
从 Spark 2.0开始, 引入 SparkSession
—- 为用户提供一个统一的切入点使用 Spark 各项功能
—- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序
—- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互
—- 与 Spark 交互之时不需要显示的创建 SparkConf、SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中
—- SparkSession 提供对 Hive 特征的内部支持:用 HiveQL 写 SQL 语句,访问 Hive UDFs,从 Hive 表中读取数据。
三. DataFrame
注意: 这里的操作都是基于1.x 版本, 与2.x 的区别就是2.x 统一了操作入口为 SparkSession
从 json 文件读取为 dataframe, 使用spark 的 api 调用
1 | import org.apache.spark.SparkConf |
四. RDD 转为 DataFrame
1.使用对象 RDD 反射 Reflection 的方式
Scala 方式
1 | 步骤: |
Java 方式
1 | import org.apache.spark.SparkConf; |
2. 使用构造StructType方式
scala
1 | 要点: |
Java
1 | import java.util.ArrayList; |
3.DataSet & DataFrame & RDD
RDD 仅表示数据集,RDD 没有元数据,也就是说没有字段语义定义
DataFrame = RDD+Schema = SchemaRDD
Schema 是就是元数据,是语义描述信息。
DataFrame 是一种特殊类型的 Dataset,DataSet[Row] = DataFrame
DataFrame 的数据类型统一是 Row, 缺点:
- Row 不能直接操作 domain 对象
- 函数风格编程,没有面向对象风格的 API
所以,Spark SQL 引入了 Dataset,扩展了 DataFrame API,提供了编译时类型检查,面向对象风格的 API。
- Dataset 可以和 DataFrame、RDD 相互转换。DataFrame=Dataset[Row]
- 可见 DataFrame 是一种特殊的 Dataset。
既然 Spark SQL 提供了 SQL 访问方式,那为什么还需要 DataFrame 和 Dataset 的 API 呢?
- SQL 的表达能力却是有限的
- DataFrame 和 Dataset 可以采用更加通用的语言(Scala 或 Python)来表达用户的 查询请求。
- DataFrame / Dataset 的面向对象语法, 可以更快捕捉错误,因为 SQL 是运行时捕获异常,而 Dataset 是 编译时检查错误。
DataFrame & Dataset 的部分 api
总的来讲
1 | DataFream=DataSet[row] //弱类型 |
举个🌰
SparkCore: RDD
1 | scala> case class Person(name: String, age: Long) |
SparkSQL: DataFrame
1 | scala> val df = spark.read.json("/user/ap/sparkdatas/people") |
SparkSQL: DataSet
1 | // as[Person] 把 Json 读出来的 DataFrame 直接反射到 Person类上, 转为 DataSet !! 还有这种骚操作?? |
来张图!
五. 数据源之load 和 Save
1 | import org.apache.spark.SparkConf |
六. 数据源之parquet files 合并
1 | val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") |
七. 数据源之 MySQL
注意: 启动 Spark Shell,必须指定 mysql 连接驱动 jar 包
启动本机的单进程 Shell
1 | spark-shell \ |
连接 Spark 集群的 shell
1 | spark-shell \ |
通过 Spark-submit 的方式
编写代码
1 | //通过读取文件创建 RDD |
准备数据:student.txt 存储在 HDFS 上的/student 目录中
给项目打成 jar 包,上传到客户端
提交任务给 Spark 集群:
1 | $SPARK_HOME/bin/spark-submit \ |
八. 数据源之json
1 | // 方式 1 |
九. 数据源之 Hive
与之前的自己构造数据, 或者读json
数据等,得到 DataFrame
, 然后把此 df.createTempView(“tmpView”)
然后再对此 tmpView
使用 sql 语句查询不一样
直接用 sparkSession.sql(“...”)
操作的对象默认就是 hive
==> 访问 hive 的元数据库(mysql), 把 hive 底层的执行引擎 MapReduce
换成了SparkCore