技术博客 技术博客
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • JAVA

    • 基础

      • JAVA 锁 及 线程
      • JAVA NIO API
        • 概述
        • Buffer
        • Channel
          • server端
          • client端
        • Selector
          • 服务端代码
          • 客户端代码
        • FileChannel
      • JVM 模块介绍
    • 版本

      • JAVA8 新特性总结
      • JAVA9 新特性总结
      • JAVA 10/11/12/13/14/15/16/17 新特性总结
    • 其他

      • jar 打包成.exe可执行文件
      • java代码混淆之 ProGuard
      • JAVA 性能监控(jvisualvm)及测试(JMeter)
      • Alibaba Arthas
      • jar启动脚本
  • 仓颉

    • 基础

      • 仓颉介绍
  • 设计模式

    • 代理模式
  • 人工智能

    • 线性回归
    • Pytorch
目录

JAVA NIO API

# 概述

Non-Blocking I/O,是一种非阻塞通信模型。在 java.nio 式 jdk1.4 版本引入的一套 API,我们可以利用这套 API 实现非阻塞的网络编程模型。

大数据和实时计算的兴起,高性能 RPC 框架与网络编程技术再次成伟焦点。比图 Fackebook 的 Thrift 框架,scala 的 Akka 框架,实时流领域的 Storm、Spark 框架,又或者开源分布式数据库的 Mycat、VoltDB,这些框架的底层通信都采用了 NIO 通信技术。而 java 领域里大名鼎鼎的 NIO 框架 ——Netty,则被众多的开源或商业软件所采用。

NIO 适合高并发、高访问量、段请求

# Buffer

Buffer 缓冲区,是 NIO 通讯时数据的载体。常用的缓冲区是 ByteBuffer (字节缓冲区)。

// 创建10个字节的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(capacity);
1
2

缓冲区的属性:

  • capacity (容量):决定了存储容量的上线,一经写定,不能更改。
  • limit (限制):限制的初始位置 = capacity
  • position (位置):初始值是 0,但每当插入一个字节,就会向后移动一位

ByteBuffer 默认用的子类是 HeapByteBuffer(堆内字节缓冲区),这种类型的缓冲区,在 JVM 的堆中创建的,即缓冲区的生命周期由 (GC) JVM 管理的。MappedByteBuffer(堆外字节缓冲区),可以使用操作系统的内存,使用场景当创建大的字节缓冲区时,注意:如果使用堆外,生命周期的管理需要自来实现。

缓冲区的方法:

  • get () 方法会根据当前 position 的位置取值,才外,get () 没调用一次,位置会移动一位。
  • buffer.limit (buffer.position ()) 获取 buffer 的 position 位置,并赋予 limit 限制
  • buffer.flip () 和 buffer.limit (buffer.position ()) 类似
  • ByteBuffer.wrap (”test“.getByte ()) 根据传入的字节数组创建对应大小的缓冲区,并写入数据,写完后,会自动掉用 flip () 方法
  • clear () 该方法不会清除缓冲区的数据,只会把 position 重置为 0,让后面的 (3 字节) 新数据,覆盖前面的 (6 字节) 老数据,但还有 3 字节的数据没有覆盖,可以使用 flip () 来清除掉未覆盖的老数据。
  • hasRemaining () 判断 limit 和 position 之间是否还有元素可读,有返回 true,无返回 false

# Channel

Channel 是通道,就像告诉公路从 A 城市通往 B 城市,而 buffer 就是货车。和 socket 的连接方式差不多,具体代码如下:

# server 端

    public static void main(String[] args) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        // 设置 socket server 为非阻塞通信
        server.configureBlocking(false);
        // 绑定 本地 ip和8888 做为服务器连接口
        server.bind(new InetSocketAddress(8888));

        SocketChannel sc = null;
        // 等待接收到一个通信连接
        while (sc == null){
            sc = server.accept();
        }
        // 设置改通信连接 也为非阻塞模式
        sc.configureBlocking(false);

        ByteBuffer buffer = ByteBuffer.allocate(10);
        // 读取写道Buffer
        sc.read(buffer);
        String str = new String(buffer.array());
        System.out.println("服务收到消息:"+str);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# client 端

    public static void main(String[] args) throws Exception {
        SocketChannel sock = SocketChannel.open();
        sock.connect(new InetSocketAddress("127.0.0.1",8888));
        ByteBuffer buffer = ByteBuffer.wrap("hellow".getBytes());
        sock.write(buffer);
    }
