spring-boot实现websocket

服务端

服务端的实现,通常有两种方式:

一种是用@ServerEndpoint注解来实现;第二种是实现Spring的WebSocketHandler接口:请求参数在握手拦截器中获取之后,通过attributes传递给WebSocketHandler,还可以添加拦截器,在WebSocket握手前后进行一些其他操作(例如鉴权)。

其实第一种也就是前面说过的Java自带注解方式的一种实现。不过在spring-boot中,除了需要把端点类注册到容器之外,还需要配置ServerEndpointExporter:

1
2
3
4
5
6
7
8
9
/**
 * Automatically registers every WebSocket endpoint declared with the
 * {@code @ServerEndpoint} annotation.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean that performs the endpoint registration.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

用@ServerEndpoint实现的时候,端点类中依赖的Bean可能会出现注入失败的问题,可以通过手动注入的方式来解决:启动时把ApplicationContext传给端点类,再在端点类中从ApplicationContext里获取Bean。

1
2
// Keep the context returned by run(...) so it can be handed to the endpoint class below.
ConfigurableApplicationContext applicationContext = new SpringApplication(Application.class).run(args);
WebSocketController.setApplicationContext(applicationContext); // beans are then looked up from this context inside WebSocketController

第二种实现和在Spring中的用法基本相同:实现WebSocketHandler接口,或者更常见的做法是继承TextWebSocketHandler或BinaryWebSocketHandler。

示例:

pom:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- groupId and version are not declared here, so Maven inherits them from the parent. -->
    <artifactId>SpringBoot-websocket</artifactId>

    <!--
      Applications should inherit from spring-boot-starter-parent, which supplies
      dependency management and sensible plugin defaults. spring-boot-parent is the
      internal POM used to build Spring Boot itself and is not meant for applications.
    -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
    </parent>

    <dependencies>
        <!-- Server side: Spring WebSocket support; the version is managed by the parent. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!-- Client side: Java-WebSocket, used by the standalone WebSocketClient demo. -->
        <dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.3.5</version>
        </dependency>
    </dependencies>
</project>

java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
/**
 * Configuration class: registers {@link WebSocketController} as the handler for
 * the {@code /webSocket} path, guarded by a {@link HandShakeInterceptor} and
 * accepting every origin.
 */
@Configuration
@EnableWebMvc
@EnableWebSocket
public class WebSocketConfig implements WebMvcConfigurer, WebSocketConfigurer {

    /** Handler that receives all WebSocket events for the registered path. */
    @Autowired
    private WebSocketController webSocketController;

    /**
     * Maps the handler to its path and attaches the handshake interceptor.
     *
     * @param registry registry supplied by Spring for adding handler mappings
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        HandShakeInterceptor handshakeInterceptor = new HandShakeInterceptor();
        registry.addHandler(webSocketController, "/webSocket")
                .setAllowedOrigins("*")
                .addInterceptors(handshakeInterceptor);
    }
}
//拦截器
public class HandShakeInterceptor extends HttpSessionHandshakeInterceptor {
private final Logger logger = LoggerFactory.getLogger(HandShakeInterceptor.class);

/*
* 在WebSocket连接建立之前的操作,以鉴权为例
*/
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
//一些鉴权操作
return true;
}

public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception ex) {

}

}
/**
 * Controller: the {@link WebSocketHandler} that tracks open sessions and logs
 * every lifecycle event and incoming message.
 *
 * <p>The session registry and the online counter are static and therefore
 * shared by every connection, so both use concurrency-safe types.
 */
@RestController
public class WebSocketController implements WebSocketHandler {

