Kafka

1. Overview

A distributed streaming platform.

Distributed, replicated, fault-tolerant storage of streams.

Builds real-time data pipelines between systems.

Records are stored categorized by topic.

Each record consists of a key, a value, and a timestamp.

Throughput on the order of millions of messages per second.

Java's JMS has two modes: publish/subscribe (every subscriber gets its own copy) and queue mode (each message goes to exactly one consumer).

Kafka combines the two neatly through consumer groups:

  • To get queue semantics, put all consumers in the same group; only one member of the group will consume a given message
  • To get publish/subscribe semantics, put each consumer in its own group; every consumer then receives its own copy of each message (see the sketch below)
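
Below is a minimal, hedged sketch of how group.id drives this, using the new consumer API from the kafka-clients library (the broker address cs2:9092 and the topic test are simply the ones used later in these notes):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cs2:9092");
        // Same group.id on every instance  -> queue semantics (each record is delivered to one member).
        // A unique group.id per instance   -> publish/subscribe (every instance gets its own copy).
        props.put("group.id", "g1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> r : records) {
                System.out.println(r.partition() + ":" + r.offset() + " " + r.value());
            }
        }
    }
}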

Keywords:

  • producer        // message producer
  • consumer        // message consumer
  • consumer group  // group of consumers
  • broker          // a Kafka server
  • topic           // category of records; carries the replication factor and partition count
  • zookeeper       // also used for Hadoop NameNode / RM HA, HBase, and Kafka

Optimization tip:

  • Mount a disk at a dedicated directory and have that directory hold only one partition's data, so partitions are spread across disks

Key points about Kafka

A given message is stored in exactly one partition.

Replicas -> reliability.

Partitions -> efficiency => concurrent access.

The partition is chosen by the producer, load-balancing across partitions.

A partition holds many messages, and the messages within it are ordered.

A message in a partition is consumed by only one member of a given consumer group.

Topics live on the Kafka servers (brokers).

A topic carries its replication factor and partition count.

Producers and consumers both operate against topics.

Kafka consumers use a pull model: they pull from Kafka at whatever rate the real-time computing framework can handle.

That is why Kafka is typically used in real-time / near-real-time computation.


Sequential reads and writes

  • Writes append data to the end of the log
  • Reads start from the beginning of the log
  • The offset can be used to jump to a specific position in the data
  • Data can be consumed repeatedly (see the sketch below)
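
A hedged sketch of jumping to a given offset and re-reading data with the new consumer API (partition 0 of topic test and offset 42 are made-up example values):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cs2:9092");
        props.put("group.id", "g-seek");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        TopicPartition tp = new TopicPartition("test", 0);
        consumer.assign(Collections.singletonList(tp)); // take the partition directly, no group rebalancing
        consumer.seek(tp, 42L);                         // jump to offset 42 (arbitrary example value)
        ConsumerRecords<String, String> records = consumer.poll(3000);
        for (ConsumerRecord<String, String> r : records) {
            System.out.println(r.offset() + " " + r.value()); // the same data can be read again and again
        }
        consumer.close();
    }
}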

Consumers

  • A given record is consumed by only one member within a group
  • Consumers in different groups can all consume the same record


SSD read/write speed: roughly 1200 ~ 3500 MB/s


Notes:

  • The number of partitions of a topic can be adjusted dynamically (it can only be increased, never decreased)
  • Each partition has a configurable number of replicas; the default replication factor is 1
  • When a partition has multiple replicas, one of them is the leader and handles all reads and writes; the others are followers on standby
  • Just as with HDFS blocks, different topics can have different replication factors (each topic is configured independently),
    but the replication factor must not exceed the number of brokers!!! (see the sketch below)
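
For reference, topics can also be created programmatically; a minimal sketch using the AdminClient from the kafka-clients library (broker addresses and the topic name are illustrative), keeping the replication factor no larger than the number of brokers:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cs2:9092,cs3:9092,cs4:9092");
        AdminClient admin = AdminClient.create(props);

        // 3 partitions, replication factor 2 (must be <= number of brokers)
        NewTopic topic = new NewTopic("demo-topic", 3, (short) 2);
        admin.createTopics(Collections.singletonList(topic)).all().get();
        admin.close();
    }
}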

Kafka's high-level workflow:

1. Start the ZooKeeper servers.

2. Start the Kafka servers (brokers).

3. The producer generates data, locates a broker (via ZooKeeper), and pushes the data to the broker, which stores it.

4. The consumer locates a broker via ZooKeeper and then actively pulls the data.

2. Installing Kafka

0. Install Kafka on all six hosts, cs1 ~ cs6
---------------------------------------------------------------------
1. Prepare ZooKeeper (zk)

---------------------------------------------------------------------
2. JDK

---------------------------------------------------------------------
3. Unpack the tar file
---------------------------------------------------------------------
4. Environment variables
---------------------------------------------------------------------

5. Configure Kafka
---------------------------------------------------------------------
[kafka/config/server.properties]
...
# different on each host
broker.id=1
...
listeners=PLAINTEXT://:9092
...
# different on each host
advertised.listeners=PLAINTEXT://cs1:9092
host.name=cs1
...
log.dirs=/home/centos/kafka/logs
...
zookeeper.connect=s201:2181,s202:2181,s203:2181

6. Distribute the whole directory & the symlink and the .zshrc config file to every host, and change the broker.id in each host's server.properties
---------------------------------------------------------------------

7. Start the Kafka servers
---------------------------------------------------------------------
a) Start ZooKeeper first
b) Start Kafka
b1) Foreground start
[cs2 ~ cs4]
[ap@cs1]~% kafka-server-start.sh /home/ap/apps/kafka/config/server.properties

b2) Background start
# with -daemon the process runs in the background and nothing is printed to the console
$> kafka-server-start.sh -daemon /home/ap/apps/kafka/config/server.properties
[or]
# this redirects the console output to log files
nohup kafka-server-start.sh /home/ap/apps/kafka/config/server.properties 1>~/kafka/logs/kafka_std.log 2>~/kafka/logs/kafka_err.log &

c) Verify that the Kafka server is up
$>netstat -anop | grep 9092

8. Create a topic
---------------------------------------------------------------------
[ap@cs2]~% kafka-topics.sh --create --zookeeper cs1:2181 --replication-factor 3 --partitions 3 --topic test

9. List topics
---------------------------------------------------------------------
[ap@cs2]~% kafka-topics.sh --list --zookeeper cs1:2181

10. Start a console producer (prerequisite: the Kafka server is already running on this host and on cs2)
---------------------------------------------------------------------
Note: the producer does not connect to ZooKeeper; it connects to a broker, i.e. a Kafka server.
The other command-line tools here connect to ZooKeeper.
In case the broker the producer connects to goes down, you can list several servers after --broker-list, separated by commas.
[ap@cs2]~% kafka-console-producer.sh --broker-list cs2:9092 --topic test

11. Start a console consumer (prerequisite: the Kafka server is already running on this host and on cs2)
---------------------------------------------------------------------
# use the --bootstrap-server option instead of --zookeeper
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[ap@cs3]~% kafka-console-consumer.sh --bootstrap-server cs2:9092 --topic test --from-beginning

12. Type "hello world" in the producer console
---------------------------------------------------------------------

3. Kafka cluster metadata in ZooKeeper

/controller            ===>    {"version":1,"brokerid":202,"timestamp":"1490926369148"}

/controller_epoch ===> 1

/brokers
/brokers/ids
/brokers/ids/202 ===> {"jmx_port":-1,"timestamp":"1490926370304","endpoints":["PLAINTEXT://s202:9092"],"host":"s202","version":3,"port":9092}
/brokers/ids/203
/brokers/ids/204


/brokers/topics/test/partitions/0/state ===>{"controller_epoch":1,"leader":203,"version":1,"leader_epoch":0,"isr":[203,204,202]}
/brokers/topics/test/partitions/1/state ===>...
/brokers/topics/test/partitions/2/state ===>...

/brokers/seqid ===> null

/admin
/admin/delete_topics/test ===> topics marked for deletion; but after the topic was actually deleted, nothing showed up here

/isr_change_notification

/consumers/xxxx/
/config

4. Kafka topic & replica operations

---------------------------------------------------------------------
PS: how to get help for these tools
* e.g. for topic-related help
* run the bin/kafka-topics.sh script with no arguments to print its usage
---------------------------------------------------------------------

List topics
---------------------------------------------------------------------
[ap@cs2]~% kafka-topics.sh --list --zookeeper cs1:2181


Create a topic
---------------------------------------------------------
replication-factor 2, partitions 3

$>kafka-topics.sh --zookeeper cs2:2181 --replication-factor 2 --partitions 3 --create --topic test2

2 x 3 = 6 // partition directories in total

3 partitions, each with 2 replicas,
so 6 partition directories in total across the cluster.
Each partition has one leader; the remaining replicas are followers.



Delete a topic
------------------------------------------------------------
[ap@cs2]~% kafka-topics.sh --zookeeper cs1:2181,cs2:2181,cs3:2181 --delete --topic kafka_test1

Describe a topic
--------------------------------------------------------
[ap@cs2]~% kafka-topics.sh --zookeeper cs1:2181,cs2:2181,cs3:2181 --describe --topic kafka_test1

Increase the number of partitions (Kafka only supports increasing, not decreasing, the partition count)
--------------------------------------------------------
kafka-topics.sh \
--alter \
--zookeeper cs1:2181,cs2:2181,cs3:2181 \
--topic kafka_test1 \
--partitions 20
--------------
# note: kafka-topics.sh --alter does not actually change the replication factor this way;
# to change replica placement, use a partition reassignment (see the manual rebalance below)
kafka-topics.sh \
--alter \
--zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 \
--topic kafka_test \
--replication-factor 2


Get the maximum and minimum offsets of a partition of a topic (--time -1 returns the latest offset, --time -2 the earliest; a programmatic version is sketched below)
--------------------------------------------------------
kafka-run-class.sh kafka.tools.GetOffsetShell --topic kafka_test1 --time -1 --broker-list cs1:9092,cs2:9092,cs3:9092 --partitions 1
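
The same minimum/maximum offsets can also be fetched programmatically; a hedged sketch using the new consumer's beginningOffsets/endOffsets (topic and partition number are just examples):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetRangeDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cs1:9092,cs2:9092,cs3:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        TopicPartition tp = new TopicPartition("kafka_test1", 1);
        Map<TopicPartition, Long> earliest = consumer.beginningOffsets(Collections.singletonList(tp));
        Map<TopicPartition, Long> latest = consumer.endOffsets(Collections.singletonList(tp));
        System.out.println("min=" + earliest.get(tp) + " max=" + latest.get(tp));
        consumer.close();
    }
}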


Re-layout partitions and replicas, manual rebalancing :TODO not yet tested
------------------------------------------------------------
--alter seems problematic here; --create appears to work fine
The following command manually places the partitions on brokers 203 & 204
$>kafka-topics.sh --alter --zookeeper s202:2181 --topic test2 --replica-assignment 203:204,203:204,203:204,203:204,203:204


Test: after a producer sends messages, look at the files in the local logs directory
--------------------------------------------------------------
1. Start a producer and send some messages
2. Locally: cd ~/kafka/logs
3. Check the file sizes and find the files (on all hosts at once):
   [ap@cs1]~% xcall.sh "ls -lh kafka/logs/test2-*"



Replicas
--------------------------------------------------------------
The broker stores messages in the order in which they arrive. Both producing and consuming are replica-aware.
Up to n-1 broker failures are tolerated. Each partition has one leader and several followers.
If the leader dies before the message is written to its local log, or before the acknowledgement is returned to the producer, the producer sends the message to the newly elected leader.

The new leader is elected through the ISR: the first follower to register becomes the leader.



## Kafka's replication modes: how messages a producer writes to a topic partition on the broker get replicated
---------------------
[Synchronous replication]
1. The producer contacts ZooKeeper to identify the leader
2. It sends the message to the leader
3. The leader writes the message to its local log
4. The followers pull the message from the leader
5. Each follower writes it to its local log
6. Each follower sends an ack back to the leader
7. The leader waits until it has received acks from all followers
8. The leader returns an ack to the producer


[Asynchronous replication]
--------------------------------------------------------------
The difference from synchronous replication is that the leader returns the ack to the client
as soon as the message is written to its local log, without waiting for all followers to finish replicating.




# Kafka replication modes & leader election | from the documentation
----------------------------------------------------
PS: electing a new leader is essentially the followers registering in ZooKeeper; the first one to register becomes the leader

The process of choosing the new lead replica is that all followers' In-sync Replicas (ISRs) register themselves with ZooKeeper. The very first registered replica becomes the new lead replica, and the rest of the registered replicas become the followers.

Kafka supports the following replication modes:
• Synchronous replication: In synchronous replication, a producer first identifies the lead replica from ZooKeeper and publishes the message. As soon as the message is published, it is written to the log of the lead replica and all the followers of the lead start pulling the message, and by using a single channel, the order of messages is ensured. Each follower replica sends an acknowledgement to the lead replica once the message is written to its respective logs. Once replications are complete and all expected acknowledgements are received, the lead replica sends an acknowledgement to the producer.
On the consumer side, all the pulling of messages is done from the lead replica.

• Asynchronous replication: The only difference in this mode is that as soon as a lead replica writes the message to its local log, it sends the acknowledgement to the message client and does not wait for the acknowledgements from follower replicas. But as a down side, this mode does not ensure the message delivery in case of broker failure.
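
With the new producer API, the replication mode is effectively chosen per producer through its acks setting; a minimal, hedged sketch (broker addresses and topic reused from these notes, for illustration only):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cs2:9092,cs3:9092,cs4:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks=0   : fire-and-forget, no acknowledgement at all
        // acks=1   : the leader acknowledges once the message is in its local log (asynchronous replication)
        // acks=all : the leader acknowledges only after all in-sync replicas have the message (synchronous replication)
        props.put("acks", "all");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("test2", "100", "hello kafka"));
        producer.close();
    }
}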

5. Kafka Java API

5.0 The pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.rox</groupId>
    <artifactId>Kafka_Demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
</project>

5.1 Implementing a message producer and consumer: sending & receiving messages (old API)

package com.rox.kafkademo.test;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.junit.Test;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;


public class TestProducer {


    // send messages (producer)
    @Test
    public void testSend() {

        Properties props = new Properties();
        // broker list
        props.put("metadata.broker.list", "cs2:9092,cs3:9092,cs4:9092");

        // how messages are serialized when sent to the broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // acknowledgement level:
        //  0 -- no acknowledgement, messages may be lost
        //  1 -- acknowledge once the leader has the message (other replicas may still miss it)
        // -1 -- acknowledge only after all in-sync replicas have received the message
        props.put("request.required.acks", "1");

        // create the producer configuration
        ProducerConfig config = new ProducerConfig(props);

        // create the producer
        Producer<String, String> producer = new Producer<String, String>(config);

        KeyedMessage<String, String> msg =
                new KeyedMessage<String, String>("test2", "100", "hello world kafka");

        // send
        producer.send(msg);
        System.out.println("send finished");
    }


    // receive messages (consumer)
    @Test
    public void testConsumer() {

        Properties props = new Properties();
        props.put("zookeeper.connect", "cs2:2181");
        props.put("group.id", "g2");
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");

        // create the consumer configuration
        ConsumerConfig config = new ConsumerConfig(props);

        // topic -> number of streams (threads) to consume it with
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test2", new Integer(1));

        Map<String, List<KafkaStream<byte[], byte[]>>> msgs =
                Consumer.createJavaConsumerConnector(config).createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> msgList = msgs.get("test2");
        for (KafkaStream<byte[], byte[]> stream : msgList) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                byte[] message = it.next().message();
                System.out.println(new String(message));
            }
        }
    }
}

5.2 The new API
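
The new clients live in org.apache.kafka.clients.producer and org.apache.kafka.clients.consumer, i.e. the kafka-clients dependency already declared in the pom above. A hedged sketch of a producer using the new API with a send callback (broker addresses and topic are the ones used elsewhere in these notes):

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class NewApiProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cs2:9092,cs3:9092,cs4:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("test2", "100", "hello from the new API"),
                new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                        } else {
                            System.out.println("sent to partition " + metadata.partition()
                                    + " at offset " + metadata.offset());
                        }
                    }
                });
        producer.close();
    }
}

The consumer side of the new API is shown in the earlier sketches (consumer groups, seek, offset ranges).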

6. Integrating Flume with Kafka

6.1 KafkaSink

Here Kafka sits on the receiving end: Flume acts as the producer, and the messages are then read with a Kafka consumer.

Flume events are exported to Kafka through KafkaSink (the most common setup).

[Flume is the producer]
1.1) Flume configuration
----------------------------------------------------
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test3
a1.sinks.k1.kafka.bootstrap.servers = cs2:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

a1.channels.c1.type=memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1.2) Start a Kafka console consumer
----------------------------------------------------
kafka-console-consumer.sh --bootstrap-server cs2:9092 --topic test3 --from-beginning
Note: "Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]."
i.e. use [bootstrap-server] instead of [zookeeper]

1.3) Run Flume with kafka_sink.conf
----------------------------------------------------
flume-ng agent -f apps/flume/conf/confs/kafka_sink.conf -n a1

1.4) Verify the port
----------------------------------------------------
netstat -anop | grep 8888
[OR]
lsof -i tcp:8888

1.5) Connect with nc
----------------------------------------------------
nc localhost 8888
type some messages ...

1.6) Watch the Kafka consumer receive the messages
----------------------------------------------------
It is remarkably fast!


========================================================
Simple example 2
--------------
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type=exec
# -F keeps following the file for new data (otherwise the process stops); -c +0 collects from the beginning instead of only the last 10 lines
a1.sources.r1.command=tail -F -c +0 /home/ap/calllog/calllog.log

a1.channels.c1.type=memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = calllog
a1.sinks.k1.kafka.bootstrap.servers = cs2:9092,cs3:9092,cs4:9092

# How many messages to process in one batch. Larger batches improve throughput while adding latency.
a1.sinks.k1.kafka.flumeBatchSize = 20

# How many replicas must acknowledge a message before it's considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas). Set this to -1 to avoid data loss in some cases of leader failure.
a1.sinks.k1.kafka.producer.acks = 1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

6.2 KafkaSource :TODO

The Flume source pulls data from Kafka, so the Flume source is the consumer; on the Kafka side, start a producer.

[Flume is the consumer]
1) Configuration
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = cs2:9092
a1.sources.r1.kafka.topics = test3
a1.sources.r1.kafka.consumer.group.id = g4

a1.sinks.k1.type = logger

a1.channels.c1.type=memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) Start Flume
flume-ng agent -f /home/ap/apps/flume/conf/confs/kafka_source.conf -n a1 -Dflume.root.logger=INFO,console

3) Start a Kafka console producer
kafka-console-producer.sh --broker-list cs2:9092 --topic test3

4) Send messages from the Kafka producer: nothing is received!!!! :TODO

6.3 KafkaChannel :TODO

:TODO this one also fails ??
index out of bounds
producer + consumer
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

a1.sinks.k1.type = logger

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = cs2:9092
a1.channels.c1.kafka.topic = test3
a1.channels.c1.kafka.consumer.group.id = g6
a1.channels.c1.kafka.parseAsFlumeEvent = false
a1.channels.c1.zookeeperConnect= cs2:2181

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
If this helped you, you can buy me a coffee ☕️