客户端工具MQTT.fx上定义的主题未使用ActiveMQ Artemis消息

我的用例保证使用发布-订阅模型,因此我使用在我的Spring Boot客户端中定义的主题。我已经使用工具MQTT.fx和两个Spring Boot客户端进行了测试。两个Spring Boot客户端能够相互通信,但MQTT.fx客户端只连接到代理,不生成或使用消息。两个Spring Boot客户端都连接到端口61616,而MQTT.fx客户端连接到端口1883

对于我的项目,我需要让MQTT.fx客户端工作,因为它高度代表打算与服务器一起工作的微控制器。MQTT.fx上的故障就是微控制器上的故障。

broker.xml文件中的acceptor配置如下所示。

<acceptors>
     <!-- Acceptor for every supported protocol -->
     <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>

     <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
     <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

     <!-- STOMP Acceptor. -->
     <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

     <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
     <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

     <!-- MQTT Acceptor -->
     <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>

Spring Boot客户端通信时的日志如下。首先,向主题server.weatherForecast发送一条消息,然后,经过一些处理后,向AMEBAA000105.device.weatherForecast发送响应。

2021-10-04 14:14:04,860 [AUDIT](Thread-8 (activemq-netty-threads)) 

AMQ601715: User admin(admins)@127.0.0.1:1125 successfully authenticated
2021-10-04 14:14:04,860 [AUDIT](Thread-8 (activemq-netty-threads)) AMQ601267: User admin(admins)@127.0.0.1:1125 is creating a core session on target resource ActiveMQServerImpl::name=0.0.0.0 [with parameters: [3b141c41-24ef-11ec-aa52-00155d831300, null, ****, 102400, RemotingConnectionImpl [ID=6280c69f, clientID=null, nodeID=b719c384-1d0a-11ec-8b7c-00155d831300, transportConnection=org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection@417b6fef[ID=6280c69f, local= /127.0.0.1:61616, remote=/127.0.0.1:1125]], true, true, false, false, null, org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback@103d417f, true, OperationContextImpl [61828916] [minimalStore=9223372036854775807, storeLineUp=0, stored=0, minimalReplicated=9223372036854775807, replicationLineUp=0, replicated=0, paged=0, minimalPage=9223372036854775807, pageLineUp=0, errorCode=-1, errorMessage=null, executorsPending=0, executor=OrderedExecutor(tasks=[])], {}]]
2021-10-04 14:14:04,864 [AUDIT](Thread-7 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@b62d79)) AMQ601500: User admin(admins)@127.0.0.1:1125 is sending a message CoreMessage[messageID=528267,durable=true,userID=3b146a62-24ef-11ec-aa52-00155d831300,priority=4, timestamp=Mon Oct 04 14:14:04 IST 2021,expiration=0, durable=true, address=jms.topic.server.weatherForecast,size=314,properties=TypedProperties[__AMQ_CID=3b13ce1f-24ef-11ec-aa52-00155d831300]]@1994364957, with Context: RoutingContextImpl(Address=null, routingType=null, PreviousAddress=null previousRoute:null, reusable=null, version=0)
..................................................

2021-10-04 14:14:04,865 [AUDIT](Thread-7 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@b62d79)) AMQ601501: User admin(admins)@127.0.0.1:1110 is consuming a message from 41207748-6ed3-42d2-b75e-044805212686: Reference[528267]:RELIABLE:CoreMessage[messageID=528267,durable=true,userID=3b146a62-24ef-11ec-aa52-00155d831300,priority=4, timestamp=Mon Oct 04 14:14:04 IST 2021,expiration=0, durable=true, address=jms.topic.server.weatherForecast,size=314,properties=TypedProperties[__AMQ_CID=3b13ce1f-24ef-11ec-aa52-00155d831300]]@1994364957
2021-10-04 14:14:04,868 [AUDIT](Thread-4 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@b62d79)) AMQ601502: User admin(admins)@127.0.0.1:1110 is acknowledging a message from 41207748-6ed3-42d2-b75e-044805212686: CoreMessage[messageID=528267,durable=true,userID=3b146a62-24ef-11ec-aa52-00155d831300,priority=4, timestamp=Mon Oct 04 14:14:04 IST 2021,expiration=0, durable=true, address=jms.topic.server.weatherForecast,size=314,properties=TypedProperties[__AMQ_CID=3b13ce1f-24ef-11ec-aa52-00155d831300]]@1994364957
2021-10-04 14:14:08,059 [AUDIT](Thread-8 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@b62d79)) AMQ601500: User admin(admins)@127.0.0.1:1110 is sending a message CoreMessage[messageID=528349,durable=true,userID=3cfc1623-24ef-11ec-aa52-00155d831300,priority=4, timestamp=Mon Oct 04 14:14:08 IST 2021,expiration=0, durable=true, address=null,size=190,properties=TypedProperties[__AMQ_CID=d04d0e87-24ee-11ec-aa52-00155d831300]]@600964226, with Context: RoutingContextImpl(Address=jms.topic.AMEBAA000105.device.weatherForecast, routingType=null, PreviousAddress=jms.topic.AMEBAA000105.device.weatherForecast previousRoute:null, reusable=null, version=0)
..................................................