    /** Number of sessions currently registered in {@link #sessions}. */
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    /** Currently open sessions; copy-on-write so concurrent callbacks cannot corrupt it. */
    private static final java.util.List<WebSocketSession> sessions =
            new java.util.concurrent.CopyOnWriteArrayList<>();

    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketController.class);

    /**
     * Registers the new session and logs the updated connection count.
     *
     * @param session the session that has just been opened
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.add(session);
        int onlineNum = onlineCount.incrementAndGet();
        LOGGER.info("打开连接,目前连接数: " + onlineNum);
    }

    /**
     * Unregisters the session and logs the remaining connection count.
     *
     * @param session the session that was closed
     * @param status  the reason the connection was closed
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        int onlineNum = unregister(session);
        LOGGER.info("连接关闭: " + onlineNum);
    }

    /**
     * Logs every incoming message.
     *
     * @param wsSession the session the message arrived on
     * @param message   the received message
     */
    @Override
    public void handleMessage(WebSocketSession wsSession, WebSocketMessage<?> message) throws Exception {
        LOGGER.info("收到的消息: " + message.toString());
    }

    /**
     * Logs the transport error, closes the session if it is still open and
     * unregisters it.
     *
     * @param session   the session on which the error occurred
     * @param exception the transport error
     * @throws Exception if closing the session fails
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        LOGGER.error("连接错误", exception);
        try {
            if (session.isOpen()) {
                session.close();
            }
        } finally {
            // Always unregister, even when close() throws. afterConnectionClosed is
            // also invoked after a transport error; unregister() is idempotent, so
            // the counter is decremented only once per session.
            unregister(session);
        }
    }

    /*
     * Whether partial (split) messages are supported. Author's note: when the
     * received payloads are large it is best to enable this (true), otherwise
     * receiving may fail. If a WebSocket connection drops right after receiving
     * data once, this setting may be the cause.
     */
    @Override
    public boolean supportsPartialMessages() {
        return true;
    }

    /**
     * Removes the session from the registry and decrements the counter only if
     * the session was still registered.
     *
     * @param session the session to remove
     * @return the connection count after the removal
     */
    private static int unregister(WebSocketSession session) {
        if (sessions.remove(session)) {
            return onlineCount.decrementAndGet();
        }
        return onlineCount.get();
    }
}

/**
 * Application entry point; components are scanned from the {@code com.hu.demo} package.
 */
@SpringBootApplication
@ComponentScan("com.hu.demo")
public class App {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        new SpringApplication(App.class).run(args);
    }
}

启动

客户端

客户端的实现,通常有html和Java WebSocketClient两种方式。html的方式前面已经介绍过,这里用Java-WebSocket提供的WebSocketClient实现一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Minimal WebSocket client that prints every lifecycle event to standard output.
 */
public class WebSocketClientDemo extends WebSocketClient {

    /**
     * @param url server address as a string
     * @throws URISyntaxException if {@code url} is not a valid URI
     */
    public WebSocketClientDemo(String url) throws URISyntaxException {
        super(new URI(url));
    }

    /**
     * @param serverUri     server address
     * @param protocolDraft protocol draft to use for the connection
     */
    public WebSocketClientDemo(URI serverUri, Draft protocolDraft) {
        super(serverUri, protocolDraft);
    }

    /** Prints a banner followed by every HTTP field of the server handshake. */
    @Override
    public void onOpen(ServerHandshake shake) {
        System.out.println("握手...");
        Iterator<String> fieldNames = shake.iterateHttpFields();
        while (fieldNames.hasNext()) {
            String fieldName = fieldNames.next();
            String fieldValue = shake.getFieldValue(fieldName);
            System.out.println(fieldName + ":" + fieldValue);
        }
    }

    /** Prints each received text message. */
    @Override
    public void onMessage(String paramString) {
        System.out.println("接收到消息:" + paramString);
    }

    /** Prints a notice when the connection is closed. */
    @Override
    public void onClose(int paramInt, String paramString, boolean paramBoolean) {
        System.out.println("关闭...");
    }

    /** Prints the exception reported by the client. */
    @Override
    public void onError(Exception e) {
        System.out.println("异常" + e);
    }
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Test driver: connects to the local server, waits for the connection to open
 * and sends a single message.
 */
public class ClientDemo {

    /** Upper bound on how long to wait for the connection to open. */
    private static final long CONNECT_TIMEOUT_MILLIS = 5000L;

    /** Pause between two ready-state checks, so the wait does not spin at full CPU speed. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /**
     * @param args unused
     * @throws Exception if the URI is invalid, the wait is interrupted, or the
     *                   connection does not open within the timeout
     */
    public static void main(String[] args) throws Exception {
        String serverUrl = "ws://127.0.0.1:8080/webSocket";
        URI recognizeUri = new URI(serverUrl);
        WebSocketClientDemo client = new WebSocketClientDemo(recognizeUri, new Draft_6455());
        client.connect();

        // connect() returns immediately; poll until the handshake has finished,
        // but give up after the timeout instead of looping forever.
        long deadline = System.nanoTime() + CONNECT_TIMEOUT_MILLIS * 1000000L;
        while (client.getReadyState() != WebSocket.READYSTATE.OPEN) {
            if (System.nanoTime() - deadline > 0) {
                client.close();
                throw new IllegalStateException("WebSocket connection to " + serverUrl
                        + " did not open within " + CONNECT_TIMEOUT_MILLIS + " ms");
            }
            System.out.println("未打开");
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
        System.out.println("建立websocket连接");
        client.send("hello");
    }
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
未打开
未打开
未打开
未打开
未打开
未打开
握手...
已经连接
Connection:upgrade
Date:Sat, 04 Apr 2020 09:26:08 GMT
Sec-WebSocket-Accept:SIBN8X8GwCVQmjNH1VNNZnr+WY4=
Upgrade:websocket