i-NIO

一. 同步, 异步 & 阻塞, 非阻塞

概念

同步和异步其实指的是,请求发起方对消息结果的获取是主动发起的,还是等被动通知的

  • 如果是请求方主动发起的,一直在等待应答结果(同步阻塞)
  • 如果是结果由服务方通知的,也就是请求方发出请求后,要么在一直等待通知(异步阻塞), 要么就先去干自己的事了(异步非阻塞)

调用了一个函数之后,在等待这个函数返回结果之前,当前的线程是处于挂起状态,还是运行状态:

  • 如果是挂起状态,就意味着当前线程什么都不能干,就等着获取结果,这就叫同步阻塞
  • 如果仍然是运行状态,就意味当前线程是可以的继续处理其他任务,但要时不时的去看下是否有结果了,这就是同步非阻塞

形象的例子

故事:老王烧开水。

出场人物:老王,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。

老王想了想,有好几种等待方式
1、老王用水壶煮水,并且站在那里,不管水开没开,每隔一定时间看看水开了没 ———-同步阻塞
老王想了想,这种方法不够聪明。

2、老王还是用水壶煮水,不再傻傻的站在那里看水开,跑去寝室上网,但是还是会每隔一段时间过来看看水开了没有,水没有开就走人。 ———-同步非阻塞
老王想了想,现在的方法聪明了些,但是还是不够好。

3、老王这次使用高大上的响水壶来煮水,站在那里,但是不会再每隔一段时间去看水开,而是等水开了,水壶会自动的通知他。 ———-异步阻塞
老王想了想,不会呀,既然水壶可以通知我,那我为什么还要傻傻的站在那里等呢,嗯,得换个方法。

4、老王还是使用响水壶煮水,跑到客厅上网去,等着响水壶自己把水煮熟了以后通知他。 ———-异步非阻塞

老王豁然,这下感觉轻松了很多。


二. NIO 中一些重要概念

Java IO 模型

BIO–同步阻塞:

JDK1.4 以前我们使用的都是 BIO

阻塞到我们的读写方法,阻塞到线程来提高并发性能,但是效果不是很好

NIO–同步非阻塞:(New IO)

JDK1.4 Linux 多路复用技术(select 模式)实现 IO 事件的轮询方式:同步 非阻塞的模式,这种方式目前是主流的网络通信模式

Mina 和 Netty

– 网络通信框架,比自己写 NIO 要容易些,并且代码可读性更好

AIO–异步非阻塞 IO:

JDK1.7(NIO2)真正的异步非阻塞 IO(基于 linux 的 epoll 模式)AIO 目 前使用的还比较少

通道 channel & 缓冲区 buffer

1. 通道 channel

关键词: 模拟流, 打开的连接, 双向可读写, 线程安全, 只能通过缓冲区操作

类型

  • FileChannel:从文件中读写数据;
  • DatagramChannel:通过 UDP 读写网络中数据;
  • SocketChannel:通过 TCP 读写网络中数据;
  • ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。

2. 缓冲区 buffer

不能直接对通道进行读写数据,而是要先经过缓冲区。

缓冲区实质上是一个数组, 提供了对数据的结构化访问。

类型:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

关键词:

  • capacity:最大容量;
  • position:当前已经读写的字节数;
  • limit:还可以读写的字节数。

其它详情可见 Java IO


