WebSocket零基础极速上手开发指南

编程入门 行业动态 更新时间:2024-10-28 10:21:34

WebSocket零基础<a href="https://www.elefans.com/category/jswz/34/1769291.html">极速上手开发指南</a>

WebSocket零基础极速上手开发指南

一、WebSocket后台配置

1、添加依赖

<!-- Spring Boot starter for WebSocket support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2、WebSocket配置类 WebsocketConfig.java

package com.service.websocket.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** Websocket 配置*/
@Configuration
@Slf4j
public class WebsocketConfig implements WebMvcConfigurer {//服务器支持跨域@Overridepublic void addCorsMappings(CorsRegistry registry) {registry.addMapping("/**").allowedOrigins("*").allowedMethods("GET", "POST","OPTIONS").allowedHeaders("*").exposedHeaders("Access-Control-Allow-Headers","Access-Control-Allow-Methods","Access-Control-Allow-Origin","Access-Control-Max-Age","X-Frame-Options").allowCredentials(false).maxAge(3600);}/** 注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint 。* 要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,* 因为它将由容器自己提供和管理。在Spring中可以直接使用Java WebSocket API来提供服务,如果使用内置的web容器,需要做的仅仅是需要在下面添加* */@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}

3、WebSocket服务 Server

package com.service.websocket.server;import cn.gooday.jsh.servicemon.dto.RestControllerResult;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import jsh.mg.msg.service.websocket.dto.WebsocketParamsDto;
import jsh.mg.msg.service.websocket.util.HttpUtils;
import jsh.mg.msg.service.websocket.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** websocket服务器*/
@Component
@ServerEndpoint("/socket/{serialNumber}")
@Slf4j
public class WebsocketServer {private static final Logger logger = LoggerFactory.getLogger(WebsocketServer.class);//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。private static int onlineCount = 0;//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识public static ConcurrentHashMap<String, WebsocketServer> webSocketMap =new ConcurrentHashMap<String, WebsocketServer>();//与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;private String callBackUrl;private static String PREX_STRING = "STRING_";private static String PREX_BINARY = "BINARY_";private static long EXPIRE_TIME = 604800; //过期时间7天@Autowiredprivate RedisUtil redisUtil;/*** 连接成功后调用的方法* @param session  可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据*/@OnOpenpublic void onOpen(@PathParam(value = "serialNumber") String serialNumber, Session session, EndpointConfig config) {this.session = session;String queryString = session.getQueryString();log.info("session.getQueryString()" + queryString);if (StringUtils.isNotBlank(queryString) && queryString.contains("url=")) {this.callBackUrl = queryString.replace("url=", "");}webSocketMap.put(serialNumber, this) ;//在线数加1addOnlineCount();log.info("当前有连接" + serialNumber + "加入!当前在线人数为" + getOnlineCount());}/*** 连接关闭调用方法*/@OnClosepublic void onClose(@PathParam(value = "serialNumber") String serialNumber) {if (StringUtils.isNotBlank(serialNumber)) {log.info("WebsocketServer.onClose() is begin, serialNumber is " + serialNumber);webSocketMap.remove(serialNumber);//在线数减1subOnlineCount();}}/*** 连接异常*/@OnErrorpublic void onError(@PathParam(value = "serialNumber") String serialNumber, Throwable error) {log.info("连接异常: serialNumber is " + serialNumber + ",error is " + error.getMessage());}/*** 接收到客户短消息*/@OnMessagepublic void onMessage(@PathParam(value = "serialNumber") String serialNumber, String message) {try {if (!isOnline(serialNumber)) {return ;}WebsocketServer websocketServer = webSocketMap.get(serialNumber) ;HttpUtils.sendPost(websocketServer.callBackUrl, message);} 
catch (Exception e) {log.info(e.getMessage());return ;}return ;}public RestControllerResult<Boolean> checkExist(String serialNumber) {RestControllerResult<Boolean> result =  new RestControllerResult<Boolean>();if (!isExist(serialNumber)) {result.setData(false);result.setErrorMsg("请求的链接:" + serialNumber + "不在该服务器上");return result;}result.setData(true);return result;}public RestControllerResult<Boolean> checkOnline(String serialNumber) {RestControllerResult<Boolean> result =  new RestControllerResult<Boolean>();if (!isOnline(serialNumber)) {result.setData(false);result.setErrorMsg("请求的链接:" + serialNumber + "不在线");return result;}result.setData(true);return result;}public RestControllerResult<Boolean> checkOfflineMessage(String serialNumber) {RestControllerResult<Boolean> result =  new RestControllerResult<Boolean>();if (!isOnline(serialNumber)) {result.setData(false);result.setErrorMsg("请求的链接:" + serialNumber + "不在线");return result;}List<String> messageList = (List<String>)redisUtil.get(PREX_STRING + serialNumber);List<ByteBuffer> binaryList = (List<ByteBuffer>)redisUtil.get(PREX_BINARY + serialNumber);if ((null == messageList || messageList.size() < 1) && (null == binaryList || binaryList.size() < 1)) {result.setData(false);} else {result.setData(true);}return result;}public RestControllerResult<Boolean> pullOfflineMessage(String serialNumber, Boolean isAsync, Boolean flag) {RestControllerResult<Boolean> result =  new RestControllerResult<Boolean>();try {if (!isOnline(serialNumber)) {result.setData(false);result.setErrorMsg("请求的链接:" + serialNumber + "不在线");return result;}WebsocketServer websocketServer = webSocketMap.get(serialNumber) ;List<String> messageList = (List<String>)redisUtil.get(PREX_STRING + serialNumber);if (null != messageList && messageList.size() > 0 ) {Boolean isSuccess = false;if (!isAsync) {isSuccess = sendBasicMessage(websocketServer.session, messageList, flag);} else {isSuccess = sendAsyncMessage(websocketServer.session, messageList);}if 
(isSuccess) {redisUtil.delete(PREX_STRING + serialNumber);}}List<ByteBuffer> binaryList = (List<ByteBuffer>)redisUtil.get(PREX_BINARY + serialNumber);if (null != binaryList && binaryList.size() > 0 ) {Boolean isSuccess = false;if (!isAsync) {isSuccess = sendBasicBinaryMessage(websocketServer.session, binaryList, flag);} else {isSuccess = sendAsyncBinaryMessage(websocketServer.session, binaryList);}if (isSuccess) {redisUtil.delete(PREX_BINARY + serialNumber);}}} catch (IOException e) {log.info(e.getMessage());result.setData(false);result.setErrorMsg(e.getMessage());return result;}return result;}/*** 给所有人发消息*/public RestControllerResult<Boolean>  sendToAll(WebsocketParamsDto paramsDto) {RestControllerResult<Boolean> result =  new RestControllerResult<Boolean>();//遍历HashMapfor (String serialNumber : webSocketMap.keySet()) {try {if (paramsDto.getIsBinary()) {sendBasicBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData(), paramsDto.getFlag());} else {sendBasicMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage(), paramsDto.getFlag());}} catch (IOException e) {log.info(e.getMessage());if (paramsDto.getIsBinary()) {redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());} else {redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());}}}result.setData(true);return result;}/*** 给指定的终端发送消息*/public RestControllerResult<Boolean> sendToTerminal(WebsocketParamsDto paramsDto) {log.info("WebsocketServer.sendToSingle() is begin, serialNumber is " + paramsDto.getSerialNumber());RestControllerResult<Boolean> result =  new RestControllerResult<Boolean>();String toSerialNumber = paramsDto.getSerialNumber();try {if(!isOnline(toSerialNumber)) {result.setData(false);result.setErrorMsg("请求的链接:" + toSerialNumber + "不在该服务器上");log.info("请求的链接:" + toSerialNumber + "不在该服务器上");return result;}WebsocketServer websocketServer = webSocketMap.get(toSerialNumber) ;if 
(paramsDto.getIsBinary()) {sendBasicBinaryMessage(websocketServer.session, paramsDto.getBinaryData(), paramsDto.getFlag());} else {sendBasicMessage(websocketServer.session, paramsDto.getMessage(), paramsDto.getFlag());}result.setData(true);} catch (IOException e) {log.info(e.getMessage());if (paramsDto.getIsBinary()) {redisUtil.addToListRightExpire(PREX_STRING + toSerialNumber, EXPIRE_TIME, paramsDto.getMessage());} else {redisUtil.addToListRightExpire(PREX_BINARY + toSerialNumber, EXPIRE_TIME, paramsDto.getBinaryData());}result.setData(false);result.setErrorMsg("推送消息至链接:" + toSerialNumber + "时系统异常:" + e.getMessage());return result;}return result;}/*** 给所有人发消息*/public RestControllerResult<Boolean> sendToTerminalList(WebsocketParamsDto paramsDto) {RestControllerResult<Boolean> result =  new RestControllerResult<Boolean>();List<String> terminalList = paramsDto.getSerialList();//遍历HashMapfor (String serialNumber : terminalList) {try {if (paramsDto.getIsBinary()) {sendBasicBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData(), paramsDto.getFlag());} else {sendBasicMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage(), paramsDto.getFlag());}} catch (IOException e) {log.info(e.getMessage());if (paramsDto.getIsBinary()) {redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());} else {redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());}}}result.setData(true);return result;}/*** 给指定的人发送消息*/public RestControllerResult<Boolean> sendToTerminalAsync(WebsocketParamsDto paramsDto) {RestControllerResult<Boolean> result =  new RestControllerResult<Boolean>();String toSerialNumber = paramsDto.getSerialNumber();try {if(!isOnline(toSerialNumber)) {result.setData(false);result.setErrorMsg("请求的链接:" + toSerialNumber + "不在该服务器上");log.info("请求的链接:" + toSerialNumber + "不在该服务器上");return result;}WebsocketServer websocketServer = webSocketMap.get(toSerialNumber) 
;if (paramsDto.getIsBinary()) {sendAsyncBinaryMessage(websocketServer.session, paramsDto.getBinaryData());} else {sendAsyncMessage(websocketServer.session, paramsDto.getMessage());}result.setData(true);} catch (IOException e) {log.info(e.getMessage());if (paramsDto.getIsBinary()) {redisUtil.addToListRightExpire(PREX_STRING + toSerialNumber, EXPIRE_TIME, paramsDto.getMessage());} else {redisUtil.addToListRightExpire(PREX_BINARY + toSerialNumber, EXPIRE_TIME, paramsDto.getBinaryData());}result.setData(false);result.setErrorMsg("推送消息至链接:" + toSerialNumber + "时系统异常:" + e.getMessage());return result;}return result;}/*** 给所有人发消息*/public RestControllerResult<Boolean> sendToTerminalListAsync(WebsocketParamsDto paramsDto) {RestControllerResult<Boolean> result =  new RestControllerResult<Boolean>();List<String> terminalList = paramsDto.getSerialList();for (String serialNumber : terminalList) {try {if (paramsDto.getIsBinary()) {sendAsyncBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData());} else {sendAsyncMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage());}} catch (IOException e) {log.info(e.getMessage());if (paramsDto.getIsBinary()) {redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());} else {redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());}}}result.setData(true);return result;}/*** 给所有人发消息*/public RestControllerResult<Boolean>  sendToAllAsync(WebsocketParamsDto paramsDto) {RestControllerResult<Boolean> result =  new RestControllerResult<Boolean>();for (String serialNumber : webSocketMap.keySet()) {try {if (paramsDto.getIsBinary()) {sendAsyncBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData());} else {sendAsyncMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage());}} catch (IOException e) {log.info(e.getMessage());if (paramsDto.getIsBinary()) {redisUtil.addToListRightExpire(PREX_STRING + 
serialNumber, EXPIRE_TIME, paramsDto.getMessage());} else {redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());}}}result.setData(true);logger.info("给所有人发消息结果: {}", result);return result;}public static Map<String, WebsocketServer> getWebSocketMap() {return webSocketMap ;}/*** 同步发送消息模式。* message: 待发送的消息* flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)*/public void sendBasicMessage(Session session, String message, Boolean flag) throws IOException {log.info("WebsocketServer.sendBasicMessage() is begin, flag is " + flag + " ,message is " + message);session.getBasicRemote().sendText(message, flag);log.info("WebsocketServer.sendBasicMessage() is end. ");}/*** 同步发送消息模式。* message: 待发送的消息* flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)*/public void sendBasicBinaryMessage(Session session, ByteBuffer message, Boolean flag) throws IOException {log.info("WebsocketServer.sendBasicMessage() is begin, flag is " + flag + " ,message is " + message);session.getBasicRemote().sendBinary(message, flag);log.info("WebsocketServer.sendBasicMessage() is end. ");}/*** 异步发送消息模式。* message: 待发送的消息* flag: 是否支持发送部分消息*/public void sendAsyncMessage(Session session, String message) throws IOException {log.info("WebsocketServer.sendAsyncMessage() is begin, message is " + message);session.getAsyncRemote().sendText(message);log.info("WebsocketServer.sendAsyncMessage() is end. ");}/*** 异步发送消息模式。* message: 待发送的消息* flag: 是否支持发送部分消息*/public void sendAsyncBinaryMessage(Session session, ByteBuffer message) throws IOException {log.info("WebsocketServer.sendAsyncMessage() is begin, message is " + message);session.getAsyncRemote().sendBinary(message);log.info("WebsocketServer.sendAsyncMessage() is end. 
");}/*** 同步发送消息模式。* message: 待发送的消息* flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)*/public Boolean sendBasicMessage(Session session, List<String> messageList, Boolean flag) throws IOException {if (null == messageList || messageList.size() < 1) {return true;}for (int i = 0; i < messageList.size(); i++) {session.getBasicRemote().sendText(messageList.get(i), flag);}return true;}/*** 异步发送消息模式。* message: 待发送的消息* flag: 是否支持发送部分消息*/public Boolean sendAsyncMessage(Session session, List<String> messageList) throws IOException {if (null == messageList || messageList.size() < 1) {return true;}for (int i = 0; i < messageList.size(); i++) {session.getAsyncRemote().sendText(messageList.get(i));}return true;}/*** 同步发送消息模式。* message: 待发送的消息* flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)*/public Boolean sendBasicBinaryMessage(Session session, List<ByteBuffer> messageList, Boolean flag) throws IOException {if (null == messageList || messageList.size() < 1) {return true;}for (int i = 0; i < messageList.size(); i++) {session.getBasicRemote().sendBinary(messageList.get(i), flag);}return true;}/*** 异步发送消息模式。* message: 待发送的消息* flag: 是否支持发送部分消息*/public Boolean sendAsyncBinaryMessage(Session session, List<ByteBuffer> messageList) throws IOException {if (null == messageList || messageList.size() < 1) {return true;}for (int i = 0; i < messageList.size(); i++) {session.getAsyncRemote().sendBinary(messageList.get(i));}return true;}/*** 获取当前时间** @return*/private String getNowTime() {Date date = new Date();DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String time = format.format(date);return time;}public static synchronized int getOnlineCount() {return onlineCount;}public static synchronized void addOnlineCount() {WebsocketServer.onlineCount++;}public static synchronized void subOnlineCount() {WebsocketServer.onlineCount--;}public Boolean isExist(String serialNumber) {if(StringUtils.isBlank(serialNumber) || !webSocketMap.containsKey(serialNumber)){return false;}return 
true;}public Boolean isOnline(String serialNumber) {try {if(StringUtils.isBlank(serialNumber) || !webSocketMap.containsKey(serialNumber)){return false;}WebsocketServer websocketServer = webSocketMap.get(serialNumber) ;if (websocketServer == null ) {return false;}} catch (Exception e){log.error(e.getMessage());return false;}return true;}
}

