Flume

1.Flume 概念

提供 收集、移动、聚合大量日志数据的服务。

基于流数据的架构,用于在线日志分析。

基于事件。

在生产和消费者之间启动协调作用。

提供了事务保证,确保消息一定被分发。

多种 Source, Channel, Sink配置方式。

与 Sqoop 区别

  • Sqoop 用来采集关系型数据库数据
  • Flume 用 来采集流动型数据。

关键词解释

  • Client

    • Client 是一个将原始 log 包装成 events 并且发送他们到一个或多个 agent 的实体, 交给 source 处理
  • Event

    • Event 由可选的 header 和载有数据的一个 byte array 构成。
  • Source

    • 接受数据,类型有多种。
  • Channel
    • 临时存放地,对Source中来的数据进行缓冲,直到sink消费掉。
  • Sink
    • 从channel提取数据存放到中央化存储(hadoop / hbase)。
  • Iterator
    • 作用于 Source,按照预设的顺序在必要地方装饰和过滤
  • events Channel Selector
    • 允许 Source 基于预设的标准,从所有 channel 中,选择一个或者多个 channel

A simple Flume agent with one source, channel, and sink

image-20180630223348497

Source ==>Channel

image-20180630222908165

Channel

image-20180630222943933


Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字 节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事 件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。你可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久化日志或 者把事件推向另一个 Source。

Flume 以 agent 为最小的独立运行单位。一个 agent 就是一个 JVM。单 agent 由 Source、Sink 和 Channel 三大组件构成,如下图:

image-20180702100612437

参考文档

2.安装 Flume

1.下载

2.tar

3.环境变量

注意: 配置的话, 使用脚本就方便些, 但是如果配置文件在 flume 文件夹中, 路径就比较长

不配置的话, 要进入到flume 路径下使用, 但是此时配置文件的路径就比较短了

PS: 如果.sh文件没有执行权限, 即x权限, 要用相对路径来启动, 即如果在 bin 目录, 要用./flume-ng, 在 bin 外,要用 bin/flume执行, 有x权限的话, 就可以直接在 bin 下, 使用 flume-ng, 如果配置了环境变量, 可以在任何路径下使用 flume-ng了.

  • Open flume-env.sh file and set the JAVA_Home to the folder where Java was installed in your system.

    1
    export JAVA_HOME=/usr/local/jdk1.8.0_73
  • In the .zshrc file, set FLUME_HOME

    1
    2
    3
    4
    5
    #flume
    export FLUME_HOME=/home/ap/apps/flume
    export PATH=$PATH:$FLUME_HOME/bin
    ---
    source .zshrc

4.验证

` $> flume-ng version`            //next generation.下一代.

3.配置flume准备工作

3.0如果 yum 没有设置网络源的, 设置一下 阿里 | 网易

1
2
3
4
5
6
7
8
9
10
11
12
# 先备份原来的 CentOS-Base.repo 为 CentOS-Base.repo.bak
mv CentOS-Base.repo CentOS-Base.repo.bak

# 下载阿里基本源
$>sudo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo

# 下载阿里epel源
$>sudo wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo

# 生成缓存文件
$>sudo yum clean all
$>sudo yum makecache
3.1 所有机器安装 nc 或者 telnet
1
2
3
$>yum search nc
$> xcall.sh sudo yum install -y nc.x86_64
$> xcall.sh sudo yum install -y telnet
3.2 使用 nc
1
2
3
4
5
6
7
8
9
10
11
12
cs1_1> nc -lk 8888  : 开启8888端口监听
-------
再开一个cs1 session
cs1_2> netstat -anop | grep 8888 : 查看是有有此端口
或者 > lsof -i tcp:8888 : 可以直接看到 pid
tcp 0 0 0.0.0.0:8888 0.0.0.0:* LISTEN 17251/nc off (0.00/0/0)
---------
有的话就可以测试连接了 (客户端连接)
cs1_2> nc localhost 8888

==================
telnet 使用方式类似, 可查看帮助

4. flume配置

官方文档

总体概述

image-20180702101155514

实时处理架构

flume: 监控日志文件

  • 每次产生一条新的数据, 会被实时的收集到下一跳

Kafka: 消息系统

  • 对消息进行简单处理, 当然, 具有简单缓冲作用

