Spark重点解析(四)=> Spark Streaming

0. StreamingContext 的创建过程解析

1.直接上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
1> 典型的创建方法
首先创建一个 SparkConf 对象
传入 conf 和 Streaming 的 batchTime, 创建 StreamingContext
-------
val conf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

2> 进入 StreamingContext
内部会创建一个 StreamingContext, 用传进来的 conf 作为参数
-------
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

3> 进入 createNewSparkContext, 再进入 SparkContext中, 找到最长的 this(..,..,..)方法
这里其实也就是 SparkContext 的初始化流程, 参考下图
-------
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
new SparkContext(conf)
}
-------
/**
* Alternative constructor that allows setting common Spark properties directly
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI.
* @param sparkHome Location where Spark is installed on cluster nodes.
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes.
*/
def this(
master: String,
appName: String,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String] = Map()) = {
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
}

=--------------
> 暂存

// Worker to Master

/**
* @param id the worker id
* @param host the worker host
* @param port the worker post
* @param worker the worker endpoint ref
* @param cores the core number of worker
* @param memory the memory size of worker
* @param workerWebUiUrl the worker Web UI address
* @param masterAddress the master address used by the worker to connect
*/
case class RegisterWorker(
id: String,
host: String,
port: Int,
worker: RpcEndpointRef,
cores: Int,
memory: Int,
workerWebUiUrl: String,
masterAddress: RpcAddress)
extends DeployMessage {
Utils.checkHost(host)
assert (port > 0)
}

一. DStream 源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// RDDs generated, marked as private[streaming] so that testsuites can access it
// 内部有一个generatedRDDs方法, 是一个 HashMap[Time, RDD], key:time value:RDD
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

// 根据指定的时间产生一个 RDD
/** Method that generates an RDD for the given time */
def compute(validTime: Time): Option[RDD[T]]


// 对 DStream 进行操作, 本质上是对 依赖的 父 RDD 进行操作
1>
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
new MappedDStream(this, context.sparkContext.clean(mapFunc))
}

2>
package org.apache.spark.streaming.dstream
class MappedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
mapFunc: T => U
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
}

3>
_.map[U]: 最后的 map 方法,是调用 RDD 的 map 方法, 传入自己定义的处理函数
再掉 getOrCompute 方法 , 里面使用了 generatedRDDs

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {

4>
发现父类创建的 RDD 是一个 HashMap[Time, RDD[T]]
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

总结:

  • Spark Streaming是一个基于Spark Core之上的实时计算框架,可以从很多数据源消费数据并对数据进行处理
  • 在Spark Streaing中有一个最基本的抽象叫DStream(代理),本质上就是一系列连续的RDD,DStream其实就是对RDD的封装
  • DStream可以认为是一个RDD的工厂,该DStream里面生产都是相同业务逻辑的RDD,只不过是RDD里面要读取数据的不相同
  • 深入理解DStream: 他是sparkStreaming中的一个最基本的抽象,代表了一下列连续的数据流,本质上是一系列连续的RDD,你对DStream进行操作,就是对RDD进行操作
  • DStream每隔一段时间生成一个RDD,你对DStream进行操作,本质上是对里面的对应时间的RDD进行操作
  • DSteam和DStream之间存在依赖关系,在一个固定的时间点,对个存在依赖关系的DSrteam对应的RDD也存在依赖关系,
  • 每个一个固定的时间,其实生产了一个小的DAG,周期性的将生成的小DAG提交到集群中运行

二. Spark Streaming & Kafka 连接的2种方式

Receiver-based

基于 Receiver 的wordcount 代码

主要是 直接使用 高级 api KafkaUtils.createStream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.rox.spark.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
* SparkStreaming-Kafka
*/
object KafkaWordCount {
def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")

val ssc = new StreamingContext(conf,Seconds(3))

val zkQuorum = "cs1:2181,cs2:2181,cs3:2181"
val groupId = "g1"
val topic = Map[String, Int]("kafka-test" -> 1)

// 创建 DStream, 需要 KafkaDStream
val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)

//对数据进行处理
//Kafak的ReceiverInputDStream[(String, String)]里面装的是一个元组(key是写入的key,value是实际写入的内容)
val lines: DStream[String] = data.map(_._2)

println("这里打印的是 元祖 中取出的第一个元素--key" + data.map(_._1).toString)

//对DSteam进行操作,你操作这个抽象(代理,描述),就像操作一个本地的集合一样
//切分压扁
val words: DStream[String] = lines.flatMap(_.split(" "))

// 单词和1 组合
val wordAndOne: DStream[(String, Int)] = words.map((_,1))

// 聚合
val reduced: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)

