How to publish over MQTT and receive over AMQP with RabbitMQ and Spring-AMQP
So I've gotten MQTT -> MQTT and AMQP -> AMQP to work; the translation of MQTT -> AMQP doesn't seem to be working somewhere though. Here's my test, it passes if my "listener" is also in MQTT using paho, but this rabbitmq implementation doesn't.
@SpringBootTest
@SpringJUnitConfig
internal open class ProvisioningTest @Autowired constructor(
    private val mqtt: IMqttAsyncClient,
    private val mapper: ObjectMapper
) {
    @Test
    fun provision() {
        val entity = Foley(
            rfid = UUID.randomUUID().toString(),
        )
        val called = AtomicBoolean(false)
        mqtt.subscribe("foley/created", 1) { _, _ -> called.set(true) }
        mqtt.publish("foley/new", MqttMessage(mapper.writeValueAsBytes(entity)))
        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilTrue(called)
    }
}
This is the listener that publishes the saved entity to the other queue; it never gets called when I publish to MQTT.
@Service
open class Provisioning(private val repo: FoleyRepo) {
    private val log: Logger = LogManager.getLogger(this::class.java)

    @SendTo("foley.created")
    @RabbitListener(queuesToDeclare = [Queue("foley.new")])
    open fun listen(entity: Foley): Foley {
        log.trace("saving: {}", entity)
        val save = repo.save(entity)
        log.debug("saved: {}", save)
        return save
    }
}
My entire messaging config:
@Configuration
open class MessagingConfig {
    // "$" is escaped so Kotlin passes the placeholders through to Spring
    // instead of treating them as string templates.
    @Bean
    open fun client(
        @Value("tcp://\${mqtt.client.host:localhost}:\${mqtt.client.port:1883}") uri: String,
        @Value("\${mqtt.client.user:#{null}}") user: String?,
        @Value("\${mqtt.client.pass:#{null}}") pass: CharArray?
    ): IMqttAsyncClient {
        val connOpt = MqttConnectOptions()
        user?.let { connOpt.userName = it }
        pass?.let { connOpt.password = it }
        connOpt.isCleanSession = false
        connOpt.isAutomaticReconnect = true
        val client = MqttAsyncClient(uri, MqttAsyncClient.generateClientId(), MemoryPersistence())
        client.connect(connOpt)
        return client
    }

    @Bean
    open fun messageConverter(om: ObjectMapper): MessageConverter {
        return Jackson2JsonMessageConverter(om)
    }

    @Bean
    open fun builder(): Jackson2ObjectMapperBuilderCustomizer {
        return Jackson2ObjectMapperBuilderCustomizer {
            it.modules(JavaTimeModule(), KotlinModule())
        }
    }
}
I'm using the official Docker rabbitmq image with MQTT enabled.
What do I need to correct to make this work?
Recommended answer
The MQTT plugin publishes to the amq.topic exchange, with the MQTT topic name as the routing key.
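To make the routing-key mapping concrete, here is a minimal sketch of how an MQTT topic is expected to appear as an AMQP key. It assumes the plugin's documented translation of "/" to "." (and of the single-level wildcard "+" to "*" in subscriptions); the class and method names are hypothetical and this is not the plugin's actual code. Under that assumption, the question's "foley/new" topic arrives on amq.topic with routing key "foley.new", so a queue only sees it if it is bound to amq.topic with a matching key.

```java
// Hypothetical helper illustrating the assumed MQTT -> AMQP key translation.
final class MqttTopicMapping {
    private MqttTopicMapping() {}

    /** Converts an MQTT topic or topic filter to the assumed AMQP routing/binding key. */
    static String toAmqpKey(String mqttTopic) {
        StringBuilder key = new StringBuilder(mqttTopic.length());
        for (char c : mqttTopic.toCharArray()) {
            switch (c) {
                case '/': key.append('.'); break; // level separator
                case '+': key.append('*'); break; // single-level wildcard
                default:  key.append(c);          // '#' and plain characters are kept
            }
        }
        return key.toString();
    }

    public static void main(String[] args) {
        System.out.println(toAmqpKey("foley/new"));     // prints "foley.new"
        System.out.println(toAmqpKey("foley/created")); // prints "foley.created"
    }
}
```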
On the consumer side, it binds an auto-delete queue to that exchange, using the topic as the routing key; in the following example, the queue is named mqtt-subscription-mqttConsumerqos1.
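Why the binding matters can be illustrated with a small model of topic-exchange matching. This is a sketch of the standard AMQP 0-9-1 topic rules ("*" matches exactly one word, "#" matches zero or more), not RabbitMQ's implementation, and the class name is hypothetical. A queue receives a message from amq.topic only when one of its binding keys matches the routing key; a queue that is merely declared, with no binding to amq.topic, gets nothing from it.

```java
// Hypothetical model of AMQP topic-exchange key matching.
final class TopicBindingMatch {
    private TopicBindingMatch() {}

    /** Returns true if a binding key (pattern) matches a routing key. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // pattern exhausted: all words must be consumed
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every split point
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still needs a word but none are left
        }
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(words[j]);
        return wordMatches && match(pattern, i + 1, words, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("foley.new", "foley.new"));     // prints "true"
        System.out.println(matches("foley.created", "foley.new")); // prints "false"
    }
}
```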
In order to receive MQTT messages over AMQP, you need to bind your own queue to the exchange. Here is an example:
@SpringBootApplication
public class So54995261Application {

    public static void main(String[] args) {
        SpringApplication.run(So54995261Application.class, args);
    }

    // Outbound MQTT: messages sent to the "toMQTT" channel are published to topic "so54995261".
    @Bean
    @ServiceActivator(inputChannel = "toMQTT")
    public MqttPahoMessageHandler sendIt(MqttPahoClientFactory clientFactory) {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler("clientId", clientFactory);
        handler.setAsync(true);
        handler.setDefaultTopic("so54995261");
        return handler;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://localhost:1883" });
        options.setUserName("guest");
        options.setPassword("guest".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // Inbound MQTT consumer; the plugin creates the mqtt-subscription-mqttConsumerqos1 queue for it.
    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("mqttConsumer",
                mqttClientFactory(), "so54995261");
        adapter.setCompletionTimeout(5000);
        return adapter;
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(mqttInbound())
                .handle(System.out::println)
                .get();
    }

    // AMQP consumer: receives the same MQTT message through the queue bound below.
    @RabbitListener(queues = "so54995261")
    public void listen(byte[] in) {
        System.out.println(new String(in));
    }

    @Bean
    public Queue queue() {
        return new Queue("so54995261");
    }

    // Bind our own queue to amq.topic, using the MQTT topic name as the routing key.
    @Bean
    public Binding binding() {
        return new Binding("so54995261", DestinationType.QUEUE, "amq.topic", "so54995261", null);
    }

    // Publish a test message over MQTT at startup.
    @Bean
    public ApplicationRunner runner(MessageChannel toMQTT) {
        return args -> toMQTT.send(new GenericMessage<>("foo"));
    }
}