Storm / SparkStreaming:

  • 流式的分布式计算引擎

Redis: 缓存组件

  • 内存数据库

Hbase / HDFS /ES …:

  • 最后持久化到这些地方..

启动某个flume task标准写法:

注意: 一定要指定 -c 或者 --conf

1
2
3
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
# 最好是写当前的相对路径, 下面的是标准写法
$> bin/flume-ng agent -c conf -f conf/a1.conf -n a1 -Dflume.root.logger=INFO,console

4.1 Flume Source

注意: 启动 flume 配置的时候, 后面加上 -Dflume.root.logger=INFO,console

还有一种更详细的 : -Dflume.root.logger=DEBUG,console

打印更详细的日志

4.1.1 netcat

通过消息通信的方式, 进行采集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1.创建配置文件
[/soft/flume/conf/helloworld.conf]
#声明三种组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#定义source信息
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888

#定义sink信息
a1.sinks.k1.type=logger

#定义channel信息
a1.channels.c1.type=memory

#绑定在一起
ps: 注意: 一个source(源) 可以输出到多个 channels(通道)
一个sink(沉漕)只能从一个 channel(通道)中获取数据

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

2.运行
a)启动flume agent
$> bin/flume-ng agent -f conf/helloworld.conf -n a1 -Dflume.root.logger=INFO,console
b)启动nc的客户端
$>nc localhost 8888
$nc>hello world

c)在flume的终端输出hello world.

4.1.2 exec

实时日志收集,实时收集日志。

测试:

  1. 先启动监控, 再创建文件, 能否监控到

    结论: 可以监控到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1. 配置
# 声明3种组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

# 定义 source 信息
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /home/ap/test.txt #收集最后的10行
a1.sources.r1.command=tail -F -c +0 /home/ap/test.txt #从第0行开始收集

# 定义 sink 信息
a1.sinks.k1.type=logger

# 定义 channel 信息
a1.channels.c1.type=memory

# 绑定在一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

2> 新建文件 test.txt

3> 启动
[ap@cs1]~% flume-ng agent -f apps/flume/conf/confs/exec.conf -n a1 -Dflume.root.logger=INFO,console

4> 修改 test.txt 文件
$> echo ddd > test.txt 观察 agent 控制台变化

4.1.3 批量收集(spool)

监控一个文件夹,静态文件。

文件最好是直接从外部移入, 文件夹内最好不要做文件编辑。

收集完之后,会重命名文件成新文件。.compeleted.

之后就不会再次处理这个文件了.

只会监控新增加的文件, 不会监控删除的本地文件.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
a) 配置
# 声明3种组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

# 定义 source 信息
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/ap/spool
a1.sources.r1.fileHeader=true

# 定义 sink 信息
a1.sinks.k1.type=logger

# 定义 channel 信息
a1.channels.c1.type=memory

# 绑定在一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

b)创建目录
$>mkdir ~/spool

c)启动flume
可以直接启动
$>flume-ng agent -f apps/flume/conf/confs/spool.conf -n a1 -Dflume.root.logger=INFO,console

4.1.4 序列sources (seq)

一个简单的序列生成器, 它连续生成一个计数器, 它从0开始, 增量为 1, 并在 totalEvents 停止。

主要用于测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1) 配置
# 声明3种组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1


# 定义 source 信息
a1.sources.r1.type=seq
a1.sources.r1.totalEvents=1000


# 定义 sink 信息
a1.sinks.k1.type=logger

# 定义 channel 信息
a1.channels.c1.type=memory

# 绑定在一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

2) 运行
$>bin/flume-ng agent -f conf/confs/helloworld.seq.conf -n a1 -Dflume.root.logger=INFO,console

4.1.5 压力 source (用于压力测试)

1
2
3
4
5
6
a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1

4.1.6 Multiplexing Channel Selector :TODO

flume 多路复用

官网


4.2. Flume Sink

沉漕, source 经过 channel, 最后下沉到 sink, 再由 sink 输出

注意, flume 后面加上 &是指后台运行

官网

4.2.1输出 (sink) 到 HDFS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
1) 配置
===================================================================
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/%M/%S
a1.sinks.k1.hdfs.filePrefix = events-

