Android使用MQTT订阅及发布消息((一)初步了解Mqtt以及实现Android操作mqtt服务)

编程入门 行业动态 更新时间:2024-10-25 18:30:00

Android使用MQTT订阅及发布<a href=https://www.elefans.com/category/jswz/34/1771421.html style=消息((一)初步了解Mqtt以及实现Android操作mqtt服务)"/>

Android使用MQTT订阅及发布消息((一)初步了解Mqtt以及实现Android操作mqtt服务)

Android使用MQTT订阅及发布消息((一)初步了解Mqtt以及实现Android操作mqtt服务)

  • 关于
  • MQTT介绍
    • MQTT协议实现方式
    • MQTT服务器
    • MQTT协议中的订阅、主题、会话
    • MQTT协议中的方法
    • MQTT服务质量 (QoS)
    • MQTT服务端(Broker)
    • MQTT客户端
  • mqtt服务选取
  • Android连接mqtt
    • 配置
    • 修改mainActivity文件
    • 订阅 topic
    • 取消订阅 topic
    • 发布消息publish
    • 断开 MQTT 连接

关于

  可能有很多小伙伴和我一样是初次知道mqtt,然后它是啥,用来干什么那就更不清楚了,前段时间公司要求调研这方面,所以今天这篇文章就来介绍mqtt是啥,以及Android可以用它来干啥。

MQTT介绍

MQTT协议实现方式

  实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
  • payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
    MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
    当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。

MQTT服务器

  MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

  • (1)接受来自客户的网络连接;
  • (2)接受客户发布的应用信息;
  • (3)处理来自客户端的订阅和退订请求;
  • (4)向订阅的客户转发应用程序消息。

MQTT协议中的订阅、主题、会话

一、订阅(Subscription)
  订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。
二、会话(Session)
  每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。
三、主题名(Topic Name)
  连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。
四、主题筛选器(Topic Filter)
  一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。
五、负载(Payload)
  消息订阅者所具体接收的内容。

MQTT协议中的方法

  MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

  • (1)Connect。等待与服务器建立连接。
  • (2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。
  • (3)Subscribe。等待完成订阅。
  • (4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。
  • (5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。

MQTT服务质量 (QoS)

  qos为0"至多一次",消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。
qos为1"至少一次",确保消息到达,但消息重复可能会发生。
qos为2"只有一次",确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。
可以在订阅/发布消息的时候设置服务质量。

MQTT服务端(Broker)

  网上有开源的服务端项目代码可以在电脑上进行搭建,也可以试用一些供应商的服务器。

MQTT客户端

经调研,Android开发mqtt客户端主流使用的是eclipse提供的paho.mqtt,项目引用:

implementation'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

mqtt服务选取

  因为是调研,所以就不考虑自己去打一个mqtt的服务,网上搜能搜到很多搭建的mqtt,这里我是用了一个三方可以免费试用14天的服务方EMQ,使用前需要注册一个账号,如果你有github账号的话可以直接提供使用。
然后我们选中试用的服务(基础版和专业版都可以试用14天,这边建议试用专业版的),我因为之前已经试用过了专业版,所以现在只能试用基础版的了,专业版的好处就是提供的ip:

  然后我们等待它自动部署好项目即可:

  完成之后我们需要添加一个认证用户,这个认证用户用来连接时候判断身份用的:


  认证的用户及密码需要记一下。

Android连接mqtt

配置

  首先要在项目的build文件里添加如下引用:

implementation'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

其实这里有个问题,target为Android 12的(31),需要去做很多修改,包括去掉引用,当然这部分我准备放到第二篇里面来讲如何适配Android12版本手机实现mqtt使用。
  修改Androidmanifest.xml文件,添加权限和mqttservice的的注册:

<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /><service android:name="org.eclipse.paho.android.service.MqttService" />

修改mainActivity文件

private val TAG = "MqttClient"
private lateinit var mqttClient: MqttAndroidClient
override fun onCreate(savedInstanceState: Bundle?) {//....val serverURI = "tcp://pc1c6e1c-shenzhen.emqx.cloud:11838"mqttClient = MqttAndroidClient(this, serverURI, "kotlin_mqtt_test1") //"kotlin_mqtt_test1"是作为连接客户端的名称来使用,所以要注意避免重复
}
fun connect() {mqttClient.setCallback(object : MqttCallback {override fun messageArrived(topic: String?, message: MqttMessage?) {Log.d(TAG, "Receive message: ${message.toString()} from topic: $topic")}override fun connectionLost(cause: Throwable?) {Log.d(TAG, "Connection lost ${cause.toString()}")}override fun deliveryComplete(token: IMqttDeliveryToken?) {}})val options = MqttConnectOptions()options.apply {userName = "tobeyr1"this.password = "1234".toCharArray()connectionTimeout = 12this.keepAliveInterval = 0this.isAutomaticReconnect = falsethis.isCleanSession = true}try {mqttClient.connect(options, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "Connection success")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Connection failure")}})} catch (e: MqttException) {e.printStackTrace()}}

  其中我们的serverurl可以在橄榄中看到:

  然后我们可以在调试台的监控里面看到已经连接到了mqtt服务:

订阅 topic

private fun subscribe(topic: String, qos: Int = 1) {try {mqttClient.subscribe(topic, qos, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "Subscribed to $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to subscribe $topic")}})} catch (e: MqttException) {e.printStackTrace()}}

  订阅成功之后也可以在控制台看到订阅的主题:

取消订阅 topic

private fun unsubscribe(topic: String) {try {mqttClient.unsubscribe(topic, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "Unsubscribed to $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to unsubscribe $topic")}})} catch (e: MqttException) {e.printStackTrace()}}

发布消息publish

private fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {try {val message = MqttMessage()message.payload = msg.toByteArray()message.qos = qosmessage.isRetained = retainedmqttClient.publish(topic, message, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "$msg published to $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to publish $msg to $topic")}})} catch (e: MqttException) {e.printStackTrace()}}

断开 MQTT 连接

private fun disconnect() {try {mqttClient.disconnect(null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "Disconnected")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to disconnect")}})} catch (e: MqttException) {e.printStackTrace()}}

  好了,本篇简单介绍Android(11及以下版本)连接mqtt服务就到此结束了,下篇《Android使用MQTT订阅及发布消息((二)兼容Android12 封装Mqtt客户端service)》将会介绍兼容Android12版本调用mqtt服务。有问题欢迎批评指正,觉得不错的也请点个赞,多谢。

更多推荐

Android使用MQTT订阅及发布消息((一)初步了解Mqtt以及实现Android操作mqtt服务)

本文发布于:2024-02-27 04:10:01,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1705127.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:消息   操作   MQTT   Android   Mqtt

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!