1
2
3
4
5
6

# Selector

Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。它是 Java NIO 核心组件中的一个,用于检查一个或多个 NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个 channels, 也就是可以管理多个网络链接。
使用 Selector 的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。

# 服务端代码

    public static void main(String[] args) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(8888));
        // 获取多路复用选择器
        Selector selector = Selector.open();
        // 服务端注册 接收客户端的 监听事件
        server.register(selector, SelectionKey.OP_ACCEPT);
        while(true){
            // 该方法会阻塞,直到有事件到达u,才会放心
            selector.select();
            // 获取所有事件的key,走到这代表有事件来了
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 准备迭代所有事件
            Iterator<SelectionKey> selectionKeyIterable = selectionKeys.iterator();
            while (selectionKeyIterable.hasNext()){
                // 获取事件
                SelectionKey selectionKey = selectionKeyIterable.next();
                // 表示有客户连接
                if(selectionKey.isAcceptable()){
                    // 得到连接
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                    // 建立和对应客户通信的通道
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    // 谁知为非阻塞通信
                    socketChannel.configureBlocking(false);
                    // 注册拥有读写事件
                    socketChannel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                }
                // 表示有数据发送到服务端,这里可以不用继续设置 configureBlocking 为非阻塞,在连接的时候通道已经标记过了
                if(selectionKey.isReadable()){
                    // 得到连接
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(20);
                    socketChannel.read(byteBuffer);
                    while (byteBuffer.hasRemaining()){
                        System.out.println("收到客户端的通信: "+new String(byteBuffer.array()));
                    }
                    
                }
                // 给客户端发送数据
                if(selectionKey.isWritable()){
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.wrap("3333333".getBytes());
                    // 读写时非阻塞的,为了确保读写完整性,需要加上 hasRemaining
                    while (byteBuffer.hasRemaining()){
                        socketChannel.write(byteBuffer);
                    }
                }
                // 处理完毕移除事件,避免重复
                selectionKeyIterable.remove();
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

# 客户端代码

    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));
        Selector selector = Selector.open();
        socketChannel.register(selector,SelectionKey.OP_CONNECT);
        while(true){
            selector.select();
            Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
            while (selectionKeyIterator.hasNext()){
                SelectionKey selectionKey = selectionKeyIterator.next();
                if(selectionKey.isConnectable()){
                    SocketChannel sc = (SocketChannel) selectionKey.channel();
                    sc.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                }
                if(selectionKey.isReadable()){
                    SocketChannel sc = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(10);
                    while (byteBuffer.hasRemaining()){
                        System.out.println("收到服务端消息:"+new String(byteBuffer.array()));
                    } 
                }
                if(selectionKey.isWritable()){
                    SocketChannel sc = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.wrap("3333333".getBytes());
                    // 读写时非阻塞的,为了确保读写完整性,需要加上 hasRemaining
                    while (byteBuffer.hasRemaining()){
                        sc.write(byteBuffer);
                    }
                }
                selectionKeyIterator.remove();
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# FileChannel

    public static void main(String[] args) throws Exception {
        FileChannel write = new FileOutputStream(new File("1.txt")).getChannel();
        ByteBuffer writeByteBuffer = ByteBuffer.wrap("2321312".getBytes());
        if(writeByteBuffer.hasRemaining()) {
            write.write(writeByteBuffer);
        }
        write.close();

        FileChannel read = new FileInputStream(new File("1.txt")).getChannel();
        ByteBuffer readByteBuffer = ByteBuffer.wrap("2321312".getBytes());
        read.read(readByteBuffer);
        if(readByteBuffer.hasRemaining()) {
            System.out.println("文件内容:"+new String(readByteBuffer.array()));
        }
        read.close();

    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
上次更新: 6/11/2025, 4:10:30 PM
JAVA 锁 及 线程
JVM 模块介绍

← JAVA 锁 及 线程 JVM 模块介绍→

Theme by Vdoing | Copyright © 2023-2025
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式