说说使用websocket遇到的坑,

编程入门 行业动态 更新时间:2024-10-26 03:36:29

说说使用<a href=https://www.elefans.com/category/jswz/34/1771432.html style=websocket遇到的坑,"/>

说说使用websocket遇到的坑,

引言

今天来说说前段时间项目运行过程中遇到问题。原本我们做的只是一个客服系统。连接数量基本被锁定在一百左右(因为客服最多能同时与5个左右的游客对话),但后面单独为一家商户定制了类似IM一对一聊天。突然连接量上来了,然后系统运行一天左右必卡死。重启服务后程序又正常运行。第二天又会出现同样的问题。
下面我会贴上我的代码。因为第一次遇到这个问题,所以查了第一天没查出原因。

说下我都改过那些东西:
1.我怀疑运行内存不够,所以我加大了jvm的运行内存,直接上5个G足够大了吧,但问题还是存在。

2.问了下朋友说我的连接可能导致内存溢出,或是连接估计没有杀死,可能我只是删除了存在map里的对象,而jvm这个连接还存在。所以我加了个异步,删除缓存的同时,在把对象赋值为null。但问题依旧存在。

3.百度相关问题,修改nginx配置、修改系统配置(差点把服务搞坏了),问题还是存在。

4.针对Caused by: java.io.IOException: null异常查询不少资料,也根据部分文章的解决方法做修改设置。发现毫无卵用。

说下这过程中我的发现:
1.为了方便观察我开了个接口实时查询连接数量。发现同时在线连接量最多800,900左右,当程序卡死后连接量 会在200左右(这个时候调接口基本无响应,偶尔人品好会有返回)。

2.查看liunx中程序的socket总连接量10700左右。

3.程序占用内存最高1.5G,系统还有好几G的空闲。并且我已经给程序的jvm参数设置了 -Xms6144m -Xmx6144m的内存。

4.程序会有个一个异常:Caused by: java.io.IOException: null,并且无法彻底解决这个问题。

解决方法:

最开始使用的tomcat的websocket。这个websocket到底有什么问题,我一直没有查出来。有知道的大佬告诉一下小弟。最后我把tomcat的websocket模块改成netty的websocket。问题解决,运行到现在无任何问题。

服务卡死的websocket代码


