i-Zookeeper

1.Zookeeper 的安装&概述

1.1. Zookeeper 简介

Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。

zk提供的服务

  • Naming service //按名称区分集群中的节点.

  • Configuration management //对加入节点的最新化处理。

  • Cluster management //实时感知集群中节点的增减.
  • Leader election //leader选举
  • Locking and synchronization service //修改时锁定数据,实现容灾.
  • Highly reliable data registry //节点宕机数据也是可用的。

1.2. ZK 的工作机制

image-20180623145528682

1.4. ZK 架构

image-20180623144232085

1.4.1 名词解释

1.Client

​ 从server获取信息,周期性发送数据给server,表示自己还活着。

​ client连接时,server回传ack信息。

​ 如果client没有收到reponse,自动重定向到另一个server.

​ 2.Server

​ zk集群中的一员,向client提供所有service,回传ack信息给client,表示自己还活着。

​ 3.ensemble

​ 一组服务器。

​ 最小节点数是3.

​ 4.Leader

​ 如果连接的节点失败,自定恢复,zk服务启动时,完成leader选举。

​ 5.Follower

​ 追寻leader指令的节点。

1.4.2 整体解释

  • 1)Zookeeper:一个领导者(leader),多个跟随者(follower)组成的集群。
  • 2)Leader负责进行投票的发起和决议,更新系统状态
  • 3)Follower用于接收客户请求并向客户端返回结果,在选举Leader过程中参与投票
  • 4)集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。
  • 5)全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的。
  • 6)更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行。
  • 7)数据更新原子性,一次数据更新要么成功,要么失败。
  • 8)实时性,在一定时间范围内,client能读到最新数据。

1.5. znode

image-20180623144202281

ZooKeeper数据模型的结构与Unix文件系统很类似,整体上可以看作是一棵树,每个节点称做一个ZNode。每一个ZNode默认能够存储1MB的数据,每个ZNode都可以通过其路径唯一标识

zk中的节点,维护了stat,由Version number, Action control list (ACL), Timestamp,Data length.构成.

data version //数据写入的过程变化

ACL //action control list,

1.6. 节点类型

​ 1.持久节点

​ client结束,还存在。

​ 2.临时节点

​ 在client活动时有效,断开自动删除。临时节点不能有子节点。

​ leader推选时使用。

​ 3.序列节点

​ 在节点名之后附加10个数字,主要用于同步和锁.

1.7. Session

​ Session中的请求以FIFO执行,一旦client连接到server,session就建立了。sessionid分配client.

​ client以固定间隔向server发送心跳,表示session是valid的,zk集群如果在超时时候,没有收到心跳,

​ 判定为client挂了,与此同时,临时节点被删除。

1.8. Watches

​ 观察。

​ client能够通过watch机制在数据发生变化时收到通知。

​ client可以在read 节点时设置观察者。watch机制会发送通知给注册的客户端。

​ 观察模式只触发一次。

​ session过期,watch机制删除了。

1.9. zk工作流程

​ zk集群启动后,client连接到其中的一个节点,这个节点可以是leader,也可以是follower。

​ 连通后,node分配一个id给client,发送ack信息给client。

​ 如果客户端没有收到ack,连接到另一个节点。

​ client周期性发送心跳信息给节点保证连接不会丢失。

​ 如果client读取数据,发送请求给node,node读取自己数据库,返回节点数据给client.

​ 如果client想要在 zk 中存储数据,将路径和数据发送给server,server转发给leader。

​ leader再补发请求给所有follower。只有大多数(超过半数)节点成功响应,则

​ 写操作成功。

1.10 zk 应用场景

1.10.1 统一命名服务

image-20180623153654235

1.10.2 统一配管理

image-20180623153811327

1.10.3 统一集群管理

image-20180623153914499

1.10.4 服务器动态上下线

image-20180623154113232

1.10.5 软负载均衡

image-20180623154212560

2.Zookeeper 完全分布式的安装&基本使用

2.1 安装

2.1.1 高可用安装

PS: 高可用安装见这里

2.2.2 完全分布式安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1.挑选3台主机
cs1 ~ cs3
2.每台机器都安装zk & 配置环境变量

3.配置zk配置文件
cs1 ~ cs3
[/home/ap/apps/zk/conf/zoo.cfg]
----------
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/ap/zookeeper
clientPort=2181

server.1=cs1:2888:3888
server.2=cs2:2888:3888
server.3=cs3:2888:3888
---------