// 打印结果
reduced.print()

// 启动 sparkStreaming 程序
ssc.start()

// 等待优雅的退出
ssc.awaitTermination()
}
}

Direct Approach 直连

区别

  • Receiver 接收固定时间间隔的数据(放在内存中的), 使用 Kafka 高级 API, 自动维护偏移量, 数据达到固定时间才进行处理, 效率低并且 容易丢失数据
    • 直接把偏移量记录到了 zk 当中, 但是效率低, 先拉数据, 拉到一定的时间再处理, 这样就有可能数据溢出, 为了保证数据不溢出, 就有一个 WALS 的机制(write ahead logs), 会把溢出的数据写入到 hdfs 中 , 效率低
  • Direct 直连方式, 相当于直接链接到 Kafka 的分区上, 使用 Kafka 底层的 API, 效率高, 需要自己维护偏移量

    • 直连方式, 就是一个迭代器, 变读边计算, 使用了 kafka 底层的 api, 直接连到了kafka 的分区文件上, 效率更高
  • receiver 接受数据是在Executor端 cache

    • 如果使用的窗口函数的话,没必要进行cache, 默认就是cache, WAL ;
    • 如果采用的不是窗口函数操作的话,你可以cache, 数据会放做一个副本放到另外一台节点上做容错
  • direct直连方式, 接受数据是在Driver端

SparkStreaming 直连 kafka 代码

另外一版本见, 主要是有个地方用的transform, 区别不大

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package com.rox.spark.scala

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}


/**
*
* 直连: 本质上就是创建一个 Kafka 的 inputDStream 的迭代器, kafka 产生一个数据, 这边就按照批次消费
* 还可以提交到关系型数据库, 提交事务`
*/
object KafkaDirectWordCount2 {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

val brokerList = "cs2:9092,cs3:9092,cs4:9092"
val zkQuorum = "cs1:2181,cs2:2181,cs3:2181,"
val topic = "wordcount"
val topics = Set(topic)
val group = "g001"


val kafkaParams = Map(
"metadata.broker.list" -> brokerList,
"group.id" -> group,
//从头开始读取数据
"auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
)
val zkClient = new ZkClient(zkQuorum)

val topicDirs = new ZKGroupTopicDirs(group, topic)
val zkTopicPath: String = s"${topicDirs.consumerOffsetDir}" // ../topic => 拿到 zk 中存储的 consumerffset 的路径
val childrenNum: Integer = zkClient.countChildren(zkTopicPath) // 拿到路径 ../topic 下的子节点(分区)

var kafkaStream: InputDStream[(String, String)] = null // 定义kafkaStream 流对象
var fromOffsets: Map[TopicAndPartition, Long] = Map() // 以topic+partition 为 key, offset 为 value, 定义集合 Map 保证唯一性

if (childrenNum > 0) { // 存在分区信息
for (i <- 0 until childrenNum) {
val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}") // 取出分区对应信息--> offset
val tp = TopicAndPartition(topic, i) // 创建 tp 对象 (主题,分区)
fromOffsets += (tp -> partitionOffset.toLong)
}
println("-------打印取出来的主题,分区 和 offset-------")
fromOffsets.foreach(println)
println("------------------------------------------")

val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()) //定义 kafka 消息的 处理函数

kafkaStream = KafkaUtils.createDirectStream
[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) // 拿到 kafkaDirectStream
}
else { // zk 中不存在分区信息, 就直接(从头)读取主题
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
}

