技术博客 技术博客
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • Spring

    • spring

      • 核心内容拆解 IOC
      • 核心内容拆解 AOP
      • 核心内容拆解 事件通知
      • 核心内容拆解 三级缓存
      • 核心内容拆解 FactoryBean
      • 注解替代Spring生命周期实现类
    • spring mv

      • Spring MVC 之基本工作原理
    • spring boot

      • SpringBoot 之 Filter、Interceptor、Aspect
      • SpringBoot 之 Starter
      • SpringBoot 之 Stomp 使用和 vue 相配置
      • SpringBoot MyBatisPlus 实现多数据源
      • SpringBoot MyBatis 动态建表
      • Spring Boot 集成 Jasypt 3.0.3 配置文件加密
      • Spring Boot 集成 FastDFS
      • Spring Boot VUE前后端加解密
      • Spring Boot logback.xml 配置
      • Spring Boot MinIO
      • Spring Boot kafka
      • Spring Boot WebSocket
        • 网络拓扑图
        • 状态管理
        • 数据发送
          • 数据发送失败
        • 代码
          • 依赖
          • SSHSocketConfig
          • WebSocketInterceptor
          • WebSSHSocketHandler
          • WsSessionManager
        • 前端代码
        • webSocket 性能测试
          • 测试代码
          • 配置
          • 单机性能测试结果
    • spring cloud

      • SpringCloud - Ribbon和Feign
      • SpringCloud alibaba - Nacos
      • SpringCloud alibaba - Sentinel哨兵
      • SpringCloud alibaba - Gateway
      • SpringCloud alibaba - 链路跟踪
      • SpringCloud - 分布式事务一(XA,2PC,3PC)
      • SpringCloud - 分布式事务二(Seata-AT,TCC,Saga)
      • SpringCloud - 分布式事务三(Seata搭建)
      • SpringCloud - 分布式事务四(多数据源事务)
      • SpringCloud - 分布式事务五(微服务间调用的事务处理)
  • Mybatis

    • 核心功能拆解 工作流程
    • 核心功能拆解 Plugin插件功能实现
    • 核心功能拆解 一二级缓存原理
    • MyBatis Plus+Spring Boot 实现一二级缓存以及自定义缓存
  • maven

    • pom 文件介绍及 parent、properties 标签详解
    • dependencies 标签详解
    • 使用 Nexus3.x 搭建私服
  • git

    • 私有 git 仓库搭建
目录

Spring Boot WebSocket

首先要说的是集群和高可用是两码事,集群就是多台服务器同时在工作;高可用是就一台服务器在工作,但崩溃了另一台顶上。对于 websocket、nacos、spring boot、spring cloud gateway 等技术不在这做过多讲解。

# 网络拓扑图

  1. 所有服务注册到 nacos 中
  2. 前端 socket 注册会通过网关轮询到一个 socket 服务器进行 socket 握手,一旦前端刷新,会开一个新的 socket session 注册到 其中一个 socket 服务器。

以上 2 步其实已经实现了最基本的 socket 集群搭建,最重要的是在于客户端和服务端数据的发送以及状态的管理,如谁注册到哪个 socket 服务器,要能找到,否则消息发给谁?

# 状态管理

状态管理可以使用 redis 来进行状态的管理,当用户第一次握手 (上线) 的时候,我们可以把该用户添加到 redis 中,当用户下线的时候,删除 redis 里的用户。

状态管理多用于聊天室业务的用户在线状态,以上流程中是前端发起请求获取状态,这种方式不是最好的,其实我们可以通过 socket 本身的事件,当用户注册 session 时,socket 有特定的事件能接收,接收到后可以获取该用户的其他好友并通知等。

# 数据发送

数据发送主要考虑接收消息的用户在哪个 socket 服务注册的 session,找到该注册用户的 session 有两种方式可以实现:

  1. A 发送消息给 B,消息往每个 socket 服务都发送一份,判断哪个 socket 服务持有 B 的 session 信息(广播)
  2. 通过 Hash 的方式,该方式要求用户向 socket 注册的时候也是以 hash 的方式注册到某个 socket 服务。

第一种方式实现起来最简单,可以使用 Redis 的 pub/sub 方式,socket 服务订阅同一个地址,接收到消息后,判断 socket 服务器缓存中是否有该用户持有的 session 即可。