4.在每台主机的/home/ap/zookeeper中添加myid,内容分别是1,2,3
[cs1]
$>echo 1 > /home/ap/zookeeper/myid
[cs2]
$>echo 2 > /home/ap/zookeeper/myid
[cs3]
$>echo 3 > /home/ap/zookeeper/myid

5.在cs1~cs3上,启动服务器集群
$>zkServer.sh start

6.查看每台服务器的状态
$>zkServer.sh status
* 注意: 如果有3台机器, 只启动一台的话, 会显示如下, 因为根据配置, 有3台机器, 此时只有一台启动, 没有过半数, 整个集群是挂掉的
* 只有启动的机器数量超过配置的半数, zk 集群才有效.

2.2.3 zoo.cfg 文件中配置参数的含义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1)tickTime=2000:通信心跳数
tickTime:通信心跳数,Zookeeper服务器心跳时间,单位毫秒
Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。
它用于心跳机制,并且设置最小的session超时时间为两倍心跳时间。(session的最小超时时间是2*tickTime)
2)initLimit=10:LF初始通信时限
initLimit:LF初始通信时限
集群中的follower跟随者服务器(F)与leader领导者服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的Zookeeper服务器连接到Leader的时限。
投票选举新leader的初始化时间
Follower在启动过程中,会从Leader同步所有最新数据,然后确定自己能够对外服务的起始状态。
Leader允许F在initLimit时间内完成这个工作。
3)syncLimit=5:LF同步通信时限
syncLimit:LF同步通信时限
集群中Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime,
Leader认为Follwer死掉,从服务器列表中删除Follwer。
在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。
如果L发出心跳包在syncLimit之后,还没有从F那收到响应,那么就认为这个F已经不在线了。
4)dataDir:数据文件目录+数据持久化路径
dataDir:数据文件目录+数据持久化路径
保存内存数据库快照信息的位置,如果没有其他说明,更新的事务日志也保存到数据库。
5)clientPort=2181:客户端连接端口
监听客户端连接的端口
6) Server.A=B:C:D。
A是一个数字,表示这个是第几号服务器;
B是这个服务器的ip地址;
C是这个服务器与集群中的Leader服务器交换信息的端口;
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
# 集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。

2.2 ZK客户端的连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
$>zkCli.sh -server cs1:2181    		//进入zk命令行
$zk]help //查看帮助
$zk]quit //退出
$zk]create /a tom //创建节点
$zk]get /a //查看数据
$zk]ls / //列出节点
$zk]ls2 / //查看当前节点数据并能看到更新次数等数据
$zk]set /a tom //设置数据
$zk]delete /a //删除一个节点
$zk]rmr /a //递归删除所有节点。
$zk]stat / //查看节点状态

注意:
* 创建节点不能递归创建, 目前只能一层一层创建
* 每次创建都的写数据, 只创建目录的话会创建不成功
* 如果 close 后, 并没有退出客户端, 只是访问不到数据, 此时如果想要连接, 可以用connect host:port
================================================================================

ZK 的帮助文档
----------------
help:
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port

2.3 监听测试

注意点: 注册的监听只能使用一次, 监听完毕后需要重新注册.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
=======节点值的变化的监听==========
(1)在cs1主机上注册监听/app1节点数据变化
[zk: localhost:2181(CONNECTED) 26] get /app1 watch

(2)在cs2主机上修改/app1节点的数据
[zk: localhost:2181(CONNECTED) 5] set /app1 777

(3)观察cs2主机收到数据变化的监听
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/app1


=======节点的子节点变化的监听==========
(1)在cs1主机上注册监听/app1节点的子节点变化
[zk: localhost:2181(CONNECTED) 1] ls /app1 watch
[aa0000000001, server101]

(2)在cs2主机/app1节点上创建子节点
[zk: localhost:2181(CONNECTED) 6] create /app1/bb 666
Created /app1/bb

(3)观察cs1主机收到子节点变化的监听
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/app1

3. ZK 内部机制

3.1 选举机制

3.1.1. zookeeper的选举机制(全新集群paxos)

以一个简单的例子来说明整个选举的过程.
假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的.假设这些服务器依序启动,来看看会发生什么.
1) 服务器1启动,此时只有它一台服务器启动了,它发出去的报没有任何响应,所以它的选举状态一直是LOOKING状态
2) 服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1,2还是继续保持LOOKING状态.
3) 服务器3启动,根据前面的理论分析,服务器3成为服务器1,2,3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的leader.
4) 服务器4启动,根据前面的分析,理论上服务器4应该是服务器1,2,3,4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了.
5) 服务器5启动,同4一样,当小弟.