# round目录 是否会产生新目录,每十分钟产生一个新目录,一般控制的目录方面。
#2017-12-12 -->
#2017-12-12 -->%H%M%S
比如 写上 1 / day, 就是一天产生的数据文件都在 一个日期的目录下.
---
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = day

#使用本地时间戳为时间序列头
a1.sinks.k1.hdfs.useLocalTimeStamp=true

#是否产生新文件。
-------------
只要3个条件中某一个满足, 就会滚动一个文件
# roll滚动, 此事件内的日志会写入到一个文件
# 目前的猜测: 10个字节就会触发, 如果10秒内 0 < input < 10字节, 也会滚动 => 是的
# 等待滚动当前的文件 10秒,Number of seconds to wait before rolling current file (0 = never roll based on time interval)
a1.sinks.k1.hdfs.rollInterval=10

# 这是指10个字节就触发滚动 File size to trigger roll, in bytes (0: never roll based on file size)
a1.sinks.k1.hdfs.rollSize=10

# Number of events written to file before it rolled (0 = never roll based on number of events) 一行就是一个事件
a1.sinks.k1.hdfs.rollCount=3

a1.channels.c1.type=memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

-----------------
# sink其它可选配置参数(详情可查官网)
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path =hdfs://myha01/weblog/flume-event/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = tomcat_
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
>>>>>>>>>>>>>>>>>
如果 HDFS 集群是高可用集群,那么必须要放入 core-site.xml 和 hdfs-site.xml 文件到 $FLUME_HOME/conf/confs 目录中, 就是跟配置文件同级目录.
>>>>>>>>>>>>>>>>>

2) 开启agent
===================================================================
[ap@cs1]~/apps/flume/conf/confs% flume-ng agent -f hdfs.conf -n a1

3) nc 连接
===================================================================
nc local 8888
连接上了之后, 发出去的消息就会写入 hdfs

4) 写入时, 完整的 log 是这样的
===================================================================
18/06/30 18:39:18 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
18/06/30 18:39:18 INFO hdfs.BucketWriter: Creating /user/ap/flume/events/18-06-30/18/39/00/events-.1530355158463.tmp
18/06/30 18:39:28 INFO hdfs.BucketWriter: Closing /user/ap/flume/events/18-06-30/18/39/00/events-.1530355158463.tmp
18/06/30 18:39:28 INFO hdfs.BucketWriter: Renaming /user/ap/flume/events/18-06-30/18/39/00/events-.1530355158463.tmp to /user/ap/flume/events/18-06-30/18/39/00/events-.1530355158463
18/06/30 18:39:28 INFO hdfs.HDFSEventSink: Writer callback called.

5) 查看
===================================================================
注意: 序列文件用 text 查看

4.2.2 输出到 Hive

写入太慢, 因为要转为 MR, 所以一般不会用

4.2.3 输出 (sink) 到 HBase

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
3.1) 配置文件
===================================================================
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

a1.sinks.k1.type = hbase
a1.sinks.k1.table = ns1:t12
a1.sinks.k1.columnFamily = f1
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

a1.channels.c1.type=memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.2) 开启 nc 输入
===================================================================

3.3) 在 hbase shell 中 scan 表 'ns1:t12'
===================================================================

4.2.4 输出到 kafka

:TODO

4.3. Flume Channel

4.3.1.Memory Channel

以上用的都是Memory Channel, 不再显示

4.3.2 FileChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1) 配置
===================================================================
a1.sources = r1
a1.sinks= k1
a1.channels = c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888

a1.sinks.k1.type=logger

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/centos/flume/fc_check
a1.channels.c1.dataDirs = /home/centos/flume/fc_data

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

2) 创建的文件大概是这样
===================================================================
$~/flumedata> ls -r fc*
fc_data:
log-1.meta log-1 in_use.lock

fc_check:
queueset in_use.lock inflighttakes inflightputs checkpoint.meta checkpoint

4.3.3 可溢出文件通道

This channel is currently experimental and not recommended for use in production.

1
2
3
4
5
6
7
8
9
10
11
12
[spillable_channel.conf]

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
#0表示禁用内存通道,等价于文件通道
a1.channels.c1.memoryCapacity = 0
#0,禁用文件通道,等价内存通道。
a1.channels.c1.overflowCapacity = 2000

