一. Spark Streaming 简介
1.介绍
- 是
spark core
的扩展,针对实时数据流处理,具有可扩展、高吞吐量、容错. - 数据可以是来自于
kafka, flume, tcpsocket
,使用高级函数(map reduce filter ,join , window) - 处理的数据可以推送到
database
,hdfs
, 针对数据流处理可以应用到机器学习和图计算中。
实时计算相关技术
Strom / JStrom Spark Streming Flink
实时性高 有延迟 实时性高
吞吐量较低 吞吐量高 吞吐量高
只能实时计算 离线+实时 离线+实时
算子比较少 算子丰富 算子丰富
没有 机器学习 没有
没有 图计算 没有
使用比较少 非常火 一般
2. DStream (Discretized Stream ) 离散流
概念:
- 离散流,表示的是连续的数据流。连续的RDD序列。准实时计算。
- 通过kafka、flume等输入数据流产生,也可以通过对其他DStream进行高阶变换产生。
- 在内部,DStream表现为RDD序列。
注意事项:
- 启动上下文之后,不能启动新的离散流
- 上下文停止后不能restart
- 同一 JVM只有一个 active 的 StreamingContext
- 停止StreamingContext会一同stop掉SparkContext,如若只停止StreamingContext.
ssc.stop(false|true);
:TODO - SparkContext可以创建多个StreamingContext, 创建新的之前停掉旧的。
DStream和Receiver:
- 介绍
- Receiver是接受者,从source接受数据,存储在内存中供spark处理。
- 源
- 基本源:fileSystem | socket,内置API支持。
- 高级源:kafka | flume | …,需要引入pom.xml依赖.
- 注意
- 使用local模式时,不能使用一个线程.使用的local[n],n需要大于receiver的个数。
二. 体验Spark Streaming
1.引入 pom 依赖
1 | <dependency> |
2. Scala 操作代码 in IDEA
1 | //local[n] n > 1 |
3. 通过 nc 与 Spark 交互
1 | 1> 启动 nc |
4.导出 Spark Streaming 为 jar 包, 放到 Linux 下运行
注意: 直接spark-submit 或者 spark-submit –help , 会弹出帮助
1 | $> spark-submit --name wcstreaming |
三. Kafka 与 Spark Streaming 整合
1. 回忆Kafka
1 | 1> 启动 kafka,cs2~cs4 前提先启动 zk |
2.Driver 高可用
1 | # supervise |
3. 使用 Java编写 Kafka-SparkStreaming 代码
1 | package com.rox.spark.java; |
四. 跨单位时间,单位距离 (Window) , 跨批次(updateStateByKey)
1. Window
关键参数:
batch interval
- 批次的间隔.
windows length
- 窗口长度,跨批次。是批次的整数倍。
slide interval
- 滑动间隔,窗口计算的间隔时间,有时批次interval的整倍数。
关键代码示例:
1 | //聚合 |
2. updateStateByKey
关键代码示例
1 | // 可用于跨批次统计 updateStateByKeya |
五. spark streaming中的容错实现 ※
1.生产环境中spark streaming的job的注意事项
避免单点故障。
Driver
- 驱动,运行用户编写的程序代码的主机。
Executors
- 执行的spark driver提交的job,内部含有附加组件比如receiver
- receiver接受数据并以block方式保存在memory中,同时,将数据块复制到其他executor中,以备容错。
- 每个批次末端会形成新的DStream,交给 下游处理。
- 如果receiver故障,其他执行器中的receiver会启动进行数据的接收。
checkpoint
- 检查点
- 用于容错处理, 设置后, 会把数据存储到检查点目录, 当出问题后, 从检查点恢复.
2. 通过checkpoint 实现 Spark Streaming 的容错
如果executor故障,所有未被处理的数据都会丢失,解决办法可以通过wal (hbase,hdfs/WALs)方式将数据预先写入到hdfs或者s3.
如果Driver故障,driver程序就会停止,所有executor都是丢失连接,停止计算过程。
解决办法需要配置和编程。
- 配置Driver程序自动重启,使用特定的clustermanager实现。 :TODO how?
- 重启时,从宕机的地方进行重启,通过检查点机制可以实现该功能。
- 检查点目录可以是本地,可以是hdfs, 生产中一般都是 hdfs
- 不再使用new方式创建
SparkStreamContext
对象,而是通过工厂方式JavaStreamingContext.getOrCreate()
方法创建 - 上下文对象,首先会检查检查点目录,看是否有job运行,没有就new新的。
- 编写容错测试代码,计算过程编写到
Function0
的call
方法中。
知识点: Function0, 1, 2, 3, 4
1 | 为何要使用 Function0? |
1 |
|