一. Storm 概述
免费、开源、分布式、实时计算系统。
吞吐量高。
每秒每节点百万元组。
Storm & Hadoop 对比
storm | hadoop |
---|---|
实时流处理 | 批处理 |
无状态 | 有状态 |
使用zk协同的主从架构 | 无zk的主从架构。 |
每秒处理数万消息 | HDFS MR数分钟、数小时 |
不会主动停止 | 终有完成的时候。 |
核心概念
Topology
- Spout + bolt连接在一起形成一个top拓扑,形成有向图,定点就是计算,边是数据流。
- Storm 的拓扑是对实时计算应用 逻辑的封装,它的作用与 MapReduce 的任务(Job)很相似,区别在于 MapReduce 的一个 Job 在得到结果之后总会结束,而拓扑会一直在集群中运行
Tuple
- 主要的数据结构,有序元素的列表。
Stream
Streams 是 storm 最核心的抽象概念,一个 Stream 是分布式环境中并行创建和处理的一个没有边界的 tuple 序列,Streams 是由 Tuple(元组)组成的,Tuple 支持的类型有 Integer、Long、 Short、Byte、String、Double、Float、Boolean、Byte Arrays
当然,Tuple 也支持可序列化的对象。
数据流可以由一种能够表述数据流中元组的域(fields)的模式来定义。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/**
* 定义数据流的字段名称和顺序
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
/**
* 该方法就是接受到一行字符串,然后按照空格切割成多个单词
* 每个单词都通过 outputCollector 往外发送给下一个处理单元
*/
public void execute(Tuple tuple) {
String line = tuple.getStringByField("line");
String[] words = line.trim().split(" ");
for(String word : words){
this.outputCollector.emit(new Values(word));
}
}
Spouts
- 数据流源头。可以读取kafka队列消息。可以自定义。
- Spout 是主动模式,Spout 继承 BaseRichSpout 或者实现 IRichSpout 接口不断的调用 nextTuple() 函数,然后通过 emit 发送数据流。
- 有
ack
和fail
方法, 处理回调的 ack, fail 事件
Bolts
- bolt 接收 Spout 或者上游的 Bolt 发来的 Tuple(数据流),拓扑中所有的数据处理均是由 Bolt 完成的。通过数据过滤(filter)、函数处理(function)、聚合(aggregations)、联结(joins)、 数据库交互等功能,Bolt 几乎能够完成任何一种数据处理需求。一个 Bolt 可以实现简单的数 据流转换,而更复杂的数据流变换通常需要使用多个 Bolt 并通过多个步骤完成。
- Bolt 是被动模式,Bolt 继承 BaseBasicBolt 类或者实现 IRichBolt 接口等来实现,当 Bolt 接收 Spout 或者上游的 Bolt 发来的 Tuple(数据流)时调用 execute 方法,并对数据流进行处理完, OutputCollector 的 emit 发送数据流,execute 方法在 Bolt 中负责接收数据流和发送数据流。
task
- Bolt中每个Spout或者bolt都是一个task.
Stream Grouping ※
Storm 是通过
Stream Grouping
把spouts
和Bolts
串联起来组成了流数据处理结构。数据流分组定义了在 Bolt 的不 同任务(tasks)中划分数据流的方式。
Storm 对 Stream groupings 定义了 8 种数据流分组方式:
1、Shuffle grouping(随机分组):对 Tuple(数据流)随机的分发到下一个 bolt 的 Tasks(任务), 每个任务同等机会获取 Tuple,保证了集群的负载均衡。
2、Fields grouping(字段分组):对指定的字段对数据流进行分组,相同的字段对应的数据 流都会交给同个 bolt 中 Task 来处理,不同的字段的数据流会分发到不同的 Task 来处理。
3、Partial Key grouping(部分字段分组):按字段进行分组,这种跟 Fields grouping 很相似, 但这种方式会考虑下游 Bolt 数据处理的均衡性问题,会提供更好的资源利用。
4、All grouping(完全分组):Tuple(数据流)会被同时的发送到下一个 bolt 中的所有 Task(任 务)。这种会导致网络传输量很大,慎重使用
5、Global grouping(全局分组):Tuple(数据流)会被发送到下一个 Bolt 的 Id 最小的 Task(任 务)。
6、None grouping(无分组):使用这种方式说明你不关心数据流如何分组。目前这种方式的 结果与随机分组完全等效,不过未来可能会考虑通过非分组方式来让 Bolt 和它所订阅的 Spout 或 Bolt 在同一个线程中执行。
7、Direct grouping(直接分组):通过 OutputCollector emitDirect 方法指定下一个 bolt 的具体 Task 来处理。
8、Local or shuffle grouping(本地或随机分组):如果目标 Bolt 有一个或更多的任务在相同 的 Worker 进程中,Tuple 就发送给这些 Task,否则 Tuple 会被随机分发(跟 Shuffle grouping 一样)。
Storm 架构
Nimbus(灵气)
master节点。
核心组件,运行top。
分析top并收集运行task。分发task给supervisor.
监控top。
无状态,依靠zk监控top的运行状况。Supervisor(监察)
每个supervisor有n个worker进程,负责代理task给worker。
worker在孵化执行线程最终运行task。
storm使用内部消息系统在nimbus和supervisor之间进行通信。接受nimbus指令,管理worker进程完成task派发。
worker
执行特定的task,worker本身不执行任务,而是孵化executors,
让executors执行task。
Storm 工作流程
- nimbus等待提交的top
- 提交top后,nimbus收集task,
- nimbus分发task给所有可用的supervisor
- supervisor周期性发送心跳给nimbus表示自己还活着。
- 如果supervisor挂掉,不会发送心跳给nimubs,nimbus将task发送给其他的supervisor
- nimubs挂掉,super会继续执行自己task。
- task完成后,supervisor等待新的task
- 同时,挂掉的nimbus可以通过监控工具软件自动重启。
Storm 集群安装 & 启动
1 | 1. jdk |
二. 代码实例
1. 编程实现CallLog日志统计
xml 文件
1
2
3
4
5
6
7<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.3</version>
</dependency>
</dependencies>代码
打成 jar 包在 centos 上运行
1
2# 在centos上运行top
$> storm jar StormCalllogDemo-1.0-SNAPSHOT.jar com.rox.storm_wordcount.App
2. 使用storm流计算实现wordcount
注意点:
- 放到集群上运行后, 搜集结果不容易
- 加上了一个 Util 工具类.
- 3个阶段 向
cs1
通过socket
经由 不同的 端口port
发送自身消息, 在cs1
上, 用nc
开启监听
- 3个阶段 向
3. 设置top的并发程度和任务
parallelism = (task.count)spout + (task.count)bolt
The total parallelism of the topology can be calculated with the total parallelism = number of spout tasks + number of bolt tasks formula.
配置并发度.
设置worker数据
conf.setNumWorkers(1);设置executors个数
//设置Spout的并发hint (executor个数)
builder.setSpout(“wcspout”, new WordCountSpout(),3);//设置bolt的并发暗示
builder.setBolt(“split-bolt”, new SplitBolt(),4)设置task个数
builder.setSpout(“wcspout”, new WordCountSpout(),3).setNumTasks(3);
//
builder.setBolt(“split-bolt”, new SplitBolt(),4).shuffleGrouping(“wcspout”).setNumTasks(4);并发度 = 所有的task个数的总和。
如果手动设置 task, task 为设置的数量, 不设置则为并发度的数量
- task执行在 executor 上, executor是线程级别的, 一个executor默认是执行一个 task, 如果不手动设置的话, 设置用 setNumTasks(x), 注意这里设置的是当前总 task 的数量.
- executor 执行在 worker 上, worker 是进程级别的, 一个 worker 可以有多个 executor
- worker 由 supervisor 管理, 一个 supervisor 节点可以有多个 worker , 设置用 setNumWorkers(x)
一个 Storm 集群只有一个Nimbus 的 leader。
==> 注意: 这里 cs4,37774 进程中的 一个executor 中启动了2个 task, 因为设置了并行 parallelism_hint = 3.setNumTasks(4), 那么肯定有一个 executor 中会执行2个 task
==> 证实: 设置了多少个 worker, 就有多少个进程, worker 数超过执行节点(supervisor)的数量时, 某一个sp 肯定会开启多个 worker, tasks 数量, 等价于对象的 个数