a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /user/centos/flume/fc_check
a1.channels.c1.dataDirs = /user/centos/flume/fc_data

4.4 使用AvroSource和AvroSink实现跃点agent处理

image-20180630224236332

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
 [avro_hop.conf]
############ a1 ############
# 声明3种组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1


# 定义 source 信息
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888

# 定义 sink 信息
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=9999

# 定义 channel 信息
a1.channels.c1.type=memory

# 绑定在一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

############ a2 ############
# 声明3种组件
a2.sources=r2
a2.channels=c2
a2.sinks=k2


# 定义 source 信息
# source 是 avro, 绑定端口9999
a2.sources.r2.type=avro
a2.sources.r2.bind=localhost
a2.sources.r2.port=9999


# 定义 sink 信息
# sink 打印到控制台
a2.sinks.k2.type=logger

# 定义 channel 信息
a2.channels.c2.type=memory

# 绑定在一起
a2.sources.r2.channels=c2
a2.sinks.k2.channel=c2

2.启动a2
===================================================================
$>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a2 -Dflume.root.logger=INFO,console

3.验证a2
===================================================================
$>netstat -anop | grep 9999

4.启动a1
===================================================================
$>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a1

5.验证a1
===================================================================
$>netstat -anop | grep 8888

6.nc 连接a1 & 发消息
===================================================================
$>nc localhost 8888

4.5 Flume 的高可用配置

  • 是通过配置优先级来实现主从, 优先级是组内的优先级, 需要先设置为一组

  • 优先级高的 down 掉了, 再启动起来, 依然是主 ==> 已验证

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    ==============================
    --设置优先级代配置如下----------
    #set sink group 设置为一组
    a1.sinkgroups.g1.sinks = k1 k2

    #set failover 设置容灾相关的参数
    a1.sinkgroups.g1.processor.type = failover
    # 设置 组内成员的优先级, 决定 主备
    a1.sinkgroups.g1.processor.priority.k1 = 10
    a1.sinkgroups.g1.processor.priority.k2 = 1
    a1.sinkgroups.g1.processor.maxpenalty = 10000

  • 以下配置方案与上图不同的, 用的是1个 channel, 2个`sink`
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
1. 配置 agent ==> cs1, cs2
================================================================================================
[ha_agent.conf]
------------------

#agent name: a1
a1.channels = c1
a1.sources = r1
a1.sinks = k1 k2

#set gruop
a1.sinkgroups = g1

#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# set sources
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/ap/flumedata/testha.log

# 这里有疑问 : 这里设置拦截器的意义?? 随便设置的吗?? :TODO
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Type
a1.sources.r1.interceptors.i1.value = LOGIN
a1.sources.r1.interceptors.i2.type = timestamp

# set sink1 -> 通过 avro 沉到 cs3的 source 上
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = cs3
a1.sinks.k1.port = 52020

# set sink2 -> 通过 avro 沉到 cs4的 source 上
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = cs4
a1.sinks.k2.port = 52020

#set sink group 设置为一组
a1.sinkgroups.g1.sinks = k1 k2

#set failover 设置容灾相关的参数
a1.sinkgroups.g1.processor.type = failover
# 设置 组内成员的优先级, 决定 主备
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000


2. 配置 collector ==> cs3,cs4
================================================================================================
[ha_collector.conf]

#set agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# other node,nna to nns
a1.sources.r1.type = avro

# :TODO 这些 key, value 设置了之后, 用在什么地方???
## 当前主机是什么,就修改成什么主机名
a1.sources.r1.bind = cs3
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
## 当前主机是什么,就修改成什么主机名
a1.sources.r1.interceptors.i1.value = cs3
a1.sources.r1.channels = c1

#set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/user/ap/flume/event_ha/loghdfs
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d



3. 启动
================================================================================================
# 先启动 cs3,cs4 ==> collector
$> [ap@cs4]~/apps/flume/conf/confs%
flume-ng agent -f ha_collector.conf -n a1 -Dflume.root.logger=DEBUG,console

# 再启动 cs1, cs2 ==> agent, 连接 collector
[ap@cs1]~/apps/flume/conf/confs%
flume-ng agent -f ha_agent.conf -n a1 -Dflume.root.logger=DEBUG,console