4、入参对象配置

package com.service.websocket.dto;import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.nio.ByteBuffer;
import java.util.List;
import lombok.Data;
import org.w3c.dom.Text;/*** Websocket参数 DTO.** @author *****/
@Data
@ApiModel(value = "WebsocketParamsDto", description = "test")
public class WebsocketParamsDto {

    /** Serial number of a single target connection. */
    @ApiModelProperty(value = "序列号", name = "serialNumber", example = "key.12345")
    private String serialNumber;

    /** Serial numbers of several target connections. */
    @ApiModelProperty(value = "键值", name = "serialList", example = "key.12345")
    private List<String> serialList;

    /**
     * Selects the payload: {@code true} sends {@link #binaryData}, otherwise {@link #message}.
     * The Swagger metadata previously described this field as the sync/async switch and
     * published it under the name "dataType".
     */
    @ApiModelProperty(value = "信息类型:true 二进制;false 文本", name = "isBinary", example = "false")
    private Boolean isBinary;

    /** Text payload. */
    @ApiModelProperty(value = "信息", name = "message", example = "待发送信息")
    private String message;

    /** Binary payload. */
    @ApiModelProperty(value = "二进制数据", name = "binaryData", example = "01010101....")
    private ByteBuffer binaryData;

    /** DOM text payload. */
    @ApiModelProperty(value = "Text数据", name = "textData", example = "key.67890")
    private Text textData;

