i-Storm

一. Storm 概述

image-20180802004158022

官网:

下载:

文档:

参考 PDF:

免费、开源、分布式、实时计算系统。

吞吐量高。

每秒每节点百万元组。

image-20180802004403219

Storm & Hadoop 对比

storm hadoop
实时流处理 批处理
无状态 有状态
使用zk协同的主从架构 无zk的主从架构。
每秒处理数万消息 HDFS MR数分钟、数小时
不会主动停止 终有完成的时候。

核心概念

Topology

  • Spout + bolt连接在一起形成一个top拓扑,形成有向图,定点就是计算,边是数据流。
  • Storm 的拓扑是对实时计算应用 逻辑的封装,它的作用与 MapReduce 的任务(Job)很相似,区别在于 MapReduce 的一个 Job 在得到结果之后总会结束,而拓扑会一直在集群中运行

Tuple

  • 主要的数据结构,有序元素的列表。

Stream

  • Streams 是 storm 最核心的抽象概念,一个 Stream 是分布式环境中并行创建和处理的一个没有边界的 tuple 序列,Streams 是由 Tuple(元组)组成的,Tuple 支持的类型有 Integer、Long、 Short、Byte、String、Double、Float、Boolean、Byte Arrays

    当然,Tuple 也支持可序列化的对象。

    数据流可以由一种能够表述数据流中元组的域(fields)的模式来定义。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /**
    * 定义数据流的字段名称和顺序
    */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("word"));
    }

    /**
    * 该方法就是接受到一行字符串,然后按照空格切割成多个单词
    * 每个单词都通过 outputCollector 往外发送给下一个处理单元
    */
    @Override
    public void execute(Tuple tuple) {
    String line = tuple.getStringByField("line");
    String[] words = line.trim().split(" ");
    for(String word : words){
    this.outputCollector.emit(new Values(word));
    }
    }

Spouts

  • 数据流源头。可以读取kafka队列消息。可以自定义。
  • Spout 是主动模式,Spout 继承 BaseRichSpout 或者实现 IRichSpout 接口不断的调用 nextTuple() 函数,然后通过 emit 发送数据流。
  • ackfail方法, 处理回调的 ack, fail 事件

Bolts

  • bolt 接收 Spout 或者上游的 Bolt 发来的 Tuple(数据流),拓扑中所有的数据处理均是由 Bolt 完成的。通过数据过滤(filter)、函数处理(function)、聚合(aggregations)、联结(joins)、 数据库交互等功能,Bolt 几乎能够完成任何一种数据处理需求。一个 Bolt 可以实现简单的数 据流转换,而更复杂的数据流变换通常需要使用多个 Bolt 并通过多个步骤完成。
  • Bolt 是被动模式,Bolt 继承 BaseBasicBolt 类或者实现 IRichBolt 接口等来实现,当 Bolt 接收 Spout 或者上游的 Bolt 发来的 Tuple(数据流)时调用 execute 方法,并对数据流进行处理完, OutputCollector 的 emit 发送数据流,execute 方法在 Bolt 中负责接收数据流和发送数据流。

task

  • Bolt中每个Spout或者bolt都是一个task.

Stream Grouping ※

  • Storm 是通过 Stream GroupingspoutsBolts 串联起来组成了流数据处理结构。

  • 数据流分组定义了在 Bolt 的不 同任务(tasks)中划分数据流的方式。

  • Storm 对 Stream groupings 定义了 8 种数据流分组方式

  • 1、Shuffle grouping(随机分组):对 Tuple(数据流)随机的分发到下一个 bolt 的 Tasks(任务), 每个任务同等机会获取 Tuple,保证了集群的负载均衡。

    2、Fields grouping(字段分组):对指定的字段对数据流进行分组,相同的字段对应的数据 流都会交给同个 bolt 中 Task 来处理,不同的字段的数据流会分发到不同的 Task 来处理。

    3、Partial Key grouping(部分字段分组):按字段进行分组,这种跟 Fields grouping 很相似, 但这种方式会考虑下游 Bolt 数据处理的均衡性问题,会提供更好的资源利用。

    4、All grouping(完全分组):Tuple(数据流)会被同时的发送到下一个 bolt 中的所有 Task(任 务)。这种会导致网络传输量很大,慎重使用

    5、Global grouping(全局分组):Tuple(数据流)会被发送到下一个 Bolt 的 Id 最小的 Task(任 务)。

    6、None grouping(无分组):使用这种方式说明你不关心数据流如何分组。目前这种方式的 结果与随机分组完全等效,不过未来可能会考虑通过非分组方式来让 Bolt 和它所订阅的 Spout 或 Bolt 在同一个线程中执行。

    7、Direct grouping(直接分组):通过 OutputCollector emitDirect 方法指定下一个 bolt 的具体 Task 来处理。

    8、Local or shuffle grouping(本地或随机分组):如果目标 Bolt 有一个或更多的任务在相同 的 Worker 进程中,Tuple 就发送给这些 Task,否则 Tuple 会被随机分发(跟 Shuffle grouping 一样)。