import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.util.StringUtil;
import com.mlamon.CacheData;
import com.mlamon.RedisConstant;
import com.mla.config.MyEndpointConfigure;
import com.mla.eunm.IMMsgType;
import com.mla.eunm.IMSendType;
import com.mla.eunm.IMUserType;
import com.mla.model.MessageModel;
import com.mla.model.SendMessage;
import com.mla.model.SessionModel;
import com.mla.pojo.SocketLink;
import com.mla.service.ConsultLogService;
import com.mla.service.SystemConfigServer;
import com.mla.service.UserService;
import com.mla.util.Base64Util;
import com.mla.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.Map;/*** @author Jun* 2020年01月09日 15:12:00*/@Slf4j
@Component
@ServerEndpoint(value = "/customer", configurator = MyEndpointConfigure.class)
public class MyWebsocketServer {@Autowiredprivate RedisUtil redisUtil;@Autowiredprivate PushMessageServer pushMessageServer;@Autowiredprivate ConsultLogService consultLogService;@Autowiredprivate UserService userService;@Autowiredprivate SystemConfigServer systemConfigServer;@Autowiredprivate AsyncWebsocketServer asyncWebsocketServer;@OnOpenpublic void onOpen(Session session) {Map<String, List<String>> paramete = session.getRequestParameterMap();try {if (paramete.get("data") == null) {session.close();return;}//解码String data = Base64Util.decode(paramete.get("data").get(0));SocketLink link = JSONObject.parseObject(data, SocketLink.class);if (link == null) {session.close();return;}log.info("新用户连接: {}", link.getUserId());Session sessionOld = CacheData.get(link.getUserId());if (sessionOld != null) {try {//如果存在旧通道,则等待1秒后执行,执行刷新时的close业务没有走结束Thread.sleep(1000);} catch (Exception e) {}}//缓存通道CacheData.add(link.getUserId(), session);//注册用户信息redisUtil.hset(RedisConstant.USERINFO, link.getUserId(), link);//新注册客服推送排队列表if (link.getUserType().equals(IMUserType.customer.getUserType())) {//添加客服队列redisUtil.sSet(RedisConstant.STAFF, link.getUserId());asyncWebsocketServer.openLinkAsync(link, session);} else {//通知客服用户上线MessageModel message = new MessageModel();message.setType(IMSendType.ONLINE.getType());message.setMessage(link.getUid());SessionModel sessionModel = (SessionModel) redisUtil.hget(RedisConstant.SESSION, link.getUid());Session csSession = CacheData.get(sessionModel.getCsId());pushMessageServer.sendText(csSession, message);}} catch (Exception e) {log.error(e.getMessage(), e);try {//出现异常直接关闭连接, 防止无效连接session.close();} catch (IOException e1) {e1.printStackTrace();}}}/*** 客户端关闭** @param session session*/@OnClosepublic void onClose(Session session) {//将掉线的用户移除在线的组里CacheData.del(session);}/*** 收到客户端发来消息** @param message 消息对象*/@OnMessagepublic void onMessage(String message) {try {if (StringUtil.isEmpty(message)) {return;}message = Base64Util.decode(message);SendMessage model = JSONObject.parseObject(message, SendMessage.class);if (model == null) {return;}messageHandle(model);} catch (Exception e) {log.error("处理消息失败:" + message, e);}}/*** 分支处理** @param model*/private void messageHandle(SendMessage model) {//心跳不做任何处理if (model.getMsgType().equals(IMMsgType.HEARTBEAT.getType())) {} else if (model.getMsgType().equals(IMMsgType.MESSAGE.getType())) {log.info("消息推送: {}", model.toString());pushMessageServer.pushMessage(model);}}@OnErrorpublic void onError(Session session, Throwable throwable) {log.error(throwable.getMessage(), throwable);}
}

netty的WebSocket
这只是websocket部分,当然还有netty的部分代码,这里就不贴出来了,可以看到我的业务代码没有做任何修改。

package com.mlaty;import com.alibaba.fastjson.JSONObject;
import com.mlamon.CacheData;
import com.mlamon.RedisConstant;
import com.mla.eunm.IMMsgType;
import com.mla.eunm.IMUserType;
import com.mla.model.SendMessage;
import com.mla.pojo.SocketLink;
import com.mla.socket.AsyncWebsocketServer;
import com.mla.socket.PushMessageServer;
import com.mla.util.Base64Util;
import com.mla.util.RedisUtil;
import ioty.channel.Channel;
import ioty.channel.ChannelHandler;
import ioty.channel.ChannelHandlerContext;
import ioty.channel.SimpleChannelInboundHandler;
import ioty.handler.codec.http.FullHttpRequest;
import ioty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;@Slf4j
@Component
@Qualifier("nettyHandler")
@ChannelHandler.Sharable
public class NettyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {//所有正在连接的channel都会存在这里面,所以也可以间接代表在线的客户端//public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Autowiredprivate RedisUtil redisUtil;@Autowiredprivate AsyncWebsocketServer asyncWebsocketServer;@Autowiredprivate PushMessageServer pushMessageServer;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (null != msg && msg instanceof FullHttpRequest) {//转化为http请求FullHttpRequest request = (FullHttpRequest) msg;//拿到请求地址String uri = request.uri();//判断是不是websocket请求,如果是拿出我们传递的参数(我的是token)/*String origin = request.headers().get("Origin");if (null == origin) {ctx.close();return;} else {*/Channel channel = ctx.channel();if (null != uri && uri.contains("/customer") && uri.contains("?")) {try {String[] uriArray = uri.split("\\?");if (uriArray == null || uriArray.length == 0) {ctx.channel().close();return;}String param = uriArray[1].substring(5);if (param.length() == 0) {ctx.channel().close();return;}//重新设置请求地址request.setUri("/customer");//解码String data = Base64Util.decode(param);SocketLink link = JSONObject.parseObject(data, SocketLink.class);if (link == null) {ctx.channel().close();return;}log.info("新用户连接: {}", link.getUserId());Channel sessionOld = CacheData.get(link.getUserId());if (sessionOld != null) {try {//如果存在旧通道,则等待1秒后执行,执行刷新时的close业务没有走结束Thread.sleep(1000);} catch (Exception e) {}}//缓存通道CacheData.add(link.getUserId(), ctx.channel());//注册用户信息redisUtil.hset(RedisConstant.USERINFO, link.getUserId(), link);//新注册客服推送排队列表if (link.getUserType().equals(IMUserType.customer.getUserType())) {//添加客服队列redisUtil.sSet(RedisConstant.STAFF, link.getUserId());asyncWebsocketServer.openLinkAsync(link, channel);}} catch (Exception e) {ctx.channel().close();}}/*}*/}//接着建立请求super.channelRead(ctx, msg);}//接收到客户都发送的消息@Overridepublic void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {try {if (msg.text().isEmpty()) {return;}String message = Base64Util.decode(msg.text());SendMessage model = JSONObject.parseObject(message, SendMessage.class);if (model == null) {return;}messageHandle(model);} catch (Exception e) {log.error("处理消息失败:" + msg.text(), e);}}//客户端建立连接@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("新连接: {}", ctx.channel().id());}//关闭连接@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {CacheData.del(ctx.channel());}//出现异常@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();}/*** 分支处理** @param model*/private void messageHandle(SendMessage model) {//心跳不做任何处理if (model.getMsgType().equals(IMMsgType.HEARTBEAT.getType())) {} else if (model.getMsgType().equals(IMMsgType.MESSAGE.getType())) {log.info("消息推送: {}", model.toString());pushMessageServer.pushMessage(model);}}
}
总结

对于tomcat的websocket所存在的问题,我没有找到解决方案,知道的大佬告诉我一下。
netty的websocket是的好用啊。后续观察
1.同时在线连接平均在700左右。
2.程序运行现在使用内存不到1G。

更多推荐

说说使用websocket遇到的坑,

本文发布于:2024-02-08 21:55:45,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1675496.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:websocket

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!