spring-integration连接MQTT全过程
2023-03-11 11:03:51
spring
MQTT一种物联网数据传输协议,构建在tcp之上,采用发布与订阅的模式进行数据交互,发布与订阅是两个独立的连接通道,这里采用spring-integration-mqt来实现发布与订阅MQTT,与直接采用MQTT的SDK相对要简单许多,服务端采用ActiveMQ来支持MQTT的消息服务并实现消息转发。
首先需要引入spring-integration-mqt的包
这里只需要引入这一个包即可。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.3.1.RELEASE</version>
</dependency>
MQTT的配置比较简单
和spring-integration集成一样,需要配置相对应的入站、出站就可以了
具体配置如下:
package org.noka.serialservice.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.noka.serialservice.service.MsgSendService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.support.MessageBuilder;
@EnableIntegration
@Configuration
@ConditionalOnProperty("mqtt.services")
public class MQTTConfig implements ApplicationListener<ApplicationEvent> {
private static Logger logger = LoggerFactory.getLogger(MQTTConfig.class);
private final MsgSendService msgSendService;//发布消息到消息中间件接口
@Value("${mqtt.appid:mqtt_id}")
private String appid;//客户端ID
@Value("${mqtt.input.topic:mqtt_input_topic}")
private String[] inputTopic;//订阅主题,可以是多个主题
@Value("${mqtt.out.topic:mqtt_out_topic}")
private String[] outTopic;//发布主题,可以是多个主题
@Value("${mqtt.services:#{null}}")
private String[] mqttServices;//服务器地址以及端口
@Value("${mqtt.user:#{null}}")
private String user;//用户名
@Value("${mqtt.passWord:#{null}}")
private String password;//密码
@Value("${mqtt.KeepAliveInterval:300}")
private Integer KeepAliveInterval;//心跳时间,默认为5分钟
@Value("${mqtt.CleanSession:false}")
private Boolean CleanSession;//是否不保持session,默认为session保持
@Value("${mqtt.AutomaticReconnect:true}")
private Boolean AutomaticReconnect;//是否自动重联,默认为开启自动重联
@Value("${mqtt.CompletionTimeout:30000}")
private Long CompletionTimeout;//连接超时,默认为30秒
@Value("${mqtt.Qos:1}")
private Integer Qos;//通信质量,详见MQTT协议
public MQTTConfig(MsgSendService msgSendService) {
this.msgSendService = msgSendService;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//连接工厂类
MqttConnectOptions options = new MqttConnectOptions();//连接参数
options.setServerURIs(mqttServices);//连接地址
if(null!=user) {
options.setUserName(user);//用户名
}
if(null!=password) {
options.setPassword(password.toCharArray());//密码
}
options.seTKEepAliveInterval(KeepAliveInterval);//心跳时间
options.setAutomaticReconnect(AutomaticReconnect);//断开是否自动重联
options.setCleanSession(CleanSession);//保持session
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageProducerSupport mqttInput(MqttPahoClientFactory mqttPahoClientFactory){
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid, mqttPahoClientFactory, inputTopic);//建立订阅连接
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);//bytes类型接收
adapter.setCompletionTimeout(CompletionTimeout);//连接超时的时间
adapter.setConverter(converter);
adapter.setQos(Qos);//消息质量
adapter.setOutputChannelName(ChannelName.INPUT_DATA);//输入管道名称
return adapter;
}
@Bean
@ServiceActivator(inputChannel = ChannelName.OUTPUT_DATA_MQTT)
public AbstractMqttMessageHandler MQTTOutAdapter(MqttPahoClientFactory connectionFactory) {
//创建一个新的出站管道,由于MQTT的发布与订阅是两个独立的连接,因此客户端的ID(即APPID)不能与订阅时所使用的ID一样,否则在服务端会认为是同一个客户端,而造成连接失败
MqttPahoMessageHandler outGate = new MqttPahoMessageHandler(appid + "_put", connectionFactory);
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);//bytes类型接收
outGate.setAsync(true);
outGate.setCompletionTimeout(CompletionTimeout);//设置连接超时时时
outGate.setDefaultQos(Qos);//设置通信质量
outGate.setConverter(converter);
return outGate;
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof MqttSubscribedEvent) {
String msg = "OK";
msgSendService.send(MessageBuilder.withPayload(msg.getBytes()).build());
}
}
}
其中ChanneName是一个常量类
来标识入站、出站管道的名称,以便在其它需要的地方使用,实现方法如下:
public class ChannelName {
public final static String INPUT_DATA="input_data";//入站管道
public final static String OUTPUT_DATA_TCP="output_data_TCP";//TCP出站管道
public final static String OUTPUT_DATA_MQTT="output_data_MQTT";//mqtt出站管道名称
}
此时所有配置完成,接下来需要做的就是处理接收到的数据和发布数据,以上配置完成以后,接收和发送数据都是通过数据管道来完成,配置的是数据管道名称。
数据发送网关只是一个接口
用于向指定的数据管道里面发送数据,实现如下:
package org.noka.serialservice.service;
import org.noka.serialservice.config.ChannelName;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@MessagingGateway
@Component
public interface MsgGateway {
@Gateway(requestChannel = ChannelName.OUTPUT_DATA_MQTT)
void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out);
}
在需要的地方,可以向下面这样调用这个接口,向MQTT服务器发送消息
//topic为主题名称,out为消息内容
msgGateway.send(topic, out);
MQTT服务器有数据下发时
会自动调将数据放入配置的入站数据管道中,在需要接收数据的地方,向下面这样配置即可
@ServiceActivator(inputChannel = ChannelName.INPUT_DATA)
public void upCase(Message<byte[]> in) {
logger.info("[net service data]========================================");
logger.info("[net dow data]"+new String(in.getPayload()));//字符串方式打印服务器下发的数据
logger.info("[net dow hex]"+ Hex.encodeHexString(in.getPayload(),false));//16进制方式打印服务器下发的数据
serialService.send(in.getPayload());//将服务器下发的数据转发给串口
}
最后是参数配置文件
#--------MQTT---------------------------
#设备ID,唯一标识
mqtt.appid=mqtt_id
#订阅主题,多个主题用逗号分隔
mqtt.input.topic=mqtt_input_topic
#发布主题
mqtt.out.topic=mqtt_out_topic,aac
#MQTT服务器地址,可以是多个地址
mqtt.services=tcp://47.244.191.41:1883
#mqtt用户名,默认无
#mqtt.user=guest
#mqtt密码,默认无
#mqtt.password=guest
#心跳间隔时间,默认3000
#mqtt.KeepAliveInterval=3000
#是否不保持session,默认false
#mqtt.CleanSession=false
#是否自动连接,默认true
#mqtt.AutomaticReconnect=true
#连接超时,默认30000
#mqtt.CompletionTimeout=30000
#传输质量,默认1
#mqtt.Qos=1
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。
相关文章