1.Flume 概念
提供 收集、移动、聚合大量日志数据的服务。
基于流数据的架构,用于在线日志分析。
基于事件。
在生产和消费者之间启动协调作用。
提供了事务保证,确保消息一定被分发。
多种 Source
, Channel
, Sink
配置方式。
与 Sqoop 区别
- Sqoop 用来采集关系型数据库数据
- Flume 用 来采集流动型数据。
关键词解释
Client
- Client 是一个将原始 log 包装成 events 并且发送他们到一个或多个 agent 的实体, 交给 source 处理
Event
- Event 由可选的 header 和载有数据的一个 byte array 构成。
Source
- 接受数据,类型有多种。
- Channel
- 临时存放地,对Source中来的数据进行缓冲,直到sink消费掉。
- Sink
- 从channel提取数据存放到中央化存储(hadoop / hbase)。
- Iterator
- 作用于 Source,按照预设的顺序在必要地方装饰和过滤
- events Channel Selector
- 允许 Source 基于预设的标准,从所有 channel 中,选择一个或者多个 channel
A simple Flume agent with one source, channel, and sink
Source ==>Channel
Channel
Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字 节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事 件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。你可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久化日志或 者把事件推向另一个 Source。
Flume 以 agent 为最小的独立运行单位。一个 agent 就是一个 JVM。单 agent 由 Source、Sink 和 Channel 三大组件构成,如下图:
2.安装 Flume
1.下载
2.tar
3.环境变量
注意: 配置的话, 使用脚本就方便些, 但是如果配置文件在 flume 文件夹中, 路径就比较长
不配置的话, 要进入到flume 路径下使用, 但是此时配置文件的路径就比较短了
PS: 如果.sh
文件没有执行权限, 即x
权限, 要用相对路径来启动, 即如果在 bin 目录, 要用./flume-ng
, 在 bin 外,要用 bin/flume
执行, 有x
权限的话, 就可以直接在 bin 下, 使用 flume-ng
, 如果配置了环境变量, 可以在任何路径下使用 flume-ng
了.
Open
flume-env.sh
file and set theJAVA_Home
to the folder where Java was installed in your system.1
export JAVA_HOME=/usr/local/jdk1.8.0_73
In the
.zshrc
file, set FLUME_HOME1
2
3
4
5#flume
export FLUME_HOME=/home/ap/apps/flume
export PATH=$PATH:$FLUME_HOME/bin
---
source .zshrc
4.验证
` $> flume-ng version` //next generation.下一代.
3.配置flume准备工作
3.0如果 yum 没有设置网络源的, 设置一下 阿里 | 网易
1 | # 先备份原来的 CentOS-Base.repo 为 CentOS-Base.repo.bak |
3.1 所有机器安装 nc 或者 telnet
1 | $>yum search nc |
3.2 使用 nc
1 | cs1_1> nc -lk 8888 : 开启8888端口监听 |
4. flume配置
总体概述
实时处理架构
flume: 监控日志文件
- 每次产生一条新的数据, 会被实时的收集到下一跳
Kafka: 消息系统
- 对消息进行简单处理, 当然, 具有简单缓冲作用
Storm / SparkStreaming:
- 流式的分布式计算引擎
Redis: 缓存组件
- 内存数据库
Hbase / HDFS /ES …:
- 最后持久化到这些地方..
启动某个flume task标准写法:
注意: 一定要指定 -c
或者 --conf
1 | $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console |
4.1 Flume Source
注意: 启动 flume 配置的时候, 后面加上 -Dflume.root.logger=INFO,console
还有一种更详细的 : -Dflume.root.logger=DEBUG,console
会打印更详细的日志
4.1.1 netcat
通过消息通信的方式, 进行采集
1 | 1.创建配置文件 |
4.1.2 exec
实时日志收集,实时收集日志。
测试:
先启动监控, 再创建文件, 能否监控到
结论: 可以监控到
1 | 1. 配置 |
4.1.3 批量收集(spool)
监控一个文件夹,静态文件。
文件最好是直接从外部移入, 文件夹内最好不要做文件编辑。
收集完之后,会重命名文件成新文件。.compeleted.
之后就不会再次处理这个文件了.
只会监控新增加的文件, 不会监控删除的本地文件.
1 | a) 配置 |
4.1.4 序列sources (seq)
一个简单的序列生成器, 它连续生成一个计数器, 它从0开始, 增量为 1, 并在 totalEvents 停止。
主要用于测试。
1 | 1) 配置 |
4.1.5 压力 source (用于压力测试)
1 | a1.sources = stresssource-1 |
4.1.6 Multiplexing Channel Selector :TODO
flume 多路复用
4.2. Flume Sink
沉漕, source 经过 channel, 最后下沉到 sink, 再由 sink 输出
注意, flume 后面加上 &
是指后台运行
4.2.1输出 (sink) 到 HDFS
1 | 1) 配置 |
4.2.2 输出到 Hive
写入太慢, 因为要转为 MR, 所以一般不会用
4.2.3 输出 (sink) 到 HBase
1 | 3.1) 配置文件 |
4.2.4 输出到 kafka
:TODO
4.3. Flume Channel
4.3.1.Memory Channel
以上用的都是Memory Channel, 不再显示
4.3.2 FileChannel
1 | 1) 配置 |
4.3.3 可溢出文件通道
This channel is currently experimental and not recommended for use in production.
1 | [spillable_channel.conf] |
4.4 使用AvroSource和AvroSink实现跃点agent处理
1 | [avro_hop.conf] |
4.5 Flume 的高可用配置
是通过配置优先级来实现主从, 优先级是组内的优先级, 需要先设置为一组
优先级高的 down 掉了, 再启动起来, 依然是主 ==> 已验证
1
2
3
4
5
6
7
8
9
10
11==============================
--设置优先级代配置如下----------
#set sink group 设置为一组
a1.sinkgroups.g1.sinks = k1 k2
#set failover 设置容灾相关的参数
a1.sinkgroups.g1.processor.type = failover
# 设置 组内成员的优先级, 决定 主备
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
- 以下配置方案与上图不同的, 用的是1个
channel
, 2个`sink`
1 | 1. 配置 agent ==> cs1, cs2 |
4.6 实用业务场景
知识点: 拦截器 interceptors
4.6.1 需求:
A、B 两台日志服务机器实时生产日志主要类型为 access.log、nginx.log、web.log 现在要求:
把 A、B 机器中的 access.log、nginx.log、web.log 采集汇总到 C 机器上然后统一收集到 hdfs 中。
但是在 hdfs 中要求的目录为:
/source/logs/access/20160101/**
/source/logs/nginx/20160101/**
/source/logs/web/20160101/**
4.6.2 需求分析
需求分析图示
数据处理流程分析
- 在每一台节点上都要 搜集 3个 source 源
- 搜集完成后发往同一台节点
4.6.3 需求实现
1 | 1.在cs1,cs2上配置flume, 通过 avro, sink到cs3 |
5.Flume的Maven依赖
1 | <?xml version="1.0" encoding="UTF-8"?> |