admin管理员组

文章数量:1588981

文章目录

  • 需求目的
  • 需求分析
  • 源码

项目源码

需求目的

这个需求的目的是为了实现一个IM系统,其中IM系统需要提供如下这些功能。

  1. 支持各种类型消息的发送,包括语音、图片、文本消息、链接等消息的发送。
  2. 需要确保在最高并发达到5w的情况下的性能,考虑集群模式部署server端的时候的客户端负载均衡注册。
  3. 需要兼容app1和app2两个端消息的互通,也就是需要多用户端数据同步。
  4. 需要实现消息未读和消息已读功能。

需求分析

首先分析如何实现IM系统。
IM(Instant Messaging)及时通信系统。目前市面上有特别多的IM系统,QQ、微信都是。
而我们的产品也需要实现一个自己的IM通信系统,只不过我们的系统的并发量和用户量并不会比上面两个系统多,大概最多也就只需要承当5kqps。
由于市面上开发IM系统的人很多,而且也已经有非常成熟的源码了,这里我就非常简单的讲解一下实现一个最小的IM系统需要考虑什么和实现什么。
具体的可以参考:IM学习
由于我们的项目是使用Java语言进行开发的,所以很容易就可以想到IM系统可以使用Netty配合websocket的方式来实现在线的双向通信。
按照上面的目的分析,我们可以先简单的分析一下如何实现上面的这些目的。

  1. 定义一个特定的类类型,这个类类型就是消息类,消息类中存储了要发送的消息信息,包括发送人、接收人、时间、消息类型、消息内容等。这里需要考虑到如果是图片、语音、链接,那么消息要如何处理?实际上这些东西也就是一个又一个的文件而已,直接设定一个字段用来存储文件的URL就好了。
    a. 既然考虑到了要存储文件,并且得到URL,那么这里基本就需要用到OSS服务了,这里毋庸置疑,我选Minio,原因是因为我用的很多,比较熟悉。
  2. 其次,现在的系统一般都是集群部署的,因此,我们的服务端启动的时候,需要有一个管理中心,负责管理,并且进行负载均衡。
    a. 我们可以使用Nacos、Zookeeper作为注册和配置中心。这里考虑用Zookeeper了,以为客户端简单。
    b. 当客户端进行注册的时候,去Zk获取服务端的信息,并且进行负载均衡,把客户端挂到负载更小的服务端上。
  3. 要想实现多个设备的消息互通,那么就意味着,用户同一个账号的多个设备的客户端,都需要连接到服务端上,并且服务端需要存储对应用户的所有客户端连接信息,然后把消息转发过去,或者说广播过去。
  4. 消息已读未读比较easy,发送一条消息过去之后,就设定消息为未读,可以考虑把未读的消息和数量存放到Redis等高速缓存中(同时保存到数据库),然后接受者一旦打开页面,前端发送读取消息的请求,把消息从Redis中快速读取出来,并且删除Redis中的数据,并标记数据为已读状态。

源码

按照上面的要求,我们首先需要先实现一些特定的消息类型。
这个设计比较简单,之前我在设计RPC系统的时候也大概讲解过思路。
这个ChatMsg类型的作用就是我们系统的消息类,所有IM系统上发送的消息信息都可以用这个类来表示。

package blossom.project.imty;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.time.LocalDateTime;

@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class ChatMsg {

    private String senderId;            // 发送者的用户id
    private String receiverId;          // 接受者的用户id
    private String msg;                   // 聊天内容
    private Integer msgType;            // 消息类型,见枚举 MsgTypeEnum.java
    private String msgId;

    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private LocalDateTime chatTime;          // 消息的聊天时间,既是发送者的发送时间、又是接受者的接受时间
    private Integer showMsgDateTimeFlag;   // 标记存储数据库,用于历史展示。每超过1分钟,则显示聊天时间,前端可以控制时间长短

    private String videoPath;           // 视频地址
    private Integer videoWidth;             // 视频宽度
    private Integer videoHeight;         // 视频高度
    private Integer videoTimes;             // 视频时间

    private String voicePath;           // 语音地址
    private Integer speakVoiceDuration;       // 语音时长
    private Boolean isRead;                // 语音消息标记是否已读未读,true: 已读,false: 未读

    private Integer communicationType;    // 聊天类型, 1:单聊,2:群聊

    private Boolean isReceiverOnLine;     // 用于标记当前接受者是否在线
}

设定完毕消息类型之后,我们可以先考虑一下如何实现多个设备的消息互通?
很容易就可以想到,存储一下客户端的每一个Channel连接,并且给他们标识一下编号,做好用户账号和Channel连接间的映射关系。
这样子,当同一个用户账号收到消息的时候,我们广播到这个用户账号对应的所有目前客户端连接就好了。
如下类就是这样子的一个管理多设备多端用户Channel的类。

