SpringBoot Websocket Stomp 实现单设备登录(顶号) ①

编程入门 行业动态 更新时间:2024-10-25 23:26:32

SpringBoot Websocket Stomp 实现单<a href="https://www.elefans.com/category/jswz/34/1769313.html">设备</a>登录(顶号) ①

SpringBoot Websocket Stomp 实现单设备登录(顶号) ①

单设备登录如果直接使用 websocket 来做,比较容易实现,通常自己维护一个 session 会话列表即可。

当集成 spring-messaging 的 stomp 后,它封装得比较封闭:stomp 内部有维护 session 会话列表,但是外部无法通过正常方式获取到。如果不想自己再维护一个会话列表,可以尝试通过下面的方式实现单设备登录功能。

本案例场景:同账号登录时,如果该账号已有在线会话,则先向已在线的会话发送一条消息告知其被顶号,然后将其连接断开。

Stomp通过消息中继实现消息发送,下面使用SimpleBrokerMessageHandler说明:

配置:


@Slf4j
@EnableWebSocketMessageBroker
@Configuration
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/wse").setHandshakeHandler(new MyHandleShakeHandler());
//        .withSockJS(); // 本例不使用sockjs客户端,所以不启用}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {ThreadPoolTaskScheduler pool = new ThreadPoolTaskScheduler();pool.setPoolSize(Runtime.getRuntime().availableProcessors());pool.setThreadNamePrefix("WsHeart");pool.initialize();// 心跳最好配置上,不配置会导致无法感知连接状态,掉了也不知道,// 一方面占用资源,另一方面影响业务功能// 不配置时,由于系统环境等缘故长连接长期无读写操作可能会失效// 注意配置了心跳要配置一个心跳执行线程池registry.enableSimpleBroker("/topic/").setHeartbeatValue(new long[]{1000 * 60, 1000 * 30}) // 心跳读写间隔,.setTaskScheduler(pool);// user点对点通讯时,/user是UserDestinationMessageHandler使用的topic前缀名,// /queue是一个broker消息中继,如果没有消息中继,那么无法最终将消息发出去。// user消息最终也是转为simp消息发送,最终使用SimpleBrokerMessageHandler处理发送消息// 所以如果使用点对点消息,配置消息中继时最好为/user配置一个中继,            // 当然也可以只配置一个中继,都用一个中继如topic,此时convertAndSendToUser时,destination为/topic/xxx, // 很多例子中使用/queue代表用户点对点中继,如果配置了/queue单独使用则就变成了/queue/xxx// 本例没有为用户单独配置一个中继,都是用topic, 实际使用最好分开// 用户订阅普通广播消息为:/topic/xxx, 订阅点对点消息为 /user/topic/xxxregistry.setUserDestinationPrefix("/user/");}// 握手后自定义用户token解析static class MyHandleShakeHandler extends DefaultHandshakeHandler {@Overrideprotected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {if (log.isDebugEnabled()) {log.debug("request handshake: {} remote: {}, headers: {}", request.getURI(), request.getRemoteAddress(), JsonUtil.toJsonStringQuit(request.getHeaders()));}List<String> tkHeaders = request.getHeaders().get("token");if (Objects.nonNull(tkHeaders) && !tkHeaders.isEmpty()) {String tk = tkHeaders.get(0);JWT jwtAuthToken = MyJwtUtil.parseToken(tk);if (Objects.isNull(jwtAuthToken)) {log.error("handshake token not parsable: {}", tk);} else {return MyJwtUtil.extractJwtUser(jwtAuthToken);}}return super.determineUser(request, wsHandler, attributes);}}
}

单设备检测及消息发送