4.6 实用业务场景

知识点: 拦截器 interceptors

4.6.1 需求:

A、B 两台日志服务机器实时生产日志主要类型为 access.log、nginx.log、web.log 现在要求:

把 A、B 机器中的 access.log、nginx.log、web.log 采集汇总到 C 机器上然后统一收集到 hdfs 中。

但是在 hdfs 中要求的目录为:

/source/logs/access/20160101/**

/source/logs/nginx/20160101/**

/source/logs/web/20160101/**

4.6.2 需求分析

需求分析图示

image-20180702171339961

数据处理流程分析

  • 每一台节点上都要 搜集 3个 source 源
  • 搜集完成后发往同一台节点

image-20180702171443885

4.6.3 需求实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
1.在cs1,cs2上配置flume, 通过 avro, sink到cs3
=============================================================
[exec_source_avro_sink.conf]
------------------------------
# 指定各个核心组件
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# 准备数据源
# static 拦截器的功能就是往采集到的数据的 header 中插入自己定义的 key-value 对
# sources->r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /home/ap/flumedata/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

# sources->r2
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F -c +0 /home/ap/flumedata/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

# sources->r3
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F -c +0 /home/ap/flumedata/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web


# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = cs3
a1.sinks.k1.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

2. 在 cs3上配置 flume, source 为 avro, 接受来自 cs1,cs2下沉的数据
======================================================================
[avro_source_hdfs_sink.conf]
-------------------------------
#定义 agent 名, source、channel、sink 的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#定义 source
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port =41414

#添加时间拦截器
#Note:For all of the time related escape sequences, a header with the key “timestamp” must #exist among the headers of the event (unless hdfs.useLocalTimeStamp is set to true). One way #to add this automatically is to use the TimestampInterceptor.
a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type=org.apache.flume.interceptor.TimestampInterceptor$Builder

#定义 channels
a1.channels.c1.type = memory
#The maximum number of events stored in the channel
a1.channels.c1.capacity = 20000
#The maximum number of events the channel will take from a source or give to a sink per transaction
a1.channels.c1.transactionCapacity = 10000

#定义 sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/ap/flume/event/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

#时间类型
a1.sinks.k1.hdfs.useLocalTimeStamp = true

#生成的文件不按条数生成, 否则就是按照设定的 event产生的, 1行就是一个 event
#Number of events written to file before it rolled (0 = never roll based on number of events)
a1.sinks.k1.hdfs.rollCount = 0

#生成的文件按时间生成,单位是秒
#Number of seconds to wait before rolling current file (0 = never roll based on time interval)
a1.sinks.k1.hdfs.rollInterval = 30

#生成的文件按大小生成, 单位是 bytes
#1 MB = 1,024 KB = 1,048,576 Bytes
#File size to trigger roll, in bytes (0: never roll based on file size)
a1.sinks.k1.hdfs.rollSize = 10485760 #这里是10MB

#批量写入 hdfs 的个数
a1.sinks.k1.hdfs.batchSize = 20

#flume 操作 hdfs 的线程数(包括新建,写入等)
a1.sinks.k1.hdfs.threadsPoolSize=10

#操作 hdfs 超时时间
a1.sinks.k1.hdfs.callTimeout=30000

#组装 source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.启动服务
============================================================================
# 启动cs3 => 服务器 c
[ap@cs3]~/apps/flume/conf/confs% flume-ng agent -f avro_source_hdfs_sink.conf -n a1 -Dflume.root.logger=DEBUG,console

# 启动cs1, cs2 => 服务器 a, b
# 注意: 千万别启错了, 否则很难找到原因..
[ap@cs2]~/apps/flume/conf/confs% flume-ng agent -f exec_source_avro_sink.conf -n a1 -Dflume.root.logger=DEBUG,console

==============
遇到问题:
这种通过avro多点 sink 到一个 source 的时候, 实现不创建文件夹/文件的, 好像确实写入不进去...
===> md, 原因应该是启动错配置了

5.Flume的Maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.rox</groupId>
<artifactId>Flume_test</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-hdfs-sink</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-ng-hbase-sink</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-channels</groupId>
<artifactId>flume-file-channel</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
</project>
如果帮到你, 可以给我赞助杯咖啡☕️
0%