三. 代码实例

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package com.rox.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
* 注意: selector挑选器, 维护若干集合
* selectionKeys : 注册的key
* selectedKeys : 挑选出来的key -- 有事件的 key
* 客户端 & 服务端对应的客户端 SocketChannel(套接字通道), 一般来讲, 只需要对 read 感兴趣
* 也就是拦截 read 消息, write 的话随时都可以 write
* sc 是可读可写的
*/
public class MyServer {

public static void main(String[] args) throws Exception {

// 分配缓冲区buf内存
ByteBuffer buf = ByteBuffer.allocate(1024);

// 开启挑选器
Selector sel = Selector.open();

// 开启 ServerSocket 通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 绑定端口
ssc.socket().bind(new InetSocketAddress("localhost", 8888));


// 设置非阻塞
ssc.configureBlocking(false);

// 在挑选器中注册通道(服务器通道, 和感兴趣的事件 - OP_ACCEPT)
ssc.register(sel, SelectionKey.OP_ACCEPT);

// 初始化可选择通道对象(为 ServerSocketChannel & SocketChannel 的共同父类)
SelectableChannel sc = null;

while (true) {
// 挑选器开始选择(阻塞的)
// 如果么有接收到 感兴趣事件, 就塞在这里
sel.select();

// 能走到这一步, 就是已经接收到了 accept 事件
// 迭代挑选出来的 key 的集合
Iterator<SelectionKey> it = sel.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();

try {
// 如果注册的 key是可接受的, 就一定是服务器通道
if (key.isAcceptable()) {

// 取出该通道, 返回一个 SelectableChannel 的超类对象
sc = key.channel();

// 因为拿到的 key 是 isAcceptable, 所以可以判断是 ssc 对象, 强转
// 并通过 accept()方法, 返回一个 sc 对象(类似于套接字, 与客户端的 sc 对应, 负责跟客户端的 sc 通信)
SocketChannel sc0 = ((ServerSocketChannel) sc).accept();

// 设置 sc0 为非阻塞
sc0.configureBlocking(false);

// 为接收到的这个 sc 在选择器中, 注册读事件
sc0.register(sel, SelectionKey.OP_READ);
}

// 下一次轮询, 发现是来自 读事件 的key
if (key.isReadable()) {
// 取出 channel, 直接强转为 SocketChannel
SocketChannel sc1 = (SocketChannel) key.channel();

/// 回复客户端消息, 前面加个头'hello', 然后再写回去
// 创建消息字节数组
byte[] helloBytes = "hello: ".getBytes();
// 把字节数组放入 buf
buf.put(helloBytes, 0, helloBytes.length);

// 读取消息
// 不存在读完, 只要不为0, 就一直轮询读
// 从通道里读出来,放到缓冲区里
while (sc1.read(buf) > 0) {
// 拍板, 定稿, > 0说明有数据
// position 归0, limit 置在 已写元素 的后面一格, 此时不接受其它写入了
buf.flip();
// 持有的 sc 对象写入到channel
sc1.write(buf);
// 写完后 清空, position 归0, limit 最大
buf.clear();
}
}
} catch (Exception e) {
// 如果失败, 移除本次接收到的 keys
sel.keys().remove(key);
}
}

// 本次选择器事件处理完了之后
// 移除所有选择出来的 key
sel.selectedKeys().clear();
}
}
}

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.rox.nio;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class MyClient {

public static void main(String[] args) throws Exception {

// buf
ByteBuffer buf = ByteBuffer.allocate(1024 * 8);

// 开一个 selector
Selector sel = Selector.open();

// 开一个套接字通道
SocketChannel sc = SocketChannel.open();

// 下面2种连接没区别
sc.socket().connect(new InetSocketAddress("localhost", 8888));
// sc.connect(new InetSocketAddress("localhost", 8888));

// 配置是否阻塞
sc.configureBlocking(false);

// 注册 sel 对象及 关注的 key
sc.register(sel, SelectionKey.OP_READ);

// 往 buf 中 put 进数据(一次)
// buf.put("tom".getBytes());
// buf.flip();
// sc.write(buf);
// buf.clear();

// 如果想要持续不断的通信, 可以开个分线程 (持续收发消息)
// 程序是从上往下运行的, 运行到下面没有接受到消息的话会塞住
// 一旦塞住了, 也就无法继续同服务端通信了
new Thread(){
public void run() {
int i = 0 ;
// 这里创建一个新的缓冲区专用
ByteBuffer buf2 = ByteBuffer.allocate(1024 * 8);
while(true){
try {
buf2.put(("Tom" + (i ++)).getBytes());
buf2.flip(); //切换读写
sc.write(buf2);
buf2.clear();
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}.start();

while (true) {
// 选择器选择, 接受 服务端返回的 key
sel.select();

ByteArrayOutputStream baos = new ByteArrayOutputStream();

// 从通道读取到缓冲区
while (sc.read(buf) > 0) {
// 读到了数据, 就拍一下板
buf.flip();
// 从缓冲区 buf 写入到 ByteArrayOutputStream 流
baos.write(buf.array(), 0, buf.limit());
// 写完了就清空 缓冲区buf
buf.clear();
}

// 打印接受到的数据
System.out.println(new String(baos.toByteArray()));
baos.close();
// 清空 selectedKeys的 set
sel.selectedKeys().clear();
}
}
}

其它的代码参考我的 Github

mygithub

如果帮到你, 可以给我赞助杯咖啡☕️
0%