package com.tom;import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
import org.springframework.messaging.simp.user.SimpSession;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.web.socket.messaging.AbstractSubProtocolEvent;
import org.springframework.web.socket.messaging.SessionConnectEvent;import java.security.Principal;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@Slf4j
@Component// 当前bean会在simpleBrokerMessageHandler之前被创建,
// 构造函数注入需要特别指明这个被依赖的bean要完成之后再创建本类bean
@DependsOn("simpleBrokerMessageHandler")
@RequiredArgsConstructor
public class WsConnectEventHandler implements SmartApplicationListener {private final SimpUserRegistry simpUserRegistry;private final SimpMessageSendingOperations sendingOperations;private final ScheduledExecutorService se = Executors.newScheduledThreadPool(4);private final Object sessionLock = new Object();private final SimpleBrokerMessageHandler simpleBrokerMessageHandler;@Overridepublic boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {return AbstractSubProtocolEvent.class.isAssignableFrom(eventType);}@Overridepublic void onApplicationEvent(ApplicationEvent event) {AbstractSubProtocolEvent subProtocolEvent = (AbstractSubProtocolEvent) event;if (event instanceof SessionConnectEvent) {Principal user = subProtocolEvent.getUser();if (user == null) {return;}String name = user.getName();synchronized (this.sessionLock) {SimpUser u = simpUserRegistry.getUser(name);if (Objects.nonNull(u)) {Message<?> message = subProtocolEvent.getMessage();MessageHeaders headers = message.getHeaders();String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);Assert.state(sessionId != null, "No session id");Set<SimpSession> sessions = u.getSessions();log.info("User: {} has online sessions: {}", name, sessions.size());for (SimpSession session : sessions) {if (!sessionId.equals(session.getId())) {// /user/topic/kick-outlog.info("kick-out: {}, user: {}", session.getId(), session.getUser());// 此处监听connect事件, 通常情况下新session还未加入到simpSession中,// 所以可以直接给当前用户发消息,不会影响到新session, // 但是由于消息在队列中不会即时发送,可能存在新session加入后,消息被发送,导致新设备也收到该消息,所以不用此方法
//                            sendingOperations.convertAndSendToUser(u.getName(), "/topic/kick-out"
//                                    , "当前帐号已在其他设备登录!"); //// 指定session操作,否则是发给同一个用户下所有session// 注意:一个websocket session对应多个stomp session,此处的session不是WebSocketSessionString sId = session.getId();SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create();headerAccessor.setSessionId(sId);headerAccessor.setLeaveMutable(true);// 点对点不需要加/user前缀// 第一个参数user既可以是用户name也可以是sessionId, sessionId时只会指定session收到。sendingOperations.convertAndSendToUser(sId, "/topic/kick-out", "当前帐号已在其他设备登录!", headerAccessor.getMessageHeaders());// 延迟断开连接se.schedule(() -> {try {simpleBrokerMessageHandler.handleMessage(createDisconnectMsg(sId));log.info("handle-disconnect: {}, user: {}", sId, session.getUser());} catch (Exception e) {log.error("Handle disconnect error", e);}}, 3, TimeUnit.SECONDS);}}}}}}private Message<?> createDisconnectMsg(String id) {return MessageBuilder.withPayload("").setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, id).setHeader(SimpMessageHeaderAccessor.MESSAGE_TYPE_HEADER, SimpMessageType.DISCONNECT).build();}@Overridepublic int getOrder() {return Ordered.HIGHEST_PRECEDENCE;}
}

测试方法:

package com.tom;import com.alibaba.fastjson.JSONArray;
import com.guangyu.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.scheduling.concurrent.DefaultManagedTaskScheduler;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;import java.lang.reflect.Type;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;@Slf4j
class StompClientTest {// 启动服务器后,分别运行两个test方法,没对返回消息做处理,可能会报错,正常忽略即可@Testvoid testStandardWebSocket0() throws ExecutionException, InterruptedException {// 标准wsClient,服务端不能使用withSockJs(): The HTTP response from the server [400] did not permit the HTTP upgrade to WebSocket// // ws://localhost:8800/wseStandardWebSocketClient webSocketClient = new StandardWebSocketClient();// user点对点通讯时,/user是UserDestinationMessageHandler使用的topic前缀名,// spring文档中的/queue 是一个broker消息中继,如果没有消息中继,那么无法最终将消息发出去。user消息最终也是转为simp消息发送testWebSocket(webSocketClient, "ws://localhost:8800/wse", "/user/topic/kick-out");}@Testvoid testStandardWebSocket1() throws ExecutionException, InterruptedException {StandardWebSocketClient webSocketClient = new StandardWebSocketClient();testWebSocket(webSocketClient, "ws://localhost:8800/wse", "/user/topic/kick-out");}void testWebSocket(WebSocketClient webSocketClient, String url, String topic) throws InterruptedException, ExecutionException {WebSocketStompClient client = new WebSocketStompClient(webSocketClient);client.setMessageConverter(new CompositeMessageConverter(List.of(new MappingJackson2MessageConverter(),new StringMessageConverter())));client.setTaskScheduler(new DefaultManagedTaskScheduler());client.setDefaultHeartbeat(new long[]{30001, 60001});WebSocketHttpHeaders wsHeaders = new WebSocketHttpHeaders();wsHeaders.add("token", "xxx"); // 业务上自定义的ws连接token标识用户信息StompSession session = client.connect(url, wsHeaders, new StompSessionHandlerAdapter() {@Overridepublic Type getPayloadType(StompHeaders headers) {log.info("getPayloadType {}", JsonUtil.toJsonString(headers));return super.getPayloadType(headers);}@Overridepublic void handleFrame(StompHeaders headers, Object payload) {log.info("handleFrame {}, {}", JsonUtil.toJsonString(headers), payload);}@Overridepublic void afterConnected(StompSession session, StompHeaders connectedHeaders) {log.info("afterConnected {}, {}", session.getSessionId(), JsonUtil.toJsonString(connectedHeaders));}@Overridepublic void 
handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {log.error("handleException sessionId: " + session.getSessionId(), exception);}@Overridepublic void handleTransportError(StompSession session, Throwable exception) {log.info("handleTransportError sessionId: " + session.getSessionId(), exception);}}).get();session.subscribe(topic, new StompSessionHandlerAdapter() {@Overridepublic Type getPayloadType(StompHeaders headers) {log.info("subscribe getPayloadType {}", JsonUtil.toJsonString(headers));return String.class;}@Overridepublic void handleFrame(StompHeaders headers, Object payload) {log.info("subscribe handleFrame {}, {}", JsonUtil.toJsonString(headers), JsonUtil.toJsonString(payload));}@Overridepublic void afterConnected(StompSession session, StompHeaders connectedHeaders) {log.info("subscribe afterConnected {}, {}", session.getSessionId(), JsonUtil.toJsonString(connectedHeaders));}@Overridepublic void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {log.error("subscribe handleException sessionId: " + session.getSessionId(), exception);}@Overridepublic void handleTransportError(StompSession session, Throwable exception) {log.info("subscribe handleTransportError sessionId: " + session.getSessionId(), exception);}});log.info("Connect status: {}", session.isConnected());while (true) {TimeUnit.SECONDS.sleep(10);log.info("Connect status: {}", session.isConnected());}}}

更多推荐

SpringBoot Websocket Stomp 实现单设备登录(顶号) ①

本文发布于:2024-03-09 04:19:09,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1723775.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:设备   SpringBoot   Websocket   Stomp

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!