Storm 架构

  1. Nimbus(灵气)
    master节点。
    核心组件,运行top。
    分析top并收集运行task。分发task给supervisor.
    监控top。
    无状态,依靠zk监控top的运行状况。

  2. Supervisor(监察)
    每个supervisor有n个worker进程,负责代理task给worker。
    worker在孵化执行线程最终运行task。
    storm使用内部消息系统在nimbus和supervisor之间进行通信。

    接受nimbus指令,管理worker进程完成task派发。

  3. worker
    执行特定的task,worker本身不执行任务,而是孵化executors,
    让executors执行task。

Storm 工作流程

  1. nimbus等待提交的top
  2. 提交top后,nimbus收集task,
  3. nimbus分发task给所有可用的supervisor
  4. supervisor周期性发送心跳给nimbus表示自己还活着。
  5. 如果supervisor挂掉,不会发送心跳给nimubs,nimbus将task发送给其他的supervisor
  6. nimubs挂掉,super会继续执行自己task。
  7. task完成后,supervisor等待新的task
  8. 同时,挂掉的nimbus可以通过监控工具软件自动重启。

Storm 集群安装 & 启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1. jdk
2. tar
3. 环境变量
4. 验证安装
$> source ~/.zshrc
$> storm version
5. 分发安装文件到其他节点。

6. 配置[storm/conf/storm.yaml]

storm.local.dir: "/home/ap/storm"
storm.zookeeper.servers:
- "cs1"
- "cs2"
- "cs2"
### nimbus.* configs are for the master
nimbus.seeds : ["cs1","cs6"]

7. 分发

8. 启动进程
1、首先在 cs1 和 cs6 机器上启动 nimbus:
nohup $STORM_HOME/bin/storm nimbus 1>~/logs/storm-nibus.log 2>&1 &

2、然后在 cs1 和 cs6 节点上启动 Storm UI:
nohup $STORM_HOME/bin/storm ui 1>~/logs/storm-ui.log 2>&1 &

3、然后在每一个 supervisor 节点(cs2,3,4)上启动 supervisor 进程:
nohup $STORM_HOME/bin/stor
m supervisor 1>~/logs/storm-supervisor.log 2>&1 &

9.通过webui查看
http://cs1:8080/
http://cs6:8080/


10. 启动 logviewer (可选)
supervisor 节点:
$> storm logviewer &
点击 cs1:8080 中的 host 名字, 就会进到对应的 logger 中

二. 代码实例

1. 编程实现CallLog日志统计

  • xml 文件

    1
    2
    3
    4
    5
    6
    7
    <dependencies>
    <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.3</version>
    </dependency>
    </dependencies>
  • 代码

  • 打成 jar 包在 centos 上运行

    1
    2
    # 在centos上运行top
    $> storm jar StormCalllogDemo-1.0-SNAPSHOT.jar com.rox.storm_wordcount.App

2. 使用storm流计算实现wordcount

代码见我的 github

注意点:

  • 放到集群上运行后, 搜集结果不容易
  • 加上了一个 Util 工具类.
    • 3个阶段 向 cs1 通过 socket 经由 不同的 端口port 发送自身消息, 在cs1上, 用nc 开启监听

3. 设置top的并发程度和任务

parallelism = (task.count)spout + (task.count)bolt

The total parallelism of the topology can be calculated with the total parallelism = number of spout tasks + number of bolt tasks formula.

配置并发度.

  1. 设置worker数据
    conf.setNumWorkers(1);

  2. 设置executors个数
    //设置Spout的并发hint (executor个数)
    builder.setSpout(“wcspout”, new WordCountSpout(),3);

    //设置bolt的并发暗示
    builder.setBolt(“split-bolt”, new SplitBolt(),4)

  3. 设置task个数
    builder.setSpout(“wcspout”, new WordCountSpout(),3).setNumTasks(3);
    //
    builder.setBolt(“split-bolt”, new SplitBolt(),4).shuffleGrouping(“wcspout”).setNumTasks(4);

  4. 并发度 = 所有的task个数的总和。
    如果手动设置 task, task 为设置的数量, 不设置则为并发度的数量

  • task执行在 executor 上, executor是线程级别的, 一个executor默认是执行一个 task, 如果不手动设置的话, 设置用 setNumTasks(x), 注意这里设置的是当前总 task 的数量.
  • executor 执行在 worker 上, worker 是进程级别的, 一个 worker 可以有多个 executor
  • worker 由 supervisor 管理, 一个 supervisor 节点可以有多个 worker , 设置用 setNumWorkers(x)

一个 Storm 集群只有一个Nimbus 的 leader。

image-20180802013109948

==> 注意: 这里 cs4,37774 进程中的 一个executor 中启动了2个 task, 因为设置了并行 parallelism_hint = 3.setNumTasks(4), 那么肯定有一个 executor 中会执行2个 task

==> 证实: 设置了多少个 worker, 就有多少个进程, worker 数超过执行节点(supervisor)的数量时, 某一个sp 肯定会开启多个 worker, tasks 数量, 等价于对象的 个数

如果帮到你, 可以给我赞助杯咖啡☕️
0%