第二种方式实现起来稍微麻烦一些,我们需要重写网关的请求转发,当然我用的是 spring cloud gateway 和 nacos,本身就提供了 hash 转发的方式,但是就要求我的消息发送时的 hash 计算和 nacos 所提供的 hash 转算法一致,否则会出现问题。如果使用了这种方式,就需要通过服务之间的 RPC 方式来控制 hash 的算法。也可以使用 rabbitmq,该方式会简单并好处多。

# 数据发送失败

如果用以上的 Redis 来做数据的传输是做不到数据不丢失的,Redis 的 pub/sub 是没有 ACK 机制的,不管 socket 服务发送成功失败都会删除该条数据。

我们可以使用 RabbitMQ ,RabbitMQ 是一款消息中间件,提供了生产 ACK 机制和消费 ACK 机制,能保证数据不丢失,当 socket 服务器发送失败的时候,数据还会保留在 RabbitMQ 中。

# 代码

这里只提供 websocket 代码,具体的消息的传输方式请大家自研,思路已经告诉大家了,只要你学过这些东西就一定能搭建出来。

# 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
1
2
3
4

# SSHSocketConfig

@Configuration
@EnableWebSocket
public class SSHSocketConfig implements WebSocketConfigurer {

    @Resource
    WebSSHSocketHandler webSSHSocketHandler;
    @Resource
    WebSocketInterceptor webSocketInterceptor;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
        webSocketHandlerRegistry.addHandler(webSSHSocketHandler, "/webSocket")
                .addInterceptors(webSocketInterceptor)
                .setAllowedOrigins("*");
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# WebSocketInterceptor

@Slf4j
@Component
public class WebSocketInterceptor implements HandshakeInterceptor {

    /**
     * 握手前
     * @param request
     * @param response
     * @param wsHandler
     * @param attributes
     * @return
     * @throws Exception
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        // 获得请求参数
        Map<String, String> paramMap = HttpUtil.decodeParamMap(request.getURI().getQuery(), Charset.forName("utf-8"));
        String uid = paramMap.get("token");
        if (StrUtil.isNotBlank(uid)) {
            attributes.put("token", uid);
            return true;
        }
        log.error("用户登录已失效");
        return false;
    }

    /**
     * 握手后
     *
     * @param request
     * @param response
     * @param wsHandler
     * @param exception
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        log.debug("握手完成");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# WebSSHSocketHandler

@Component
@Slf4j
public class WebSSHSocketHandler extends TextWebSocketHandler {

    private ObjectMapper objectMapper = new ObjectMapper();

    /**
     * socket 建立成功事件
     * @param webSocketSession
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
        Object token = webSocketSession.getAttributes().get("token");
        if (token != null) {
            // 用户连接成功,放入在线用户缓存
            WsSessionManager.add(token.toString(), webSocketSession);
            log.debug("用户 account {} 握手成功!",token.toString());
//            WebSocketVO webSocketVO = new WebSocketVO();
//            webSocketVO.setTopic(SocketConst.INDEX_WELCOME.getTopic());
//            webSocketVO.setData(SocketConst.INDEX_WELCOME.getMsg());
//            webSocketSession.sendMessage(new TextMessage(objectMapper.writeValueAsString(webSocketVO)));
        } else {
            throw new RuntimeException("用户登录已经失效!");
        }
    }

    /**
     * 接收消息事件
     * @param webSocketSession
     * @param webSocketMessage
     * @throws Exception
     */
    @Override
    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage webSocketMessage) throws Exception {
        // 获得客户端传来的消息
        // 消息内容
        String payload = webSocketMessage.getPayload().toString();
        // 检查session
        Object token = webSocketSession.getAttributes().get("token");
    }

    /**
     * socket 断开连接时
     * @param webSocketSession
     * @param closeStatus
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
        Object token = webSocketSession.getAttributes().get("token");
        if (token != null) {
            // 用户退出,移除缓存
            WsSessionManager.removeAndClose(token.toString());
        }
        log.debug("{} 用户离开",token.toString());
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

# WsSessionManager

@Slf4j
public class WsSessionManager {


    /**
     * 保存连接 session 的地方
     */
    private static Map<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();

    /**
     * 添加 session
     * @param key
     */
    public static void add(String key, WebSocketSession session) {
        // 添加 session
        SESSION_POOL.put(key, session);
    }

