控制器示例

控制器示例
Controller public class WebSocketController { // 注入消息模板 Autowired private SimpMessagingTemplate messagingTemplate; /** * 处理客户端发送的消息 * 目的地/app/chat */ MessageMapping(/chat) SendTo(/topic/messages) public ChatMessage handleMessage(ChatMessage message) { message.setTimestamp(new Date()); System.out.println(收到消息: message.getContent()); return message; } /** * 发送广播消息 */ GetMapping(/broadcast) public void broadcast(String content) { ChatMessage message new ChatMessage(); message.setContent(content); message.setSender(系统); message.setTimestamp(new Date()); // 发送到 /topic/messages messagingTemplate.convertAndSend(/topic/messages, message); } /** * 发送点对点消息 */ GetMapping(/sendToUser) public void sendToUser(String userId, String content) { ChatMessage message new ChatMessage(); message.setContent(content); message.setSender(管理员); message.setTimestamp(new Date()); // 发送给指定用户/user/{userId}/queue/messages messagingTemplate.convertAndSendToUser( userId, /queue/messages, message ); } } // 消息实体类 Data AllArgsConstructor NoArgsConstructor public class ChatMessage { private String sender; private String content; private Date timestamp; }连接拦截器Component public class WebSocketInterceptor extends ChannelInterceptorAdapter { Override public Message? preSend(Message? message, MessageChannel channel) { StompHeaderAccessor accessor MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { // 连接建立时处理 String token accessor.getFirstNativeHeader(token); // 验证 token... System.out.println(用户连接: accessor.getSessionId()); } else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) { // 连接断开时处理 System.out.println(用户断开: accessor.getSessionId()); } return message; } }原生 Java WebSocketJSR 356注解方式ServerEndpoint(/chat/{userId}) Component public class ChatEndpoint { // 存储所有连接 private static final MapString, Session sessions new ConcurrentHashMap(); // 存储用户ID和session的映射 private static final MapString, String userSessionMap new ConcurrentHashMap(); /** * 连接建立时调用 */ OnOpen public void onOpen(Session session, PathParam(userId) String userId) { System.out.println(连接建立: session.getId() , 用户: userId); // 保存连接 sessions.put(session.getId(), session); userSessionMap.put(userId, session.getId()); // 通知其他用户有新用户上线 broadcast(系统, 用户 userId 上线了); } /** * 收到消息时调用 */ OnMessage public void onMessage(String message, Session session, PathParam(userId) String userId) { System.out.println(收到消息: message from: userId); try { // 解析消息 JSONObject json new JSONObject(message); String content json.getString(content); String toUserId json.optString(to, null); if (toUserId ! null) { // 私聊消息 sendToUser(userId, toUserId, content); } else { // 群发消息 broadcast(userId, content); } } catch (Exception e) { e.printStackTrace(); } } /** * 连接关闭时调用 */ OnClose public void onClose(Session session, PathParam(userId) String userId) { System.out.println(连接关闭: session.getId()); // 移除连接 sessions.remove(session.getId()); userSessionMap.remove(userId); // 通知其他用户 broadcast(系统, 用户 userId 下线了); } /** * 发生错误时调用 */ OnError public void onError(Session session, Throwable error) { System.out.println(连接错误: session.getId()); error.printStackTrace(); } /** * 广播消息给所有用户 */ private void broadcast(String sender, String content) { JSONObject message new JSONObject(); message.put(sender, sender); message.put(content, content); message.put(timestamp, System.currentTimeMillis()); message.put(type, broadcast); // 发送给所有连接的客户端 for (Session session : sessions.values()) { if (session.isOpen()) { try { session.getAsyncRemote().sendText(message.toString()); } catch (Exception e) { e.printStackTrace(); } } } } /** * 发送私聊消息 */ private void sendToUser(String fromUserId, String toUserId, String content) { String toSessionId userSessionMap.get(toUserId); if (toSessionId ! null) { Session toSession sessions.get(toSessionId); if (toSession ! null toSession.isOpen()) { try { JSONObject message new JSONObject(); message.put(sender, fromUserId); message.put(content, content); message.put(timestamp, System.currentTimeMillis()); message.put(type, private); toSession.getAsyncRemote().sendText(message.toString()); } catch (Exception e) { e.printStackTrace(); } } } } }编程方式继承 Endpoint 类ServerEndpoint(/game) public class GameEndpoint extends Endpoint { private static final SetSession sessions Collections.synchronizedSet(new HashSet()); Override public void onOpen(Session session, EndpointConfig config) { System.out.println(新连接: session.getId()); sessions.add(session); // 添加消息处理器 session.addMessageHandler(new MessageHandler.WholeString() { Override public void onMessage(String message) { System.out.println(收到: message); // 处理游戏逻辑 handleGameMessage(session, message); } }); // 发送欢迎消息 try { JSONObject welcome new JSONObject(); welcome.put(type, welcome); welcome.put(message, 欢迎加入游戏!); welcome.put(sessionId, session.getId()); session.getBasicRemote().sendText(welcome.toString()); } catch (IOException e) { e.printStackTrace(); } } Override public void onClose(Session session, CloseReason closeReason) { System.out.println(连接关闭: session.getId()); sessions.remove(session); // 通知其他玩家 broadcastPlayerLeft(session.getId()); } Override public void onError(Session session, Throwable thr) { System.err.println(连接错误: session.getId()); thr.printStackTrace(); } private void handleGameMessage(Session session, String message) { try { JSONObject json new JSONObject(message); String type json.getString(type); switch (type) { case move: // 处理移动 handlePlayerMove(session, json); break; case chat: // 处理聊天 handleChatMessage(session, json); break; default: System.out.println(未知消息类型: type); } } catch (Exception e) { e.printStackTrace(); } } private void handlePlayerMove(Session session, JSONObject moveData) { // 处理玩家移动逻辑 // 广播给所有玩家 broadcastGameUpdate(moveData); } private void handleChatMessage(Session session, JSONObject chatData) { // 广播聊天消息 JSONObject broadcastMsg new JSONObject(); broadcastMsg.put(type, chat); broadcastMsg.put(sender, session.getId()); broadcastMsg.put(message, chatData.getString(message)); broadcastMsg.put(timestamp, System.currentTimeMillis()); broadcast(broadcastMsg.toString()); } private void broadcast(String message) { synchronized (sessions) { for (Session s : sessions) { if (s.isOpen()) { try { s.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } } } } }配置文件application.yml 配置spring: websocket: # WebSocket 配置 enabled: true server: # 服务器配置 port: 8080 servlet: context-path: /api # 自定义配置 websocket: max-sessions: 1000 heartbeat-interval: 30000 max-message-size: 128KB心跳检测和连接管理Component public class WebSocketHeartbeat { Autowired private SimpMessagingTemplate messagingTemplate; private ScheduledExecutorService scheduler Executors.newScheduledThreadPool(1); PostConstruct public void init() { // 每30秒发送一次心跳 scheduler.scheduleAtFixedRate(() - { try { messagingTemplate.convertAndSend(/topic/heartbeat, Map.of(timestamp, System.currentTimeMillis(), type, heartbeat)); } catch (Exception e) { e.printStackTrace(); } }, 0, 30, TimeUnit.SECONDS); } PreDestroy public void destroy() { scheduler.shutdown(); } }消息编码器/解码器// 自定义消息编解码器 Component public class ChatMessageConverter implements MessageConverter { Override public Message? toMessage(Object payload, MessageHeaders headers) { if (payload instanceof ChatMessage) { ChatMessage msg (ChatMessage) payload; byte[] bytes serializeMessage(msg); return MessageBuilder.createMessage(bytes, headers); } return null; } Override public Object fromMessage(Message? message, Class? targetClass) { if (targetClass ChatMessage.class) { byte[] bytes (byte[]) message.getPayload(); return deserializeMessage(bytes); } return null; } private byte[] serializeMessage(ChatMessage message) { try { return new ObjectMapper().writeValueAsBytes(message); } catch (Exception e) { throw new RuntimeException(序列化失败, e); } } private ChatMessage deserializeMessage(byte[] bytes) { try { return new ObjectMapper().readValue(bytes, ChatMessage.class); } catch (Exception e) { throw new RuntimeException(反序列化失败, e); }