admin管理员组

文章数量:1582033

MQTT断线重连及订阅消息恢复

注意注意,MQTT重连后需要重新订阅主题才能重新接收到消息

我这里使用的是

//设置断开后重新连接 
options.setAutomaticReconnect(true);
@Override
                public void connectionLost(Throwable throwable) {
                    log.error("连接断开,下面做重连...");
                    long reconnectTimes = 1;
                    while (true) {
                        try {
                            if (mqttClient.isConnected()) {
                                log.warn("mqtt reconnect success end");
                                break;
                            }
                            if(reconnectTimes == 10){
                                //当重连次数达到10次时,就抛出异常,不在重连
                                log.warn("mqtt reconnect error");
                                return;
                            }
                            log.warn("mqtt reconnect times = {} try again...", reconnectTimes++);
                            mqttClient.reconnect();
                        } catch (MqttException e) {
                            log.error("", e);
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
//                            e1.printStackTrace();
                        }
                    }
                }

看MQTT的connec的源码发现了一段代码使我找到了解决方案
MqttAsyncClient 的 connect()方法

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException, MqttSecurityException {
		......
		//省略
		......
		comms.setNetworkModules(createNetworkModules(serverURI, options));
		comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));

		// Insert our own callback to iterate through the URIs till the connect
		// succeeds
		MqttToken userToken = new MqttToken(getClientId());
		ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,
				userToken, userContext, callback, reconnecting);
		userToken.setActionCallback(connectActionListener);
		userToken.setUserContext(this);

		// If we are using the MqttCallbackExtended, set it on the
		// connectActionListener
		if (this.mqttCallback instanceof MqttCallbackExtended) {
			connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);
		}

		comms.setNetworkModuleIndex(0);
		connectActionListener.connect();

		return userToken;
	}

MqttReconnectCallback 是实现MqttCallbackExtended接口的
发现comms中有设置重连的回调对象
comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));
但是怎么把这个回调由我们来主动放进去呢?继续往下看源码可以发现
MqttReconnectCallback对象只是在连接丢失connectionLost的时候进行循环连接
点击startReconnectCycle()最终又会回到
MqttAsyncClient 的 connect()方法

class MqttReconnectCallback implements MqttCallbackExtended {

		final boolean automaticReconnect;

		MqttReconnectCallback(boolean isAutomaticReconnect) {
			automaticReconnect = isAutomaticReconnect;
		}

		public void connectionLost(Throwable cause) {
			if (automaticReconnect) {
				// Automatic reconnect is set so make sure comms is in resting
				// state
				comms.setRestingState(true);
				reconnecting = true;
				startReconnectCycle();
			}
		}

		public void messageArrived(String topic, MqttMessage message) throws Exception {
		}

		public void deliveryComplete(IMqttDeliveryToken token) {
		}

		public void connectComplete(boolean reconnect, String serverURI) {
		}

	}

也就是如果我们在之前放入client的回调对象是实现的 MqttCallbackExtended 接口,则MQTT会将我们的回调对象放入 connectActionListener 中 然后由 connectActionListener实现具体的connect

接下来我们将 MessageCallback 对象改为实现 MqttCallbackExtended这个接口,然后实现下面方法