/**
* 遍历kafkaStream
* 数据收集到 kafka 中的时候, 对应的OffsetRange 啥的就已经确定了, 比如当前的 untilOffset 是多少
* 这里只是拿到了这个流, 从中读取数据后, 顺便再把这个offset 存到zk 中去罢了
这里可以先搞个 transform 把 offsetRanges 取出来

var offsetRanges = Array[OffsetRange]()
val transform: DStream[(String, String)] = kafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
*/

kafkaStream.foreachRDD { rdd => // 迭代
/**
* OffsetRange(topic: 'wordcount', partition: 0, range: [20 -> 20])
OffsetRange(topic: 'wordcount', partition: 1, range: [19 -> 19])
OffsetRange(topic: 'wordcount', partition: 2, range: [19 -> 19])
*/
var offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 强转为HasOffsetRanges, 取出 kafka 中的 offsetRanges(是一个由 OffsetRange 组成的数组)

// 这里打印的都是从 kafka 读到的实时流的 信息
println("-------遍历offsetRanges数组----------------")
offsetRanges.foreach(x => println(x))
println("------------------------------------------")

val inputV = rdd.map(_._2) // 从 kafkaStream 中取出 真正的内容

inputV.foreachPartition(partition => //对 RDD 进行操作, 触发 Action
//++++++++++++++++++++++++++++++++++++//++++++++++++++++++++++++++++++++++++
partition.foreach(println) // 这里处理所有的业务逻辑代码, 这里仅仅只是做了打印
//++++++++++++++++++++++++++++++++++++//++++++++++++++++++++++++++++++++++++
)

// 这里其实就相当于: 读取一个 (DStream) 的数据, 就往 zk 中更新一次 offset
for (o <- offsetRanges) { // 从存储偏移量数组中取出值
val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}" // 从 zk 中取出当前消费的主题分区路径模板, 拼接上当前读到的分区, 更新上最新读到哪里
ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
}
}
// 启动
ssc.start()

// 优雅的等待结束
ssc.awaitTermination()
}
}

注意点:

RDD 的分区 & Kafka 的分区

  • Kafka 的分区是物理的
  • Rdd 的分区是抽象的
  • Rdd 的分区 与 Kafka 的分区是 一一对应

updateByKey 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.rox.spark.scala


import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
* 累加的 wordcount
*/
object KafkaStatefulWordCount{
def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")

val ssc = new StreamingContext(conf,Seconds(3))

// 如果要使用updateStateByKey累加历史数据, 那么要把每次的结果保存起来
ssc.checkpoint("file:///Users/shixuanji/Documents/Code/Datas/updatebykey")

val zkQuorum = "cs1:2181,cs2:2181,cs3:2181"
val groupId = "g1"
val topic = Map[String, Int]("kafka-test" -> 1)

// 创建 DStream, 需要 KafkaDStream
val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)

//对数据进行处理
//Kafak的ReceiverInputDStream[(String, String)]里面装的是一个元组(key是写入的key,value是实际写入的内容)
val lines: DStream[String] = data.map(_._2)

println("这里打印的是 元祖 中取出的第一个元素--key" + data.map(_._1).toString)

//对DSteam进行操作,你操作这个抽象(代理,描述),就像操作一个本地的集合一样
//切分压扁
val words: DStream[String] = lines.flatMap(_.split(" "))

// 单词和1 组合
val wordAndOne: DStream[(String, Int)] = words.map((_,1))

// 聚合
val reduced: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)

// 打印结果(Action)
reduced.print()

// 启动 sparkStreaming 程序
ssc.start()

// 等待优雅的退出
ssc.awaitTermination()
}


/**
* updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]
*
*/
val updateFunc = (iter:Iterator[(String, Seq[Int],Option[Int])]) => {
/**
* 第一个参数:聚合的key,就是单词
* 第二个参数:当前批次产生批次该单词在每一个分区出现的次数
* 第三个参数:初始值或累加的中间结果
*/
// iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))

// 这里可以使用更高级的模式匹配
iter.map{case(x, y, z) => (x, y.sum + z.getOrElse(0))}
}
}

三. 数据累加的方式

1.. 使用 updateByKey

见上面代码, 需要设置一个处理的函数, 需要设置 checkPoint, 不如直接在数据库中累加 和查询的方便

如果帮到你, 可以给我赞助杯咖啡☕️
0%