public class MultiChannelManager {

    // 用于多端同时接受消息,允许同一个账号在多个设备同时在线,比如iPad、iPhone、Mac等设备同时收到消息
    // key: userId, value: 多个用户的channel
    private static Map<String, List<Channel>> multiChannel = new HashMap<>();

    // 用于记录用户id和客户端channel长id的关联关系
    private static Map<String, String> userChannelIdRelation = new HashMap<>();

    public static void putUserChannelIdRelation(String channelId, String userId) {
        userChannelIdRelation.put(channelId, userId);
    }
    public static String getUserIdByChannelId(String channelId) {
        return userChannelIdRelation.get(channelId);
    }

    public static void putMultiChannels(String userId, Channel channel) {

        List<Channel> channels = getMultiChannels(userId);
        if (channels == null || channels.size() == 0) {
            channels = new ArrayList<>();
        }
        channels.add(channel);

        multiChannel.put(userId, channels);
    }
    public static List<Channel> getMultiChannels(String userId) {
        return multiChannel.get(userId);
    }

    public static void removeUselessChannels(String userId, String channelId) {

        List<Channel> channels = getMultiChannels(userId);
        if (channels == null || channels.size() == 0) {
            return;
        }

        for (int i = 0 ; i < channels.size() ; i ++) {
            Channel tempChannel = channels.get(i);
            if (tempChannel.id().asLongText().equals(channelId)) {
                channels.remove(i);
            }
        }

        multiChannel.put(userId, channels);
    }

    public static List<Channel> getMyOtherChannels(String userId, String channelId) {
        List<Channel> channels = getMultiChannels(userId);
        if (channels == null || channels.size() == 0) {
            return null;
        }

        List<Channel> myOtherChannels = new ArrayList<>();
        for (int i = 0 ; i < channels.size() ; i ++) {
            Channel tempChannel = channels.get(i);
            if (!tempChannel.id().asLongText().equals(channelId)) {
                myOtherChannels.add(tempChannel);
            }
        }
        return myOtherChannels;
    }

    public static void outputMulti() {
        for (Map.Entry<String, List<Channel>> entry : multiChannel.entrySet()) {
            System.out.println("UserId: " + entry.getKey());
            List<Channel> temp = entry.getValue();
            for (Channel c : temp) {
                System.out.println("\t\t ChannelId: " + c.id().asLongText());
            }
        }
    }

    public static void sendToTarget(List<Channel> receiverChannels, DataContent dataContent) {

        ChannelGroup clients = ChatHandler.clients;

        if (receiverChannels == null) {
            return;
        }

        for (Channel c : receiverChannels) {
            Channel findChannel = clients.find(c.id());
            if (findChannel != null) {
                findChannel.writeAndFlush(
                        new TextWebSocketFrame(
                                JsonUtils.objectToJson(dataContent)));
            }

        }
    }

    public static void sendToMyOthers(List<Channel> myOtherChannels, DataContent dataContent) {

        ChannelGroup clients = ChatHandler.clients;

        if (myOtherChannels == null) {
            return;
        }

        for (Channel c : myOtherChannels) {
            Channel findChannel = clients.find(c.id());
            if (findChannel != null) {
                findChannel.writeAndFlush(
                        new TextWebSocketFrame(
                                JsonUtils.objectToJson(dataContent)));
            }
        }
    }

}

保存完毕多端用户通道之后,我们就可以开始设计一下如何往这些通道发送消息了。
构思一下,我们知道如果要往这些通道发消息,那么首先是通过服务器发送的,也就是一个NettyIM服务器上会保存有多个连接到这个服务器的Channel连接,我们通过特定的手段,例如ID的方式获取到这个Channel连接之后,然后通过IM的服务端发送消息到客户端连接上就好。
所以,这里我们可以顺带把服务端管理的实现给他先完成。
这里依靠Zk的能力。再服务端启动的时候,把服务端信息注册到Zk上。

public class CuratorConfig {

    private static String host = "127.0.0.1:3191";                 // 单机/集群的ip:port地址
    private static Integer connectionTimeoutMs = 30 * 1000;        // 连接超时时间
    private static Integer sessionTimeoutMs = 3 * 1000;            // 会话超时时间
    private static Integer sleepMsBetweenRetry = 2 * 1000;         // 每次重试的间隔时间
    private static Integer maxRetries = 3;                         // 最大重试次数
    private static String namespace = "itzixi-im";                 // 命名空间(root根节点名称)

    private static CuratorFramework client;

    static {
        RetryPolicy backoffRetry = new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries);

