i-Spark-3

一. Spark Streaming 简介

1.介绍

  • spark core 的扩展,针对实时数据流处理,具有可扩展、高吞吐量、容错.
  • 数据可以是来自于kafka, flume, tcpsocket,使用高级函数(map reduce filter ,join , window)
  • 处理的数据可以推送到 database, hdfs, 针对数据流处理可以应用到机器学习和图计算中。

实时计算相关技术
Strom / JStrom Spark Streming Flink
实时性高 有延迟 实时性高
吞吐量较低 吞吐量高 吞吐量高
只能实时计算 离线+实时 离线+实时
算子比较少 算子丰富 算子丰富
没有 机器学习 没有
没有 图计算 没有
使用比较少 非常火 一般

2. DStream (Discretized Stream ) 离散流

概念:

  • 离散流,表示的是连续的数据流。连续的RDD序列。准实时计算。
  • 通过kafka、flume等输入数据流产生,也可以通过对其他DStream进行高阶变换产生。
  • 在内部,DStream表现为RDD序列。

注意事项:

  • 启动上下文之后,不能启动新的离散流
  • 上下文停止后不能restart
  • 同一 JVM只有一个 active 的 StreamingContext
  • 停止StreamingContext会一同stop掉SparkContext,如若只停止StreamingContext. ssc.stop(false|true); :TODO
  • SparkContext可以创建多个StreamingContext, 创建新的之前停掉旧的。

DStream和Receiver:

  • 介绍
    • Receiver是接受者,从source接受数据,存储在内存中供spark处理。
    • 基本源:fileSystem | socket,内置API支持。
    • 高级源:kafka | flume | …,需要引入pom.xml依赖.
  • 注意
    • 使用local模式时,不能使用一个线程.使用的local[n],n需要大于receiver的个数。

二. 体验Spark Streaming

1.引入 pom 依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>

2. Scala 操作代码 in IDEA

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//local[n] n > 1
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

//创建Spark流上下文,批次时长是1s
val ssc = new StreamingContext(conf, Seconds(1))

//创建socket文本流
val lines = ssc.socketTextStream("localhost", 9999)

//压扁
val words = lines.flatMap(_.split(" "))

//变换成对偶
val pairs = words.map((_,1));

// 化简
val count = pairs.reduceByKey(_+_) ;
count.print()

//启动
ssc.start()

//等待结束
ssc.awaitTermination()

3. 通过 nc 与 Spark 交互

1
2
3
4
5
6
7
8
9
10
11
12
1> 启动 nc
nc -lk 9999

2> 运行 Spark Streaming 代码, 开启监听 localhost:9999

3> 在 nc 命令行输入单词
$> hello world
$> ....

$> 观察 spark 控制台打印

PS: 把 log4j 文件放到项目的 resources 下, 设置log4j 的打印级别为 WARN, 否则很难观察清楚

log4j 文件请参考

4.导出 Spark Streaming 为 jar 包, 放到 Linux 下运行

注意: 直接spark-submit 或者 spark-submit –help , 会弹出帮助

1
2
3
4
$> spark-submit --name wcstreaming 
--class com.rox.spark.scala.SparkStreamingDemo
--master spark://cs1:7077
SparkDemo1-1.0-SNAPSHOT.jar

三. Kafka 与 Spark Streaming 整合

1. 回忆Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
1> 启动 kafka,cs2~cs4  前提先启动 zk
$> kafka-server-start.sh /home/ap/apps/kafka/config/server.properties

2> 查看主题:
$> kafka-topics.sh --zookeeper cs1:2181 --list

3> 开启消费者
[ap@cs4]~%> kafka-console-consumer.sh --zookeeper cs3:2181 --topic kafka-test

4> 开启生产者
[ap@cs2]~%> kafka-console-producer.sh --broker-list cs4:9092 --topic kafka-test

>> 在生产者发送消息, 查看消费者

2.Driver 高可用

1
2
3
4
5
6
7
8
9
10
11
# supervise
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000

3. 使用 Java编写 Kafka-SparkStreaming 代码

具体见 github

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.rox.spark.java;

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

public class KafkaSparkstreamingDemo {
public static void main(String[] args) throws Exception {

SparkConf conf = new SparkConf();
conf.setAppName("KafkaSparkstreamingDemo");
conf.setMaster("local[4]");
//创建Spark流应用上下文
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Seconds.apply(2));

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "cs2:9092,cs3:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "g6");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("kafka-test");

// 取出 kafka stream
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);

// 压扁
JavaDStream<String> wordDS = stream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
@Override
public Iterator<String> call(ConsumerRecord<String, String> r) throws Exception {
String value = r.value();
List<String> list = new ArrayList<>();
String[] arr = value.split(" ");
for (String s : arr) {
list.add(s);
}
return list.iterator();
}
});

// 映射成元祖 (拼1)
JavaPairDStream<String,Integer> pairDS = wordDS.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s,1);
}
});

// 聚合
JavaPairDStream<String, Integer> countDS = pairDS.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});

// 打印计算结果
countDS.print();