3.1.2. 非全新集群的选举机制(数据恢复)

那么,初始化的时候,是按照上述的说明进行选举的,但是当zookeeper运行了一段时间之后,有机器down掉,重新选举时,选举过程就相对复杂了。

需要加入数据id、leader id和逻辑时钟。

数据id:数据新的id就大,数据每次更新都会更新id。

Leader id:就是我们配置的myid中的值,每个机器一个。

逻辑时钟:这个值从0开始递增,每次选举对应一个值,也就是说: 如果在同一次选举中,那么这个值应该是一致的 ; 逻辑时钟值越大,说明这一次选举leader的进程更新.

选举的标准就变成:

​ 1、逻辑时钟小的选举结果被忽略,重新投票

​ 2、统一逻辑时钟后,数据id大的胜出

​ 3、数据id相同的情况下,leader id大的胜出

根据这个规则选出leader。

3.2 stat 的结构

  • 1)czxid- 引起这个znode创建的zxid,创建节点的事务的zxid
    • 每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。
      事务ID是ZooKeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。
  • 2)ctime - znode被创建的毫秒数(从1970年开始)
  • 3)mzxid - znode最后更新的zxid
  • 4)mtime - znode最后修改的毫秒数(从1970年开始)
  • 5)pZxid-znode最后更新的子节点zxid
  • 6)cversion - znode子节点变化号,znode子节点修改次数
  • 7)dataversion - znode数据变化号
  • 8)aclVersion - znode访问控制列表的变化号
  • 9)ephemeralOwner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。
  • 10)dataLength- znode的数据长度
  • 11)numChildren - znode子节点数量

4. 通过 JavaAPI 访问 ZK

4.1 添加Maven 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
9.1[pom.xml]
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.it18zhang</groupId>
<artifactId>ZooKeeperDemo</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
</dependencies>
</project>

4.2 Java 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
* 对应关系:
* Linux Java
----------------------
* ls getChildren
* get getData
* set setDAta
* create create
˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘˘



package com.rox.zktest;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

import java.io.IOException;
import java.util.List;


/**
* 对应关系:
* Linux Java
* ls getChildren
* get getData
* set setDAta
* create create
*/

public class TestZK {
@Test
public void ls() throws IOException, KeeperException, InterruptedException {

/**
* 放三个就不行, 只能放2个目前来看
*/
ZooKeeper zk = new ZooKeeper("cs1:2181,cs2:2181,cs3:2181", 5000, null);
List<String> list = zk.getChildren("/", null);

for (String s : list) {
System.out.println(s);
}
}

@Test
public void lsAll() {
try {
ls("/");
} catch (Exception e) {
e.printStackTrace();
}
}


/**
* 列出指定 path 下的children
*
* @param path
*/
public void ls(String path) throws Exception {
System.out.println(path);

ZooKeeper zk = new ZooKeeper("cs1:2181,cs2:2181,cs3:2181", 5000, null);
List<String> list = zk.getChildren(path, null);
if (list == null || list.isEmpty()) {
return;
}
for (String s : list) {
// 先输出 children
if (path.equals("/" )) {
ls(path + s);
} else {
ls(path + "/" + s);
}
}
}


/**
* 设置数据
*/
@Test
public void setData() throws Exception {
ZooKeeper zk = new ZooKeeper("cs1:2181", 5000, null);
zk.setData("/a","tomaslee".getBytes(),0);
}


/**
* 创建临时节点
*/
@Test
public void createEPHEMERAL() throws Exception {
ZooKeeper zk = new ZooKeeper("cs1:2181", 5000, null);
zk.create("/c/c1", "tom".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);

System.out.println("hello");
}



/**
* 创建观察者
*/
@Test
public void testWatch() throws Exception {
final ZooKeeper zk = new ZooKeeper("cs1:2181,cs2:2181,cs3:2181", 5000, null);

Stat st = new Stat();

Watcher w = null;
w = new Watcher() {
public void process(WatchedEvent event) {

try {
System.out.println("数据改了...");
zk.getData("/a",this,null);
}catch (Exception e) {
e.printStackTrace();
}
}
};

byte[] data = zk.getData("/a",w,st);
System.out.println(new String(data));

while (true) {
Thread.sleep(1000);
}
}
}
如果帮到你, 可以给我赞助杯咖啡☕️
0%