使用MQTT.fx时的日志如下。最初订阅AMEBAA000105.device.weatherForecast,然后向server.weatherForecast发送消息。本来要接收这些消息的Spring Boot客户端不会接收任何内容,因此不会向AMEBAA000105.device.weatherForecast发送任何内容。当另一个Spring Boot客户端触发将消息从Spring Boot客户端发布到AMEBAA000105.device.weatherForecast时,MQTT.fx不捕获该消息。

2021-10-04 14:24:35,443 [AUDIT](Thread-17 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@b62d79)) AMQ601265: User admin(admins)@192.168.0.107:18640 is creating a core consumer on target resource ServerSessionImpl() [with parameters: [534062, MQTT_FX_Client.AMEBAA000105.device.weatherForecast, null, 0, false, false, -1]]
2021-10-04 14:24:43,719 [AUDIT](Thread-10 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@b62d79)) AMQ601500: User admin(admins)@192.168.0.107:18640 is sending a message CoreMessage[messageID=534380,durable=false,userID=null,priority=0, timestamp=0,expiration=0, durable=false, address=server.weatherForecast,size=200,properties=TypedProperties[mqtt.message.retain=false,mqtt.qos.level=0]]@1214406991, with Context: RoutingContextImpl(Address=server.weatherForecast, routingType=null, PreviousAddress=null previousRoute:null, reusable=null, version=0)

以上日志为审核日志。在artemis.log文件中,MQTT.fx不断ping代理。

2021-10-04 14:30:43,727 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP
2021-10-04 14:31:43,728 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP
2021-10-04 14:32:43,729 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP
2021-10-04 14:33:43,731 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP
2021-10-04 14:34:43,732 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP
2021-10-04 14:35:43,733 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): OUT >> PINGRESP

artemis.log文件中的MQTT.fx订阅或发布的日志如下

2021-10-04 14:22:27,702 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): IN << SUBSCRIBE(3)
AMEBAA000105.device.weatherForecast : AT_MOST_ONCE
2021-10-04 14:24:43,718 TRACE [org.apache.activemq.artemis.core.protocol.mqtt] MQTT(MQTT_FX_Client): IN << PUBLISH(-1) topic=server.weatherForecast, qos=AT_MOST_ONCE, retain=false, dup=false, payload={"serialNumber" : "AMEBAA000105"}

非常感谢您的帮助。


解决方案

您说在测试Spring Boot客户端时,您会向server.weatherForecast发送一条消息,经过一些处理后,会向AMEBAA000105.device.weatherForecast发送一个响应。然而,这是而不是日志显示的。日志显示您分别向jms.topic.server.weatherForecastjms.topic.AMEBAA000105.device.weatherForecast发送消息。请注意jms.topic.前缀。

我相信这是问题的根源,因为根据日志,MQTT实际上将使用server.weatherForecastAMEBAA000105.device.weatherForecast。由于这两个客户端没有使用匹配的名称,因此它们永远不能与您的当前配置一起工作。

我的猜测是您正在使用来自您的Spring Boot应用程序的遗留Artemis 1.x客户端,这就是使用jms.topic.前缀的原因。您应该迁移到较新的客户端(例如,与您正在使用的代理版本匹配的客户端),或者配置anycastPrefixmulticastPrefix以支持旧版1.x客户端。默认的broker.xml有一个说明如何执行此操作的注释:

<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
           "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
           See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->

默认情况下,此注释显示在acceptors块中,但似乎已将其删除,因为它不在您粘贴的XML中。

如果要确认客户端正在使用哪个库,最简单的方法就是检查客户端的环境。但是,如果这不起作用,并且您有权访问代理日志,那么您可以对org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl执行enable TRACE logging,然后查看CreateSessionMessage中传递的version。如果是<;131,则它不是2.18.0客户端。

相关文章