JAVA NIO API
# 概述
Non-Blocking I/O,是一种非阻塞通信模型。在 java.nio 式 jdk1.4 版本引入的一套 API,我们可以利用这套 API 实现非阻塞的网络编程模型。
大数据和实时计算的兴起,高性能 RPC 框架与网络编程技术再次成伟焦点。比图 Fackebook 的 Thrift 框架,scala 的 Akka 框架,实时流领域的 Storm、Spark 框架,又或者开源分布式数据库的 Mycat、VoltDB,这些框架的底层通信都采用了 NIO 通信技术。而 java 领域里大名鼎鼎的 NIO 框架 ——Netty,则被众多的开源或商业软件所采用。
NIO 适合高并发、高访问量、段请求
# Buffer
Buffer 缓冲区,是 NIO 通讯时数据的载体。常用的缓冲区是 ByteBuffer (字节缓冲区)。
// 创建10个字节的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(capacity);
2
缓冲区的属性:
- capacity (容量):决定了存储容量的上线,一经写定,不能更改。
- limit (限制):限制的初始位置 = capacity
- position (位置):初始值是 0,但每当插入一个字节,就会向后移动一位
ByteBuffer 默认用的子类是 HeapByteBuffer(堆内字节缓冲区),这种类型的缓冲区,在 JVM 的堆中创建的,即缓冲区的生命周期由 (GC) JVM 管理的。MappedByteBuffer(堆外字节缓冲区),可以使用操作系统的内存,使用场景当创建大的字节缓冲区时,注意:如果使用堆外,生命周期的管理需要自来实现。
缓冲区的方法:
- get () 方法会根据当前 position 的位置取值,才外,get () 没调用一次,位置会移动一位。
- buffer.limit (buffer.position ()) 获取 buffer 的 position 位置,并赋予 limit 限制
- buffer.flip () 和 buffer.limit (buffer.position ()) 类似
- ByteBuffer.wrap (”test“.getByte ()) 根据传入的字节数组创建对应大小的缓冲区,并写入数据,写完后,会自动掉用 flip () 方法
- clear () 该方法不会清除缓冲区的数据,只会把 position 重置为 0,让后面的 (3 字节) 新数据,覆盖前面的 (6 字节) 老数据,但还有 3 字节的数据没有覆盖,可以使用 flip () 来清除掉未覆盖的老数据。
- hasRemaining () 判断 limit 和 position 之间是否还有元素可读,有返回 true,无返回 false
# Channel
Channel 是通道,就像告诉公路从 A 城市通往 B 城市,而 buffer 就是货车。和 socket 的连接方式差不多,具体代码如下:
# server 端
public static void main(String[] args) throws Exception {
ServerSocketChannel server = ServerSocketChannel.open();
// 设置 socket server 为非阻塞通信
server.configureBlocking(false);
// 绑定 本地 ip和8888 做为服务器连接口
server.bind(new InetSocketAddress(8888));
SocketChannel sc = null;
// 等待接收到一个通信连接
while (sc == null){
sc = server.accept();
}
// 设置改通信连接 也为非阻塞模式
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(10);
// 读取写道Buffer
sc.read(buffer);
String str = new String(buffer.array());
System.out.println("服务收到消息:"+str);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# client 端
public static void main(String[] args) throws Exception {
SocketChannel sock = SocketChannel.open();
sock.connect(new InetSocketAddress("127.0.0.1",8888));
ByteBuffer buffer = ByteBuffer.wrap("hellow".getBytes());
sock.write(buffer);
}
2
3
4
5
6
# Selector
Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。它是 Java NIO 核心组件中的一个,用于检查一个或多个 NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个 channels, 也就是可以管理多个网络链接。
使用 Selector 的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。
# 服务端代码
public static void main(String[] args) throws Exception {
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(8888));
// 获取多路复用选择器
Selector selector = Selector.open();
// 服务端注册 接收客户端的 监听事件
server.register(selector, SelectionKey.OP_ACCEPT);
while(true){
// 该方法会阻塞,直到有事件到达u,才会放心
selector.select();
// 获取所有事件的key,走到这代表有事件来了
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 准备迭代所有事件
Iterator<SelectionKey> selectionKeyIterable = selectionKeys.iterator();
while (selectionKeyIterable.hasNext()){
// 获取事件
SelectionKey selectionKey = selectionKeyIterable.next();
// 表示有客户连接
if(selectionKey.isAcceptable()){
// 得到连接
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
// 建立和对应客户通信的通道
SocketChannel socketChannel = serverSocketChannel.accept();
// 谁知为非阻塞通信
socketChannel.configureBlocking(false);
// 注册拥有读写事件
socketChannel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
// 表示有数据发送到服务端,这里可以不用继续设置 configureBlocking 为非阻塞,在连接的时候通道已经标记过了
if(selectionKey.isReadable()){
// 得到连接
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(20);
socketChannel.read(byteBuffer);
while (byteBuffer.hasRemaining()){
System.out.println("收到客户端的通信: "+new String(byteBuffer.array()));
}
}
// 给客户端发送数据
if(selectionKey.isWritable()){
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.wrap("3333333".getBytes());
// 读写时非阻塞的,为了确保读写完整性,需要加上 hasRemaining
while (byteBuffer.hasRemaining()){
socketChannel.write(byteBuffer);
}
}
// 处理完毕移除事件,避免重复
selectionKeyIterable.remove();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 客户端代码
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));
Selector selector = Selector.open();
socketChannel.register(selector,SelectionKey.OP_CONNECT);
while(true){
selector.select();
Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
while (selectionKeyIterator.hasNext()){
SelectionKey selectionKey = selectionKeyIterator.next();
if(selectionKey.isConnectable()){
SocketChannel sc = (SocketChannel) selectionKey.channel();
sc.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
if(selectionKey.isReadable()){
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
while (byteBuffer.hasRemaining()){
System.out.println("收到服务端消息:"+new String(byteBuffer.array()));
}
}
if(selectionKey.isWritable()){
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.wrap("3333333".getBytes());
// 读写时非阻塞的,为了确保读写完整性,需要加上 hasRemaining
while (byteBuffer.hasRemaining()){
sc.write(byteBuffer);
}
}
selectionKeyIterator.remove();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# FileChannel
public static void main(String[] args) throws Exception {
FileChannel write = new FileOutputStream(new File("1.txt")).getChannel();
ByteBuffer writeByteBuffer = ByteBuffer.wrap("2321312".getBytes());
if(writeByteBuffer.hasRemaining()) {
write.write(writeByteBuffer);
}
write.close();
FileChannel read = new FileInputStream(new File("1.txt")).getChannel();
ByteBuffer readByteBuffer = ByteBuffer.wrap("2321312".getBytes());
read.read(readByteBuffer);
if(readByteBuffer.hasRemaining()) {
System.out.println("文件内容:"+new String(readByteBuffer.array()));
}
read.close();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17