    /**
     * Send mode: {@code true} asynchronous, {@code false} synchronous. The description used
     * to sit in the {@code name} attribute, which is meant to hold the property name.
     */
    @ApiModelProperty(value = "模式:同步false/异步模式true", name = "isAsync", example = "true")
    private Boolean isAsync;

    /** Transfer flag: {@code true} sends the whole message at once, {@code false} allows partial sends. */
    @ApiModelProperty(value = "传送标识", name = "flag", example = "true--一次性发送全部信息;false--可部分发送信息;")
    private Boolean flag;
}

5、业务调用

/*** 给所有人发消息*/public RestControllerResult<Boolean>  sendToAll(WebsocketParamsDto paramsDto) {RestControllerResult<Boolean> result =  new RestControllerResult<Boolean>();//遍历HashMapfor (String serialNumber : webSocketMap.keySet()) {try {if (paramsDto.getIsBinary()) {sendBasicBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData(), paramsDto.getFlag());} else {sendBasicMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage(), paramsDto.getFlag());}} catch (IOException e) {log.info(e.getMessage());if (paramsDto.getIsBinary()) {redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());} else {redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());}}}result.setData(true);return result;}
/** Injected WebSocket server that performs the actual sending. */
@Resource
private WebsocketServer webSocketServer;

/**
 * Broadcast entry point: hands the request body to
 * {@code webSocketServer.sendToAll} and returns its result unchanged.
 *
 * @param paramsDto message description taken from the request body
 * @return the result produced by the WebSocket server
 */
@Override
public RestControllerResult<Boolean> sendToAll(@RequestBody WebsocketParamsDto paramsDto) {
    final RestControllerResult<Boolean> broadcastResult = webSocketServer.sendToAll(paramsDto);
    return broadcastResult;
}

二、Websocket 测试

1、在线测试

测试URL:Websocket在线测试-Websocket接口测试-Websocket模拟请求工具 (jsons)

2、业务调用 WebSocket 服务,发出消息

通过 postman 测试工具调用接口,或者其他工具

3、Websocket 监听端回应情况如下 

更多推荐

WebSocket零基础极速上手开发指南

本文发布于:2024-03-08 00:36:26,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1719375.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:极速   上手   基础   指南   WebSocket

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!