java客户端请求websocket
Spring boot 导入包
pom.xml 导入
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
|
客户端调用方法
测试执行方法
connectWebSocket
- 连接websocket 客户端,并携带过期时间等参数
HandshakeMessage
sendHandshake
WebSocketConfig.queue.take
- 队列信息,等待数据返回,并消费,获取websocket 返回的消息数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public String test() { try { WebSocketConfig.connectWebSocket(); HandshakeMessage handshakeMessage = new HandshakeMessage(); handshakeMessage.setMessage("test"); handshakeMessage.setClientId(UUID.randomUUID().toString()); handshakeMessage.setType("handshake"); WebSocketConfig.sendHandshake(handshakeMessage); String take = WebSocketConfig.queue.take(); System.out.println("test:" + take); WebSocketConfig.close(); }catch (InterruptedException ex){ System.out.println("连接异常"+ ex.getMessage()); } return null; }
|
方法对应实体类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class HandshakeMessage { private String type; private String clientId; private String message;
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getClientId() { return clientId; }
public void setClientId(String clientId) { this.clientId = clientId; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
}
|
配置 yaml 资源
1 2
| websocket: url: ws://localhost:8080/websocket
|
WebSocketConfig 配置类
注入配置websocketUrl:
使用@Value注解将websocket.url注入到类的私有成员变量websocketUrl中。确保在类初始化时就设置好静态变量。
LinkedBlockingQueue
LinkedBlockingQueue是Java并发集合框架中的一种线程安全的队列实现,它继承自BlockingQueue接口。LinkedBlockingQueue使用链表结构来存储元素,并且提供了阻塞操作,可以在队列为空或满时自动阻塞生产者或消费者线程,直到队列变为非空或非满。
LinkedBlockingQueue的特点
线程安全性:LinkedBlockingQueue是线程安全的,可以在多线程环境中安全地使用。
阻塞操作:提供了put和take等阻塞方法,当队列满时调用put会阻塞,当队列为空时调用take会阻塞。
容量可配置:LinkedBlockingQueue可以被初始化为一个固定容量的队列,也可以是一个无界队列(默认情况下,如果未指定容量,则容量为Integer.MAX_VALUE)。
connectWebSocket 连接
URI
- URI类可以帮助你处理和解析Web地址,并确保这些地址格式正确。
WebSocketClientHandler
- 继承 WebSocketClient 类 ,实现一些 websocket 方法重写
connectBlocking方法
- connectBlocking方法用于建立WebSocket连接,并且在连接建立之前会阻塞当前线程。这通常用于确保连接完全建立后再继续执行后续操作。connectBlocking方法是org.java_websocket.client.WebSocketClient的一个扩展方法,它允许开发者在连接建立之前等待一段时间。
connectBlocking方法有两个参数:
- timeout:指定连接建立的最大等待时间。
- unit:指定时间单位(如毫秒、秒、分钟等)。
sendMessage
sendMessage方法用于向WebSocket服务器发送文本消息
send 方法
- 检查WebSocket连接状态:
- 如果WebSocket连接尚未打开(!this.isOpen()),则抛出WebsocketNotConnectedException异常。这是因为只有在连接建立后才能发送数据。
- 检查参数有效性:
- 如果传入的frames参数为null,则抛出IllegalArgumentException异常。这是为了确保传入的数据是有效的。
- 准备发送的数据帧:
- 创建一个新的ArrayList来存储即将发送的二进制帧。
- 遍历frames集合中的每一个Framedata对象。
- 对于每一个Framedata对象,调用draft.createBinaryFrame(f)方法将其转换为ByteBuffer,然后添加到outgoingFrames列表中。
- 在遍历过程中,通过日志记录每一步操作的信息(如果启用了日志的trace级别)。
- 发送数据帧:
- 最后,调用write方法,将准备好的outgoingFrames列表作为参数传递进去,完成实际的数据发送操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public void send(String text) { if (text == null) { throw new IllegalArgumentException("Cannot send 'null' data to a WebSocketImpl."); } else { this.send((Collection)this.draft.createFrames(text, this.role == Role.CLIENT)); } } private void send(Collection<Framedata> frames) { if (!this.isOpen()) { throw new WebsocketNotConnectedException(); } else if (frames == null) { throw new IllegalArgumentException(); } else { ArrayList<ByteBuffer> outgoingFrames = new ArrayList(); Iterator var3 = frames.iterator();
while(var3.hasNext()) { Framedata f = (Framedata)var3.next(); this.log.trace("send frame: {}", f); outgoingFrames.add(this.draft.createBinaryFrame(f)); }
this.write((List)outgoingFrames); } }
|
close
关闭连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| package com.dog.websocket;
import com.alibaba.fastjson2.JSONObject; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit;
@Component public class WebSocketConfig {
private static String websocketUrl;
public static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
@Value("${websocket.url}") public void setWebsocketUrl(String websocketUrl){ WebSocketConfig.websocketUrl = websocketUrl; }
private static WebSocketClient client;
public static void connectWebSocket(){ try{ URI uri = new URI(websocketUrl); client = new WebSocketClientHandler(uri); client.connectBlocking(4000, TimeUnit.MINUTES); }catch (URISyntaxException|InterruptedException ex){ ex.printStackTrace(); throw new RuntimeException("websocket 连接异常"); } }
public static void sendMessage(String sendMessage){ client.send(sendMessage); }
public static void sendHandshake(HandshakeMessage handshakeMessage){ String sendMessage = JSONObject.toJSONString(handshakeMessage); System.out.println(sendMessage); client.send(sendMessage); }
public void sendByteMessage(byte[] bytes){ client.send(bytes); }
public static void close(){ if (client != null && client.isOpen()) { client.close(); } } }
|
WebSocketClientHandler 配置类
继承WebSocketClient 并重写了几个关键的方法来处理WebSocket连接的不同生命周期事件
onOpen 方法
1 2 3 4
| @Override public void onOpen(ServerHandshake serverHandshake) { System.out.println("连接websocket 状态:"+ serverHandshake.getHttpStatus()); }
|
当WebSocket连接成功建立时,这个方法会被调用。它打印出连接的状态码。
onMessage 方法
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override public void onMessage(String s) { System.out.println("message: "+ s); try { if (!queue.offer(s, 10, TimeUnit.SECONDS)) { System.err.println("无法在规定时间内将消息放入队列"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("向队列中添加消息时被中断"); } }
|
当从WebSocket服务器接收到消息时,这个方法会被调用。它首先打印接收到的消息,然后尝试将消息放入WebSocketConfig.queue队列中。如果在向队列中添加消息时发生中断异常,则恢复中断状态并打印错误信息。
onClose 方法
1 2 3 4
| @Override public void onClose(int i, String s, boolean b) { System.out.println("WebSocket连接已关闭: " + s); }
|
当WebSocket连接关闭时,这个方法会被调用。它打印出关闭连接的原因。
onError 方法
1 2 3 4 5
| @Override public void onError(Exception ex) { ex.printStackTrace(); System.err.println("WebSocket发生错误: " + ex.getMessage()); }
|
当WebSocket连接发生错误时,这个方法会被调用。它打印出错误信息及其堆栈跟踪。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import org.springframework.stereotype.Component;
import java.net.URI; import java.util.concurrent.TimeUnit;
import static com.dog.websocket.WebSocketConfig.queue;
public class WebSocketClientHandler extends WebSocketClient {
public WebSocketClientHandler(URI serverUri) { super(serverUri); }
@Override public void onOpen(ServerHandshake serverHandshake) { System.out.println("连接websocket 状态:"+ serverHandshake.getHttpStatus()); }
@Override public void onMessage(String s) { System.out.println("message: "+ s); try { if (!queue.offer(s, 10, TimeUnit.SECONDS)) { System.err.println("无法在规定时间内将消息放入队列"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("向队列中添加消息时被中断"); } }
@Override public void onClose(int i, String s, boolean b) { System.out.println("WebSocket连接已关闭: " + s); }
@Override public void onError(Exception ex) { ex.printStackTrace(); System.err.println("WebSocket发生错误: " + ex.getMessage()); } }
|
上面只是根据所需要自行调整
java服务端websocket
在上一篇博客已做详细简绍,不做补充
spring boot 项目 跟 JavaScript 简单 websocket 使用
WebSocketConfig 配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.ruoyi.common.utils.socket;
import com.ruoyi.common.utils.socket.handler.WebSocketHandler; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer {
private final WebSocketHandler webSocketHandler;
public WebSocketConfig(WebSocketHandler webSocketHandler) { this.webSocketHandler = webSocketHandler; }
@Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(webSocketHandler, "/websocket").setAllowedOrigins("*"); } }
|
WebSocketHandler 监听类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package com.ruoyi.common.utils.socket.handler;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.fasterxml.jackson.databind.ObjectMapper; import com.ruoyi.common.utils.socket.HandshakeMessage; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Component public class WebSocketHandler extends TextWebSocketHandler {
private static final Map<String, WebSocketSession> clientSessions = new ConcurrentHashMap<>(); private static final ObjectMapper objectMapper = new ObjectMapper();
@Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { super.afterConnectionEstablished(session); String sessionId = session.getId(); System.out.println("WebSocket connection established with session ID: " + sessionId); }
@Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { String payload = message.getPayload(); HandshakeMessage handshakeMessage = objectMapper.readValue(payload, HandshakeMessage.class); if ("handshake".equals(handshakeMessage.getType())) { String clientId = handshakeMessage.getClientId(); String sessionId = session.getId(); clientSessions.put(clientId, session); handshakeMessage.setMessage("success"); session.sendMessage(new TextMessage(JSON.toJSONString(handshakeMessage))); } }
@Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { super.afterConnectionClosed(session, status); String sessionId = session.getId(); System.out.println("WebSocket connection closed with session ID: " + sessionId); clientSessions.values().removeIf(s -> s.getId().equals(sessionId)); }
public void sendMessageToClient(String clientId, String message) { WebSocketSession session = clientSessions.get(clientId); if (session != null && session.isOpen()) { try { session.sendMessage(new TextMessage(message)); } catch (Exception e) { e.printStackTrace(); } } } }
|