        // 声明初始化客户端
        client = CuratorFrameworkFactory.builder()
                .connectString(host)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                .retryPolicy(backoffRetry)
                .namespace(namespace)
                .build();
        client.start();     // 启动curator客户端
    }

    public static CuratorFramework getClient() {
        return client;
    }

}


public class ZookeeperRegister {

    public static void registerNettyServer(String nodeName,
                                           String ip,
                                           Integer port) throws Exception {
        CuratorFramework zkClient = CuratorConfig.getClient();
        String path = "/" + nodeName;
        Stat stat = zkClient.checkExists().forPath(path);
        if (stat == null) {
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT).forPath(path);
        } else {
            System.out.println(stat.toString());
        }

        // 创建对应的临时节点,值可以放在线人数,默认为初始化的0
        NettyServerNode serverNode = new NettyServerNode();
        serverNode.setIp(ip);
        serverNode.setPort(port);
        String nodeJson = JsonUtils.objectToJson(serverNode);

        zkClient.create()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(path + "/im-", nodeJson.getBytes());
    }

    public static String getLocalIp() throws Exception {
        InetAddress addr = InetAddress.getLocalHost();
        String ip=addr.getHostAddress();
        System.out.println("本机IP地址:" + ip);
        return ip;
    }

    /**
     * 增加在线人数
     * @param serverNode
     */
    public static void incrementOnlineCounts(NettyServerNode serverNode) throws Exception {
        dealOnlineCounts(serverNode, 1);
    }

    /**
     * 减少在线人数
     * @param serverNode
     */
    public static void decrementOnlineCounts(NettyServerNode serverNode) throws Exception {
        dealOnlineCounts(serverNode, -1);
    }

    /**
     * 处理在线人数的增减
     * @param serverNode
     * @param counts
     */
    public static void dealOnlineCounts(NettyServerNode serverNode,
                                        Integer counts) throws Exception {

        CuratorFramework zkClient = CuratorConfig.getClient();

        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(zkClient,
                                                                        "/rw-locks");
        readWriteLock.writeLock().acquire();

        try {

            String path = "/server-list";
            List<String> list = zkClient.getChildren().forPath(path);
            for (String node:list) {
                String pendingNodePath = path + "/" + node;
                String nodeValue = new String(zkClient.getData().forPath(pendingNodePath));
                NettyServerNode pendingNode = JsonUtils.jsonToPojo(nodeValue,
                                                                   NettyServerNode.class);

                // 如果ip和端口匹配,则当前路径的节点则需要累加或者累减
                if (pendingNode.getIp().equals(serverNode.getIp()) &&
                        (pendingNode.getPort().intValue() == serverNode.getPort().intValue())) {
                    pendingNode.setOnlineCounts(pendingNode.getOnlineCounts() + counts);
                    String nodeJson = JsonUtils.objectToJson(pendingNode);
                    zkClient.setData().forPath(pendingNodePath, nodeJson.getBytes());
                }
            }

        } finally {
            readWriteLock.writeLock().release();
        }
    }

    /**
     * 获取负载最轻的服务器
     * @return
     * @throws Exception
     */
    public static NettyServerNode getLeastLoadedServer() throws Exception {
        CuratorFramework zkClient = CuratorConfig.getClient();
        String path = "/server-list";
        List<String> list = zkClient.getChildren().forPath(path);

        NettyServerNode leastLoadedNode = null;
        int minOnlineCounts = Integer.MAX_VALUE;

        for (String node : list) {
            String pendingNodePath = path + "/" + node;
            String nodeValue = new String(zkClient.getData().forPath(pendingNodePath));
            NettyServerNode pendingNode = JsonUtils.jsonToPojo(nodeValue, NettyServerNode.class);

            if (pendingNode.getOnlineCounts() < minOnlineCounts) {
                minOnlineCounts = pendingNode.getOnlineCounts();
                leastLoadedNode = pendingNode;
            }
        }

        return leastLoadedNode;
    }

}
public class JedisPoolUtils {

    private static final JedisPool jedisPool;

    static {
        //配置连接池
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        //最大连接数
        poolConfig.setMaxTotal(10);
        //最大空闲连接
        poolConfig.setMaxIdle(10);
        //最小空闲连接
        poolConfig.setMinIdle(5);
        //最长等待时间,ms
        poolConfig.setMaxWaitMillis(1500);
        //创建连接池对象
        jedisPool = new JedisPool(poolConfig,
                "127.0.0.1",
                5379,
                1000,
                "BlossomIM");
    }

    public static Jedis getJedis(){
        return jedisPool.getResource();
    }

}

然后在我们的服务端启动的时候,执行如下代码即可。