mqttClient.setCallback(new MqttCallbackExtended () {
                /**
                 * Called when the connection to the server is completed successfully.
                 *
                 * @param reconnect If true, the connection was the result of automatic reconnect.
                 * @param serverURI The server URI that the connection was made to.
                 */
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    try{
                        //如果监测到有,号,说明要订阅多个主题
                        if(mqttTopic.contains(",")){
                            //多主题
                            String[] mqttTopics = mqttTopic.split(",");
                            mqttClient.subscribe(mqttTopics);
                        }else{
                            //单主题
                            mqttClient.subscribe(mqttTopic);
                        }
                        log.info("----TAG", "connectComplete: 订阅主题成功");
                    }catch(Exception e){
                        e.printStackTrace();
                        log.info("----TAG", "error: 订阅主题失败");
                    }
                }

然后可能在同一个环境,比方测试服和本地,创建同ip端口,用户密码clientId一样的客户端,那么2边会占用资源,需要加上异常报错,我的处理方式是连接10次不行就让他掉线,还需要在报错的地方加上处理:

//当创建客户端的时候出现 已断开连接,有可能是在另一个环境下启动了该客户端,直接吧这边的客户端关闭,不然另一边会无限重连
            if(e.getMessage().equals("已断开连接") || e.getMessage().equals("客户机未连接")){
                try {
                    mqttClient.close();
                } catch (MqttException ex) {
                    ex.printStackTrace();
                }
            }

以下是我的开发完整代码,使用了多线程方式创建,

package com.t4cloud.t.sensor.entity;

import com.t4cloud.t.base.redis.topic.entity.RedisMsg;
import com.t4cloud.t.base.utils.RedisTopicUtil;
import com.t4cloud.t.sensor.constant.MqttClientManager;
import com.t4cloud.t.sensor.entity.vo.SensorMqttMsg;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

//MQTT客户端线程
@Slf4j
public class MqttClientThread extends Thread{

    //连接地址
    private String serverURL;
    //MQTT客户端登录用户名
    private String mqttUsername;
    //MQTT客户端密码
    private String mqttPassWord;
    //MQTT订阅主题
    private String mqttTopic;
    //MQTT的client
    private String clientId;
    //产品id
    private String productId;
    //推送至我们自己的RedisTopIc中channel
    private String channel = "mqtt";
    //mqtt实体类
    private MqttClient mqttClient;

    //构造函数
    public MqttClientThread(String serverURL,String mqttUsername,String mqttPassWord,String mqttTopic,String clientId,String productId) {
        this.serverURL = serverURL;
        this.mqttUsername = mqttUsername;
        this.mqttPassWord = mqttPassWord;
        this.mqttTopic = mqttTopic;
        this.clientId = clientId;
        this.productId = productId;
    }

    //线程方法
    public void run(){
        try {
            // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
            // MemoryPersistence设置clientid的保存形式,默认为以内存保存,就用username
            mqttClient = new MqttClient(serverURL, clientId, new MemoryPersistence());
            // 配置参数信息
            MqttConnectOptions options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
            // 这里设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(true);
            // 设置用户名
            options.setUserName(mqttUsername);
            // 设置密码
            options.setPassword(mqttPassWord.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
//            options.setKeepAliveInterval(20);
            //设置断开后重新连接
            options.setAutomaticReconnect(true);
            // 连接
            mqttClient.connect(options);
            // 订阅
            //如果监测到有,号,说明要订阅多个主题
            if(mqttTopic.contains(",")){
                //多主题
                String[] mqttTopics = mqttTopic.split(",");
                mqttClient.subscribe(mqttTopics);
            }else{
                //单主题
                mqttClient.subscribe(mqttTopic);
            }
            // 设置回调
            mqttClient.setCallback(new MqttCallbackExtended () {
                /**
                 * Called when the connection to the server is completed successfully.
                 *
                 * @param reconnect If true, the connection was the result of automatic reconnect.
                 * @param serverURI The server URI that the connection was made to.
                 */
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    try{
                        //如果监测到有,号,说明要订阅多个主题
                        if(mqttTopic.contains(",")){
                            //多主题
                            String[] mqttTopics = mqttTopic.split(",");
                            mqttClient.subscribe(mqttTopics);
                        }else{
                            //单主题
                            mqttClient.subscribe(mqttTopic);
                        }
                        log.info("----TAG", "connectComplete: 订阅主题成功");
                    }catch(Exception e){
                        e.printStackTrace();
                        log.info("----TAG", "error: 订阅主题失败");
                    }
                }


                @Override
                public void connectionLost(Throwable throwable) {
                    log.error("连接断开,下面做重连...");
                    long reconnectTimes = 1;
                    while (true) {
                        try {
                            if (mqttClient.isConnected()) {
                                log.warn("mqtt reconnect success end");
                                break;
                            }
                            if(reconnectTimes == 10){
                                //当重连次数达到10次时,就抛出异常,不在重连
                                log.warn("mqtt reconnect error");
                                return;
                            }
                            log.warn("mqtt reconnect times = {} try again...", reconnectTimes++);
                            mqttClient.reconnect();
                        } catch (MqttException e) {
                            log.error("", e);
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
//                            e1.printStackTrace();
                        }
                    }
                }

                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    log.info("接收消息主题 : " + topic);
                    log.info("接收消息Qos : " + mqttMessage.getQos());
                    log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
                    //向我们通道中发送消息
                    RedisMsg redisMsg = new RedisMsg();
                    redisMsg.setChannel(channel);
                    redisMsg.setMsg("推送MQTT消息");
                    SensorMqttMsg mqttMsg = new SensorMqttMsg();
                    mqttMsg.setProductId(productId);
                    mqttMsg.setPayload(new String(mqttMessage.getPayload()));
                    redisMsg.setData(mqttMsg);
                    RedisTopicUtil.sendMessage(channel, redisMsg);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    //认证过程
                    log.info("deliveryComplete.............");
                }
            });
            //放入缓存,根据clinetId吧mqttClient对象放进去
            MqttClientManager.MQTT_CLIENT_MAP.putIfAbsent(clientId, mqttClient);
        } catch (Exception e) {
            e.printStackTrace();
            //当创建客户端的时候出现 已断开连接,有可能是在另一个环境下启动了该客户端,直接吧这边的客户端关闭,不然另一边会无限重连
            if(e.getMessage().equals("已断开连接") || e.getMessage().equals("客户机未连接")){
                try {
                    mqttClient.close();
                } catch (MqttException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

参考原文链接:https://blog.csdn/csdm_admin/article/details/119935243

本文标签: 断线消息MQTT