    /**
     * 删除 session,会返回删除的 session
     *
     * @param key
     * @return
     */
    public static WebSocketSession remove(String key) {
        // 删除 session
        return SESSION_POOL.remove(key);
    }

    /**
     * 删除并同步关闭连接
     *
     * @param key
     */
    public static void removeAndClose(String key) {
        WebSocketSession session = remove(key);
        if (session != null) {
            try {
                // 关闭连接
                session.close();
            } catch (IOException e) {
                // todo: 关闭出现异常处理
                e.printStackTrace();
            }
        }
    }

    /**
     * 获得 session
     * @param key
     * @return
     */
    public static WebSocketSession get(String key) {
        // 获得 session
        return SESSION_POOL.get(key);
    }


    public static void sendToUser(String id,String json) throws IOException {
        WebSocketSession webSocketSession = get(id);
        if(!webSocketSession.isOpen()){
            return;
        }
        synchronized (id) {
            webSocketSession.sendMessage(new TextMessage(json));
        }
    }

    public static void sendToUserGroup(String id,String json) throws IOException {
        String groupKey = id.split("-")[1];
        Set<String> set =  SESSION_POOL.keySet();
        Iterator<String> iterator = set.iterator();
        while (iterator.hasNext()){
            String key = iterator.next();
            if(key.endsWith("-"+groupKey)){
                WebSocketSession webSocketSession = get(key);
                if(!webSocketSession.isOpen()){
                    continue;
                }
                synchronized (key) {
                    webSocketSession.sendMessage(new TextMessage(json));
                }
            }
        }
    }

    public static void sendToAll(String id,String json) throws IOException {
        Set<String> set =  SESSION_POOL.keySet();
        Iterator<String> iterator = set.iterator();
        while (iterator.hasNext()){
            String key = iterator.next();
            WebSocketSession webSocketSession = get(key);
            if(!webSocketSession.isOpen()){
                continue;
            }
            synchronized (key) {
                webSocketSession.sendMessage(new TextMessage(json));
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

# 前端代码

import { getSocketKey } from '@/utils/auth'

export default{

  debug: false,
  connection: null,

  init(bus){
    let protocol = 'ws://';
    if (window.location.protocol == 'https:') {
      protocol = 'wss://';
    }
    if (!window.WebSocket) {
      //否则报错
      this.console("不支持 socket 连接")
      return null;
    }
    let endpoint = protocol+'127.0.0.1:8080/monitor-socket/webSocket?token='+getSocketKey();
    this.connection = new WebSocket(endpoint);
    // 打开连接
    this.connection.onopen = () => {
      this.console("连接打开成功")
    };
    // 接收消息
    this.connection.onmessage = (evt) => {
      this.console(evt)
      let json = JSON.parse(evt.data);
      bus.emit(json.topic,json);
    };
    // 关闭连接
    this.connection.onclose = (evt) => {
      this.console(evt)
    };
    // 连接错误
    this.connection.onerror = (evt) => {
      this.console(evt)
    };
  },
  send(msg){
    this.connection.send(JSON.stringify(msg));
  },
  close(){
    this.connection.close()
  },
  console(msg){
    if(this.debug){
      console.log(msg)
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

# webSocket 性能测试

# 测试代码

可以看出来我是给用户组发送消息,并且保证有一个用户连线,否则则是没有意义,我的最主要的是 socket 往外发的能力。

# 配置

属性 值
内存 16G
内核 8
CPU 16
CPU 频率 2.90 GHz

# 单机性能测试结果

测试工具 JMeter

线程 时间 (s) 真实运行时间 (s) 循环次数 异常率 吞吐量
1000 10 9 1 0 100.1/s
1000 5 5 1 0 200.1/s
1000 1 1 1 0 1000.0/s
5000 10 9 1 0 500.2/s
5000 5 5 1 0 1000.0/s
5000 1 1 1 0 4071.7/s
5000 1 1 1 25.54% 3090.2/s

时间的意思是,固定时间内,运行完这些线程,真实运行时间,就是开辟这么多线程实际使用时间。出现异常率后就不需要测试了,同时并发请求 5000 以下问题不大。

上次更新: 6/11/2025, 4:10:30 PM
Spring Boot kafka
SpringCloud - Ribbon和Feign

← Spring Boot kafka SpringCloud - Ribbon和Feign→

Theme by Vdoing | Copyright © 2023-2025
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式