streamingContext.start();

streamingContext.awaitTermination();
}
}

四. 跨单位时间,单位距离 (Window) , 跨批次(updateStateByKey)

1. Window

关键参数:

batch interval

  • 批次的间隔.

windows length

  • 窗口长度,跨批次。是批次的整数倍。

slide interval

  • 滑动间隔,窗口计算的间隔时间,有时批次interval的整倍数。


关键代码示例:

具体查看 github

1
2
3
4
5
6
7
8
9
10
11
12
//聚合
/** 统计同一 window 下的 key 的聚合 (用的比较多...)
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration): JavaPairDStream[K, V]
>>> Return a new DStream by applying `reduceByKey` over a sliding window
*/
JavaPairDStream<String,Integer> countDS = pairDS.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) {
return v1 + v2;
}
},Seconds.apply(6),Seconds.apply(4));

2. updateStateByKey

跨批次统计, 会一直累加, 具体查看 github

关键代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
// 可用于跨批次统计 updateStateByKeya
JavaPairDStream<String,Integer> jps = pairDS.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {
Integer newCount = v2.isPresent() ? v2.get() : 0 ;

System.out.println("old value : " + newCount);
for(Integer i : v1){
System.out.println("new value : " + i);
newCount = newCount + i;
}
return Optional.of(newCount);
}
});

五. spark streaming中的容错实现 ※

1.生产环境中spark streaming的job的注意事项

避免单点故障。

Driver

  • 驱动,运行用户编写的程序代码的主机。

Executors

  • 执行的spark driver提交的job,内部含有附加组件比如receiver
  • receiver接受数据并以block方式保存在memory中,同时,将数据块复制到其他executor中,以备容错。
  • 每个批次末端会形成新的DStream,交给 下游处理。
  • 如果receiver故障,其他执行器中的receiver会启动进行数据的接收。

checkpoint

  • 检查点
  • 用于容错处理, 设置后, 会把数据存储到检查点目录, 当出问题后, 从检查点恢复.

2. 通过checkpoint 实现 Spark Streaming 的容错

如果executor故障,所有未被处理的数据都会丢失,解决办法可以通过wal (hbase,hdfs/WALs)方式将数据预先写入到hdfs或者s3.

如果Driver故障,driver程序就会停止,所有executor都是丢失连接,停止计算过程。

解决办法需要配置和编程。

  • 配置Driver程序自动重启,使用特定的clustermanager实现。 :TODO how?
  • 重启时,从宕机的地方进行重启,通过检查点机制可以实现该功能。
    • 检查点目录可以是本地,可以是hdfs, 生产中一般都是 hdfs
    • 不再使用new方式创建SparkStreamContext对象,而是通过工厂方式JavaStreamingContext.getOrCreate()方法创建
    • 上下文对象,首先会检查检查点目录,看是否有job运行,没有就new新的。
  • 编写容错测试代码,计算过程编写到 Function0call方法中。

知识点: Function0, 1, 2, 3, 4

1
2
3
4
5
6
7
8
为何要使用 Function0?
因为 JavaStreamingContext.getOrCreate(path,function0) 的第二个参数就是 Function0
/**
* A zero-argument function that returns an R.
*/
public interface Function0<R> extends Serializable {
R call() throws Exception;
}

Java 代码, 具体见 github

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

package com.rox.spark.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkStreamingCheckpointDemo {
public static void main(String[] args) throws Exception {

/**
* Create a factory object that can create and setup a new JavaStreamingContext
* 可以使用 Function0
* A zero-argument function that returns an R.
*/
Function0<JavaStreamingContext> contextFactory = new Function0<JavaStreamingContext>(){
@Override
// 首次创建 context 时, 调用此方法
public JavaStreamingContext call() throws Exception {
SparkConf conf = new SparkConf();
conf.setMaster("local[4]");
conf.setAppName("SparkStreamingCheckpointDemo");

// 创建流上下文对象
JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(2 * 1000));

// Create an input stream from network source hostname:port.
JavaDStream<String> lines = jsc.socketTextStream("localhost", 10086);

// =============== 变换代码 ===============
// 设置一个窗口时长为1天, 滚动间隔为 2s
JavaDStream<Long> longJavaDStream = lines.countByWindow(new Duration(24 * 60 * 60 * 1000), new Duration(2 * 1000));

longJavaDStream.print();

// 设置检查点 目录
jsc.checkpoint("file:///Users/shixuanji/Documents/Code/temp/check");

// 返回流上下文对象
return jsc;
}
};


/**
* def getOrCreate(
checkpointPath: String,
creatingFunc: JFunction0[JavaStreamingContext]
): JavaStreamingContext
注意: 第2个参数是一个 Function0对象
*/
// 失败后, 重新创建时, 会经过检查点
JavaStreamingContext context = JavaStreamingContext.getOrCreate("file:///Users/shixuanji/Documents/Code/temp/check", contextFactory);

context.start();
context.awaitTermination();

}
}
如果帮到你, 可以给我赞助杯咖啡☕️
0%