我正在尝试构建一个应用程序,该应用程序订阅多个mqtt主题,获取信息,对其进行处理并形成xml,并在处理后触发一个事件,以便可以将这些事件发送到某个云服务器,并从中成功响应.发送回mqtt频道.
I am trying to build an application which subscribes to multiple mqtt topics, get the information, process it and form xmls and upon processing trigger an event so that these can be sent to some cloud server and the successful response from there to be sent back to the mqtt channel.
<int-mqtt:message-driven-channel-adapter id="mqttAdapter" client-id="${clientId}" url="${brokerUrl}" topics="${topics}" channel="startCase" auto-startup="true" /> <int:channel id="startCase" /> <int:service-activator id="startCaseService" input-channel="startCase" ref="msgPollingService" method="pollMessages" /> <bean id="mqttTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="5" /> <property name="maxPoolSize" value="10" /> </bean> <bean id="msgPollingService" class="com.xxxx.xxx.mqttclient.mqtt.MsgPollingService"> <property name="taskExecutor" ref="mqttTaskExecutor" /> <property name="vendorId" value="${vendorId}" /> </bean>我的问题是如何将其发布到多个渠道,即是否可以选择将X消息发布到Y主题.目前,我有以下内容:
My question is how do I publish this to multiple channels, i.e. if I have an option to publish X message to Y topic. At present I have the below:
<int:channel id="outbound" /> <int-mqtt:outbound-channel-adapter id="mqtt-publish" client-id="kj" client-factory="clientFactory" auto-startup="true" url="${brokerUrl}" default-qos="0" default-retained="true" default-topic="${responseTopic}" channel="outbound" /> <bean id="eventListner" class="com.xxxx.xxxx.mqttclient.event.EventListener"> <property name="sccUrl" value="${url}" /> <property name="restTemplate" ref="restTemplate" /> <property name="channel" ref="outbound" /> </bean>我可以这样发布:
channel.send(MessageBuilder.withPayload("customResponse").build());我可以做类似的事情吗?
Can I do something like:
channel.send(Message<?>, topic)推荐答案
您的配置看起来不错.但是,MessageChannel是松耦合的抽象,并且仅处理Message.
Your configuration looks good. However the MessageChannel is an abstraction for loosely-coupling and gets deal only with Message.
因此,您要求a-la channel.send(Message<?>, topic)对于消息传递概念不正确.
So, you request a-la channel.send(Message<?>, topic) isn't correct for Messaging concepts.
但是,我们为您提供了一个窍门.来自AbstractMqttMessageHandler:
However we have a trick for you. From AbstractMqttMessageHandler:
String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC); ..... this.publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);因此,您需要的代码是这样的:
So, what you need from your code is this:
channel.send(MessageBuilder.withPayload("customResponse").setHeader(MqttHeaders.TOPIC, topic).build());换句话说,您应该发送带有mqtt_topic标头的Message来从<int-mqtt:outbound-channel-adapter>实现动态发布.
In other words you should send a Message with mqtt_topic header to achieve a dynamic publication from <int-mqtt:outbound-channel-adapter>.
另一方面,我们不建议直接从应用程序中使用MessageChannel.带有服务接口的<gateway>用于最终应用程序.其中topic可以是标记为@Header(MqttHeaders.TOPIC)
From other side we don't recommend to use MessageChannels directly from the application. The <gateway> with service interface is for such a case for end-application. Where that topic can be one of service method argument marked as @Header(MqttHeaders.TOPIC)
更多推荐
Spring集成MQTT发布和订阅多个主题
发布评论