一. Spark 核心 API
[SparkContext]
- 连接到spark集群,入口点.
[HadoopRDD]
- 读取hadoop上的数据,
[MapPartitionsRDD]
- 针对父RDD的每个分区提供了函数构成的新类型RDD.
[PairRDDFunctions]
- 对偶RDD函数类。
- 可用于KV类型RDD的附加函数。可以通过隐式转化得到.
[ShuffleRDD]
- 从Shuffle中计算结果的RDD.
[RDD]
- 是分区的集合, 弹性分布式数据集, 不可变的数据分区集合.
- 基本操作(map filter , persist)
- 特点:
- 分区列表 //数据
- 应用给每个切片的计算函数 //行为
- 到其他RDD的依赖列表 //依赖关系
- (可选)针对kv类型RDD的分区类
- (可选)首选位置列表
[DAGScheduler]
- 高级调度器层面,实现按照阶段(stage) 进行 shuffle
- 对每个JOB的各阶段(stage)计算有向无环图(DAG),并且跟踪RDD和每个阶段的输出。
- 找出最小调度运行作业, 将Stage对象以TaskSet方式提交给底层的调度器。
- 底层调度器实现TaskScheduler, 进而在cluster上运行job.
- TaskSet已经包含了全部的单独的task,这些Task都能够基于cluster的数据进行正确运行。
- Stage通过在需要shuffle的边界处将RDD打碎来创建Stage对象。
- 具有
窄依赖
的RDD操作(比如map /filter)被管道化至一个taskset中. - 而具有shuffle依赖的操作则包含多个Stage(一个进行输出,另一个进行输入)
- 最后,每个stage都有一个针对其他stage的shuffle依赖,可以计算多个操作。
- DAG调度器检测首选位置来运行task,通过基于当前的缓存状态,并传递给底层的task调度器来实现。根据shuffle的输出是否丢失处理故障问题。
- 不是因为随机文件丢失造成的故障会由任务调度程序处理,它在取消整个stage前花一小段时间重试每个任务
- 为了容错,同一stage可能会运行多次,称之为”attempts”,如果task调度器报告了一个故障(该故障是由于上一个stage丢失输出文件而导致的)DAG调度就会重新提交丢失的stage。这个通过具有 FetchFailed的CompletionEvent对象或者ExecutorLost进行检测的。
- DAG调度器会等待一段时间看其他节点或task是否失败,然后对丢失的stage重新提交taskset,计算丢失的task。
术语介绍
[ job ]
- 提交给调度的顶层的工作项目,由 ActiveJob 表示。
- 是Stage集合。
[Stage]
- 是task的集合,计算job中的中间结果。同一RDD的每个分区都会应用相同的计算函数。
- 在shuffle的边界处进行隔离(因此引入了隔断,需要上一个stage完成后,才能得到output结果)
- Stage有两个子类:
- 1) ResultStage,用于执行action动作的最终stage。
- 2) ShuffleMapStage, 对shuffle进行输出文件的写操作的。如果job重用了同一个rdd的话,stage通常可以跨越多个job实现共享。
- 并行任务的集合,都会计算同一函数。所有task有着同样的shuffle依赖,调度器运行的task DAG 在shuffle边界处划分成不同阶段。调度器以拓扑顺序执行.
- 每个stage可以shuffleMapStage,该阶段下输出是下一个stage的输入,也可以是resultStage,该阶段 task直接执行spark action。对于shuffleMapStage,需要跟踪每个输出分区所在的节点。
- 每个stage都有FirstJobId,区分于首次提交的id
[ShuffleMapStage]
- 产生输出数据,在每次shuffle之前发生。内部含有shuffleDep字段,有相关字段记录产生多少输出以及多少输出可用
- DAGScheduler.submitMapStage()方法可以单独提交submitMapStage().
[ResultStage]
- 该阶段在RDD的一些分区中应用函数来计算Action的结果。有些stage并不会在所有分区上执行。例如first(),lookup();
[Task]
- 单独的工作单元,每个发送给一台主机。
[Cache tracking]
- Dag调度器找出哪些RDD被缓存,避免不必要的重复计算,同时,也会记住哪些shuffleMap已经输出了结果,避免map端shuffle的重复处理。
[Preferred locations]
- dag调度器根据rdd的中首选位置属性计算task在哪里运行。
[Cleanup]
- 运行的job如果完成就会清楚数据结构避免内存泄漏,主要是针对耗时应用。
[ActiveJob]
- 在Dag调度器中运行job。作业分为两种类型,
- 1) result job,计算ResultStage来执行action.
- 2 )map-state job,为shuffleMapState结算计算输出结果以供下游stage使用。主要使用finalStage字段进行类型划分。
- job只跟踪客户端提交的”leaf” stage,通过调用Dag调度器的submitjob或者- submitMapStage()方法实现.
- job类型引发之前stage的执行,而且多个job可以共享之前的stage。这些依赖关系由DAG调度器内部管理。
[LiveListenerBus]
- 异步传输spark监听事件到监听器事件集合中。
[EventLoop]
- 从caller接受事件,在单独的事件线程中处理所有事件,该类的唯一子类是DAGSchedulerEventProcessLoop。
[LiveListenerBus]
- 监听器总线,存放Spark监听器事件的队列。用于监控。
[OutputCommitCoordinator]
- 输出提交协调器.决定提交的输出是否进入hdfs。
[TaskScheduler]
- 底层的调度器,唯一实现TaskSchedulerImpl。可插拔,同Dag调度器接受task,发送给cluster,运行任务,失败重试,返回事件给DAG调度器。
[TaskSchedulerImpl]
- TaskScheduler调度器的唯一实现,通过BackendScheduler(后台调度器)实现各种类型集群的任务调度。
[SchedulerBackend]
可插拔的后台调度系统,本地调度,mesos调度,。。。
在SchedulerBackend下方,实现有三种
- LocalSchedulerBackend 本地后台调度器,启动task.
StandaloneSchedulerBackend 独立后台调度器
CoarseGrainedSchedulerBackend 粗粒度后台调度器
[Executor]
- spark程序执行者,通过线程池执行任务。
[Dependency]:依赖
NarrowDependency: 子RDD的每个分区依赖于父RDD的少量分区, 也叫完全依赖
|
/ \
—
|—- OneToOneDependency //父子RDD之间的分区存在一对一关系。
|—- RangeDependency //父RDD的一个分区范围和子RDD存在一对一关系。
|—- PruneDependency // 在PartitionPruningRDD和其父RDD之间的依赖, 子RDD包含了父RDD的分区子集。
ShuffleDependency: 在shuffle阶段输出时的一种依赖, 属于一种宽依赖, 也叫部分依赖
二. Spark 其它概念
1. 创建 Spark 上下文(Context)
[本地模式,通过线程模拟]
本地后台调度器
spark local
spark local[3] //3线程,模拟cluster集群
spark local[*] //匹配cpu个数,
spark local[3,2] //3:3个线程,2为maxFailures。
- // local[*, M] means the number of cores on the computer with M failures
- // local[N, M] means exactly N threads with M failures
- 0和1等价,只执行一次, 失败后不会重试
[相当于伪分布式]
StandaloneSchedulerBackend
spark local-cluster[N, cores, memory] //模拟spark集群。
[完全分布式]
StandaloneSchedulerBackend
spark spark://cs1:7077 //连接到spark集群上.
2. RDD 持久化
跨操作进行RDD的内存式存储。
持久化RDD时,节点上的每个分区都会保存操内存中,以备在其他操作中进行重用。
缓存技术是迭代式计算和交互式查询的重要工具。
使用persist()和cache()进行rdd的持久化。
cache()是persist()一种.
action第一次计算时会发生persist().
spark的cache是容错的,如果rdd的任何一个分区丢失了,都可以通过最初创建rdd的进行重新计算。
persist可以使用不同的存储级别进行持久化。
1
2
3
4
5
6
7
8MEMORY_ONLY //只在内存
MEMORY_AND_DISK
MEMORY_ONLY_SER //内存存储(串行化)
MEMORY_AND_DISK_SER
DISK_ONLY //硬盘 // 默认的路径似乎在程序执行完后自己会删掉??
MEMORY_ONLY_2 //带有副本
MEMORY_AND_DISK_2 //快速容错。
OFF_HEAP // 离堆内存删除持久化数据
rdd.unpersist();
3.数据传递
- map(),filter()高级函数中访问的对象被串行化到各个节点。每个节点都有一份拷贝。
- 变量值并不会回传到driver程序。
4.共享变量
spark通过广播变量和累加器实现共享变量。
[广播变量]
1 | //创建广播变量 |
[累加器]
1 | // 创建累加器 |
5.通过 Spark 实现 PI 的分布式计算
:TODO 计算出来的结果不精准 ??
1 | sc.parallelize(1 to 100000000).map(e=>{ |
三. SparkSQL
Hive //hadoop mr sql
pheonix //hbase之上构建sql交互过程
DataFrame //收据框.表.该模块能在spark运行sql语句。
SparkSQL //SQL | DataFrame API.
1.Spark SQL Shell 操作
1 | //创建样例类 |
2. IDEA 操作 SQL
1> 通过读写文件完成基本 SQL 操作
1 | // 源码 ==> //DataFrame 类似于table操作。 |
2> 通过 JDBC 操作 MySQL
添加 pom 依赖
mysql-connector-java
1
2
3
4
5<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.17</version>
</dependency>
3.整合 Hive
1.配置 & Spark-shell 操作
1 | 1> .hive的类库需要在spark worker节点。 |
2. Java版SparkSQL 操作 Hive 表
1 | <dependency> |
Java 代码
1 | SparkConf conf = new SparkConf(); |
3. Spark下 分布式访问 Hive (Spark-Shell) —– 开启thriftserver, 通过 beeline 访问 hive
1 | 1> 在默认库下创建hive的数据表。 |
4. 使用 Java 通过 ThreadServer, 使用 JDBC 访问 Hive
1 | Class.forName("org.apache.hive.jdbc.HiveDriver"); |
5. 直接使用 $SPARK_HOME/bin/spark-sql
脚本访问 hive
1.配置 hive-site.xml
, 分发到各个节点
1 | <!--加上一下配置, 指明访问 hive 的 metadata 服务--> |
2.在装有 hive & mysql 的节点上启动 hive 的 metastore 服务
1 | nohup hive --service metastore 1>/home/ap/logs/hive_thriftserver.log 2>&1 & |
3.在 cs1节点启动 spark-sql
服务, 即可进入 spark-sql
命令行模式
1 | $cs1> spark-sql |
6.在 idea 上操作 hive
1.遇到问题
spark-hive_2.1.1 pom 导入失败
解决:
- 首先看报错时, 缺少哪些库, 然后直接选择
option+return
, 有一个自动导入 maven 的选项, 不过最好是自己知道要导入哪些库 - 如果
pom.xml
一直导入失败, 看下报错的提示, 有哪些库导入不成功, 可以reimport
, 还是不行的话, 找到报错的类, 找到自己的 maven 仓库, 删掉原来下载好的, 重新 导入, 重新点击 maven 刷新
idea 编码中, 关键点在.enableHiveSupport()
, 和pom.xml
配置`
1 | System.setProperty("HADOOP_USER_NAME","ap"); |
2.自己的 xml 备份
1 | <?xml version="1.0" encoding="UTF-8"?> |
3.比较全的 xml 备份 (Spark)
1 | <?xml version="1.0" encoding="UTF-8"?> |