// 注册当前netty服务到zookeeper中
ZookeeperRegister.registerNettyServer("server-list",
                                        ZookeeperRegister.getLocalIp(),
                                        nettyPort);

这里我们知道,Netty作为服务器启动后,会绑定机器上的某一个端口,所以我们的服务器节点保存ip和端口信息即可。
完成了当前nettyim服务器的注册之后,我们就可以开始设计如何广播消息了。
这里我们利用MQ的能力来实现异步。

package blossom.project.imty.mq;

import blossom.project.imty.DataContent;
import blossom.project.imty.websocket.MultiChannelManager;
import blossom.project.im.utils.JsonUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RabbitMQConnectUtils {

    private final List<Connection> connections = new ArrayList<>();
    private final int maxConnection = 20;

    // 开发环境 dev
    private final String host = "127.0.0.1";
    private final int port = 5682;
    private final String username = "BlossomIM";
    private final String password = "BlossomIM";
    private final String virtualHost = "BlossomIM";

    // 生产环境 prod
    //private final String host = "";
    //private final int port = 5672;
    //private final String username = "123";
    //private final String password = "123";
    //private final String virtualHost = "123";

    public ConnectionFactory factory;

    public ConnectionFactory getRabbitMqConnection() {
        return getFactory();
    }

    public ConnectionFactory getFactory() {
        initFactory();
        return factory;
    }

    private void initFactory() {
        try {
            if (factory == null) {
                factory = new ConnectionFactory();
                factory.setHost(host);
                factory.setPort(port);
                factory.setUsername(username);
                factory.setPassword(password);
                factory.setVirtualHost(virtualHost);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendMsg(String message, String queue) throws Exception {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        channel.basicPublish("",
                            queue,
                            MessageProperties.PERSISTENT_TEXT_PLAIN,
                            message.getBytes("utf-8"));
        channel.close();
        setConnection(connection);
    }

    public void sendMsg(String message, String exchange, String routingKey) throws Exception {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        channel.basicPublish(exchange,
                            routingKey,
                            MessageProperties.PERSISTENT_TEXT_PLAIN,
                            message.getBytes("utf-8"));
        channel.close();
        setConnection(connection);
    }

    public GetResponse basicGet(String queue, boolean autoAck) throws Exception {
        GetResponse getResponse = null;
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        getResponse = channel.basicGet(queue, autoAck);
        channel.close();
        setConnection(connection);
        return getResponse;
    }

    public Connection getConnection() throws Exception {
        return getAndSetConnection(true, null);
    }

    public void setConnection(Connection connection) throws Exception {
        getAndSetConnection(false, connection);
    }

    public void listen(String fanout_exchange, String queueName) throws Exception {

        Connection connection = getConnection();
        Channel channel = connection.createChannel();

        // FANOUT 发布订阅模式(广播模式)
        channel.exchangeDeclare(fanout_exchange,
                BuiltinExchangeType.FANOUT,
                true, false, false, null);

        channel.queueDeclare(queueName, true, false, false, null);

        channel.queueBind(queueName, fanout_exchange, "");

        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {

                String msg = new String(body);
                System.out.println("body = " + msg);

                String exchange = envelope.getExchange();
                System.out.println("exchange = " + exchange);
                if (exchange.equalsIgnoreCase("fanout_exchange")) {
                    DataContent dataContent = JsonUtils.jsonToPojo(msg, DataContent.class);
                    String senderId = dataContent.getChatMsg().getSenderId();
                    String receiverId = dataContent.getChatMsg().getReceiverId();

                    // 广播至集群的其他节点并且发送给用户聊天信息
                    List<ioty.channel.Channel> receiverChannels =
                            MultiChannelManager.getMultiChannels(receiverId);
                    MultiChannelManager.sendToTarget(receiverChannels, dataContent);

                    // 广播至集群的其他节点并且同步给自己其他设备聊天信息
                    String currentChannelId = dataContent.getExtend();
                    List<ioty.channel.Channel> senderChannels =
                            MultiChannelManager.getMyOtherChannels(senderId, currentChannelId);
                    MultiChannelManager.sendToTarget(senderChannels, dataContent);
                }
            }
        };
        /**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(queueName, true, consumer);
    }

    private synchronized Connection getAndSetConnection(boolean isGet, Connection connection) throws Exception {
        getRabbitMqConnection();

        if (isGet) {
            if (connections.isEmpty()) {
                return factory.newConnection();
            }
            Connection newConnection = connections.get(0);
            connections.remove(0);
            if (newConnection.isOpen()) {
                return newConnection;
            } else {
                return factory.newConnection();
            }
        } else {
            if (connections.size() < maxConnection) {
                connections.add(connection);
            }
            return null;
        }
    }

}

用下面的类来保存消息和广播消息。

public class MessagePublisher {

    // 定义交换机的名字
    public static final String TEST_EXCHANGE = "test_exchange";

    // 定义队列的名字
    public static final String TEST_QUEUE = "test_queue";

    // 发送信息到消息队列接受并且保存到数据库的路由地址
    public static final String ROUTING_KEY_WECHAT_MSG_SEND = "BlossomIM.wechat.wechat.msg.send";


    public static void sendMsgToSave(ChatMsg msg) throws Exception {
        RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();
        connectUtils.sendMsg(JsonUtils.objectToJson(msg),
                            TEST_EXCHANGE,
                            ROUTING_KEY_WECHAT_MSG_SEND);
    }

    public static void sendMsgToOtherNettyServer(String msg) throws Exception {
        RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();
        String fanout_exchange = "fanout_exchange";
        connectUtils.sendMsg(msg, fanout_exchange, "");
    }
}

并且在服务器启动的时候,监听特定的rabbitmq-queue。

// 启动消费者进行监听,队列可以根据动态生成的端口号进行拼接
String queueName = "netty_queue_" + nettyPort;
RabbitMQConnectUtils mqConnectUtils = new RabbitMQConnectUtils();
mqConnectUtils.listen("fanout_exchange", queueName);

这样子,当我们在消息处理函数中,广播消息的时候,就可以通过上面的mq接口进行广播了。
这个服务器就会收到,然后取出对应的服务器,把消息发送给特定的客户端。
在设计完毕我们的消息类型之后,我们可以开始基于Websocket和Netty实现我们的消息处理类了,也就是假设我们收到了上面的ChatMsg类型的消息,我们会如何进行处理?
这里我们创建如下类型,用来处理入站消息。和RPC的实现方式一样,再这个类中,我们需要考虑到所有的我们希望的对消息的处理操作。

// SimpleChannelInboundHandler: 对于请求来说,相当于入站(入境)
// TextWebSocketFrame: 用于为websocket专门处理的文本数据对象,Frame是数据(消息)的载体
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>

具体的消息处理代码如下:

package blossom.project.imty.websocket;

import blossom.project.im.enums.MsgTypeEnum;
import blossom.project.imty.ChatMsg;
import blossom.project.imty.DataContent;
import blossom.project.imty.NettyServerNode;
import blossom.project.imty.utils.JedisPoolUtils;
import blossom.project.imty.utils.ZookeeperRegister;
import blossom.project.im.utils.JsonUtils;
import blossom.project.im.utils.LocalDateUtils;
import blossom.project.im.utils.OkHttpUtil;
import com.a3testponent.idworker.IdWorkerConfigBean;
import com.a3testponent.idworker.Snowflake;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import ioty.channel.Channel;
import ioty.channel.ChannelHandlerContext;
import ioty.channel.SimpleChannelInboundHandler;
import ioty.channel.group.ChannelGroup;
import ioty.channel.group.DefaultChannelGroup;
import ioty.handler.codec.http.websocketx.TextWebSocketFrame;
import ioty.util.concurrent.GlobalEventExecutor;

import blossom.project.im.grace.result.GraceJSONResult;
import blossom.project.imty.mq.MessagePublisher;
import redis.clients.jedis.Jedis;


import java.time.LocalDateTime;
import java.util.Objects;

/**
 * 创建自定义助手类
 *
 * @Auther ZhangBlossom
 */
// SimpleChannelInboundHandler: 对于请求来说,相当于入站(入境)
// TextWebSocketFrame: 用于为websocket专门处理的文本数据对象,Frame是数据(消息)的载体
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // 用于记录和管理所有客户端的channel组
    public static ChannelGroup clients =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                TextWebSocketFrame msg) throws Exception {
        // 获得客户端传输过来的消息
        String content = msg.text();
        System.out.println("接受到的数据:" + content);

        // 1. 获取客户端发来的消息并且解析
        DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
        ChatMsg chatMsg = dataContent.getChatMsg();

        String msgText = chatMsg.getMsg();
        String receiverId = chatMsg.getReceiverId();
        String senderId = chatMsg.getSenderId();


        // 判断是否黑名单 start
        // 如果双方只要有一方是黑名单,则终止发送
        GraceJSONResult result = OkHttpUtil.get("http://127.0.0.1:1000/friendship/isBlack?friendId1st=" + receiverId
                + "&friendId2nd=" + senderId);
        boolean isBlack = (Boolean) result.getData();
        System.out.println("当前的黑名单关系为: " + isBlack);
        if (isBlack) {
            return;
        }
        // 判断是否黑名单 end


        // 时间校准,以服务器的时间为准
        chatMsg.setChatTime(LocalDateTime.now());

        Integer msgType = chatMsg.getMsgType();

        // 获取channel
        Channel currentChannel = ctx.channel();
        String currentChannelId = currentChannel.id().asLongText();
        String currentChannelIdShort = currentChannel.id().asShortText();

        // System.out.println("客户端currentChannelId:" + currentChannelId);
        // System.out.println("客户端currentChannelIdShort:" + currentChannelIdShort);

        // 2. 判断消息类型,根据不同的类型来处理不同的业务
        if (Objects.equals(msgType, MsgTypeEnum.CONNECT_INIT.type)) {
            // 当websocket初次open的时候,初始化channel,把channel和用户userid关联起来
            MultiChannelManager.putMultiChannels(senderId, currentChannel);
            MultiChannelManager.putUserChannelIdRelation(currentChannelId, senderId);

            NettyServerNode minNode = dataContent.getServerNode();
            // System.out.println(minNode);
            // 初次连接后,该节点下的在线人数累加
            ZookeeperRegister.incrementOnlineCounts(minNode);

            // 获得ip+端口,在redis中设置关系,以便在前端设备断线后减少在线人数
            Jedis jedis = JedisPoolUtils.getJedis();
            jedis.set(senderId, JsonUtils.objectToJson(minNode));

        } else if (Objects.equals(msgType, MsgTypeEnum.WORDS.type)
                || Objects.equals(msgType, MsgTypeEnum.IMAGE.type)
                || Objects.equals(msgType, MsgTypeEnum.VIDEO.type)
                || Objects.equals(msgType, MsgTypeEnum.VOICE.type)
        ) {

            // 此处为mq异步解耦,保存信息到数据库,数据库无法获得信息的主键id,
            // 所以此处可以用snowflake直接生成唯一的主键id
            Snowflake snowflake = new Snowflake(new IdWorkerConfigBean());
            String sid = snowflake.nextId();
            System.out.println("sid = " + sid);

            String iid = IdWorker.getIdStr();
            System.out.println("iid = " + iid);

            chatMsg.setMsgId(sid);

            // 此处receiverId所对应的channel为空
            // 发送消息
            // List<Channel> receiverChannels = MultiChannelManager.getMultiChannels(receiverId);
            // if (receiverChannels == null || receiverChannels.size() == 0 || receiverChannels.isEmpty()) {
            // receiverChannels为空,表示用户离线/断线状态,消息不需要发送,后续可以存储到数据库
            // chatMsg.setIsReceiverOnLine(false);
            // } else {
            //     chatMsg.setIsReceiverOnLine(true);

            if (Objects.equals(msgType, MsgTypeEnum.VOICE.type)) {
                chatMsg.setIsRead(false);
            }
            dataContent.setChatMsg(chatMsg);
            String chatTimeFormat = LocalDateUtils
                    .format(chatMsg.getChatTime(),
                            LocalDateUtils.DATETIME_PATTERN_2);
            dataContent.setChatTime(chatTimeFormat);
            // MultiChannelManager.sendToTarget(receiverChannels, dataContent);
            MessagePublisher.sendMsgToOtherNettyServer(JsonUtils.objectToJson(dataContent));

            // 当receiverChannels为空不为空的时候,同账户多端设备接受消息
            // for (Channel c : receiverChannels) {
            //     Channel findChannel = clients.find(c.id());
            //     if (findChannel != null) {
            //
            //         // if (msgType == MsgTypeEnum.VOICE.type) {
            //         //     chatMsg.setIsRead(false);
            //         // }
            //         // dataContent.setChatMsg(chatMsg);
            //         // String chatTimeFormat = LocalDateUtils
            //         //         .format(chatMsg.getChatTime(),
            //         //                 LocalDateUtils.DATETIME_PATTERN_2);
            //         // dataContent.setChatTime(chatTimeFormat);
            //         // 发送消息给在线的用户
            //         findChannel.writeAndFlush(
            //                 new TextWebSocketFrame(
            //                         JsonUtils.objectToJson(dataContent)));
            //     }
            //
            // }
            // }

            // 把聊天信息作为mq的消息发送给消费者进行消费处理(保存到数据库)
            MessagePublisher.sendMsgToSave(chatMsg);
        }

        // 此处也不需要了,都在mq的监听中完成
        // dataContent.setChatMsg(chatMsg);
        // String chatTimeFormat = LocalDateUtils
        //         .format(chatMsg.getChatTime(),
        //                 LocalDateUtils.DATETIME_PATTERN_2);
        // dataContent.setChatTime(chatTimeFormat);
        // dataContent.setExtend(currentChannelId);
        //
        // List<Channel> myOtherChannels = MultiChannelManager
        //                 .getMyOtherChannels(senderId, currentChannelId);
        // MultiChannelManager.sendToMyOthers(myOtherChannels, dataContent);

        // for (Channel c : myOtherChannels) {
        //     Channel findChannel = clients.find(c.id());
        //     if (findChannel != null) {
        //         // dataContent.setChatMsg(chatMsg);
        //         // String chatTimeFormat = LocalDateUtils
        //         //         .format(chatMsg.getChatTime(),
        //         //                 LocalDateUtils.DATETIME_PATTERN_2);
        //         // dataContent.setChatTime(chatTimeFormat);
        //         // 同步消息给在线的其他设备端
        //         findChannel.writeAndFlush(
        //                 new TextWebSocketFrame(
        //                         JsonUtils.objectToJson(dataContent)));
        //     }
        // }


        // currentChannel.writeAndFlush(new TextWebSocketFrame(currentChannelId));


        // clients.writeAndFlush(new TextWebSocketFrame(currentChannelId));

        MultiChannelManager.outputMulti();
    }

    /**
     * 客户端连接到服务端之后(打开链接)
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel currentChannel = ctx.channel();
        String currentChannelId = currentChannel.id().asLongText();
        System.out.println("客户端建立连接,channel对应的长id为:" + currentChannelId);

        // 获得客户端的channel,并且存入到ChannelGroup中进行管理(作为一个客户端群组)
        clients.add(currentChannel);
    }

    /**
     * 关闭连接,移除channel
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel currentChannel = ctx.channel();
        String currentChannelId = currentChannel.id().asLongText();
        System.out.println("客户端关闭连接,channel对应的长id为:" + currentChannelId);

        // 移除多余的会话
        String userId = MultiChannelManager.getUserIdByChannelId(currentChannelId);
        MultiChannelManager.removeUselessChannels(userId, currentChannelId);

        clients.remove(currentChannel);

        // zk中在线人数累减
        Jedis jedis = JedisPoolUtils.getJedis();
        NettyServerNode minNode = JsonUtils.jsonToPojo(jedis.get(userId),
                NettyServerNode.class);
        ZookeeperRegister.decrementOnlineCounts(minNode);
    }

    /**
     * 发生异常并且捕获,移除channel
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel currentChannel = ctx.channel();
        String currentChannelId = currentChannel.id().asLongText();
        System.out.println("发生异常捕获,channel对应的长id为:" + currentChannelId);

        // 发生异常之后关闭连接(关闭channel)
        ctx.channel().close();
        // 随后从ChannelGroup中移除对应的channel
        clients.remove(currentChannel);

        // 移除多余的会话
        String userId = MultiChannelManager.getUserIdByChannelId(currentChannelId);
        MultiChannelManager.removeUselessChannels(userId, currentChannelId);

        // zk中在线人数累减
        Jedis jedis = JedisPoolUtils.getJedis();
        NettyServerNode minNode = JsonUtils.jsonToPojo(jedis.get(userId),
                NettyServerNode.class);
        ZookeeperRegister.decrementOnlineCounts(minNode);
    }

}

到此为止,其实我们的核心代码基本就完成了,这个时候,只要编写一个客户端代码,连接到对应的服务端,然后发送消息就好了。就能实现数据的交互。
接下来是main函数。

package blossom.project.imty;

import blossom.project.imty.websocket.WSServerInitializer;
import ioty.bootstrap.ServerBootstrap;
import ioty.channel.ChannelFuture;
import ioty.channel.EventLoopGroup;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioServerSocketChannel;
import org.apachemons.lang3.StringUtils;
import blossom.project.imty.mq.RabbitMQConnectUtils;
import blossom.project.imty.utils.JedisPoolUtils;
import blossom.project.imty.utils.ZookeeperRegister;
import redis.clients.jedis.Jedis;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * ChatServer: Netty 服务的启动类(服务器)
 * @Auther ZhangBlossom
 */
public class ChatServer {

    public static final Integer nettyDefaultPort = 875;
    public static final String initOnlineCounts = "0";

    /*
     * FIXME: 优化方案,改成zookeeper方案,
     *        如此可以不需要在中断连接后,监听并且清理在线人数和端口,
     *        因为netty与zk建立的临时节点,中断连接后,会自动删除该临时节点
     */
    public static Integer selectPort(Integer port) {
        String portKey = "netty_port";
        Jedis jedis = JedisPoolUtils.getJedis();

        jedis.set("jedis-test", "hello world");

        Map<String, String> portMap = jedis.hgetAll(portKey);
        System.out.println(portMap);
        // 由于map中的key都应该是整数类型的port,所以先转换成整数后,再比对,否则string类型的比对会有问题
        List<Integer> portList = portMap.entrySet().stream()
                .map(entry -> Integer.valueOf(entry.getKey()))
                .collect(Collectors.toList());
        // step1: 编码到此处先运行测试看一下结果
        System.out.println(portList);

        Integer nettyPort = null;
        if (portList == null || portList.isEmpty()) {
            // step2: 编码到此处先运行测试看一下结果
            jedis.hset(portKey, port+"", initOnlineCounts);
            nettyPort = port;
        } else {
            // 循环portList,获得最大值,并且累加10
            Optional<Integer> maxInteger = portList.stream().max(Integer::compareTo);
            Integer maxPort = maxInteger.get().intValue();
            Integer currentPort = maxPort + 10;
            jedis.hset(portKey, currentPort+"", initOnlineCounts);
            nettyPort = currentPort;
        }
        // step3: 编码到此处先运行测试看一下最终结果
        return nettyPort;

        /**
         * TODO: ChatServer停止的时候,需要删除在redis中对应的端口。
         *       一旦断开,则会触发zk的节点删除事件,在那边删除即可。写到springboot服务中即可
         */


        // TODO: 客户端负载均衡,最小连接数,查询redis中最少在线人数的,并且让前端建立ws连接
        // TODO: 用户创建连接,会话管理那边,则对应netty服务的在线人数累+1
        // TODO: 用户断开连接,会话管理那边,则对应netty服务的在线人数累-1
    }

    public static void removePort(Integer port) {
        String portKey = "netty_port";
        Jedis jedis = JedisPoolUtils.getJedis();
        jedis.hdel(portKey, port+"");
    }

    public static void main(String[] args) throws Exception {

        // 定义主从线程组
        // 定义主线程池,用于接受客户端的连接,但是不做任何处理,比如老板会谈业务,拉到业务就会交给下面的员工去做了
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 定义从线程池,处理主线程池交过来的任务,公司业务员开展业务,完成老板交代的任务
        EventLoopGroup workerGroup = new NioEventLoopGroup();


        // Netty服务启动的时候,从redis中查找有没有端口,如果没有则用875,如果有则把端口累加1(或10)再启动
        Integer nettyPort = selectPort(nettyDefaultPort);



        // 注册当前netty服务到zookeeper中
        ZookeeperRegister.registerNettyServer("server-list",
                                                ZookeeperRegister.getLocalIp(),
                                                nettyPort);



        // 启动消费者进行监听,队列可以根据动态生成的端口号进行拼接
        String queueName = "netty_queue_" + nettyPort;
        RabbitMQConnectUtils mqConnectUtils = new RabbitMQConnectUtils();
        mqConnectUtils.listen("fanout_exchange", queueName);




        try {
            // 构建Netty服务器
            ServerBootstrap server = new ServerBootstrap();     // 服务的启动类
            server.group(bossGroup, workerGroup)                // 把主从线程池组放入到启动类中
                    .channel(NioServerSocketChannel.class)      // 设置Nio的双向通道
                    .childHandler(new WSServerInitializer());   // 设置处理器,用于处理workerGroup

            // 启动server,并且绑定端口号875,同时启动方式为"同步"
            ChannelFuture channelFuture = server.bind(nettyPort).sync();
            // 请求:http://127.0.0.1:875

            // 监听关闭的channel
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 优雅的关闭线程池组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();

            // 移除现有的redis与netty的端口关系
            removePort(nettyPort);
        }
    }

}

其实到此为止,一个简单的IM系统就开发完毕了。
而Client的代码,可以参考如下进行实现:

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.URI;
import java.URISyntaxException;
import java.util.Scanner;

public class SimpleIMClient extends WebSocketClient {

    public SimpleIMClient(URI serverUri) {
        super(serverUri);
    }

    @Override
    public void onOpen(ServerHandshake handshakedata) {
        System.out.println("Connected to server");
    }

    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        System.out.println("Disconnected from server with exit code " + code + " additional info: " + reason);
    }

    @Override
    public void onError(Exception ex) {
        ex.printStackTrace();
    }

    public static void main(String[] args) throws URISyntaxException, InterruptedException {
        // 获取负载最小的服务器节点
        NettyServerNode leastLoadedServer = ZookeeperRegister.getLeastLoadedServer();
        String serverUri = "ws://" + leastLoadedServer.getIp() + ":" + leastLoadedServer.getPort() + "/ws";

        // 连接到服务器
        SimpleIMClient client = new SimpleIMClient(new URI(serverUri));
        client.connectBlocking();

        // 创建一个新的线程来读取用户输入并发送消息
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String message = scanner.nextLine();
                client.send(message);
            }
        }).start();
    }
}

本文标签: 简单系统BlossomIMim