使用 RabbitMQ(Java 客户端),有没有办法确定消费期间网络连接是否关闭?

2022-01-11 00:00:00 rabbitmq amqp java

我正在使用 Java 客户端在 RHEL 5.3 上使用 RabbitMQ.我有 2 个节点(机器).Node1 正在使用 Java 帮助程序类 QueueingConsumer 从 Node2 上的队列中消费消息.

I'm using RabbitMQ on RHEL 5.3 using the Java client. I have 2 nodes (machines). Node1 is consuming messages from a queue on Node2 using the Java helper class QueueingConsumer.

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("MyQueueOnNode2", noAck, consumer);
while (true)
{
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   ... Process message - delivery.getBody()
}

如果接口在 Node1 或 Node2 上关闭(例如 ifconfig eth1 down),客户端(上图)永远不会知道网络不再存在.RabbitMQ 是否在 Java 客户端上提供某种类型的配置,可用于确定连接是否已消失.关闭 Node2 上的 RabbitMQ 服务器将触发 ShutdownSignalException,可以捕获该异常,并且应用程序可以进入重新连接循环.但是关闭接口不会导致任何类型的异常发生,因此代码将在 consumer.nextDelivery() 上永远等待.

If the interface is brought down on Node1 or Node2 (e.g. ifconfig eth1 down), the client (above) never knows the network isn't there anymore. Does RabbitMQ provide some type of configuration on the Java client that can be used to determine if the connection has gone away. Shutting down the RabbitMQ server on Node2 will trigger a ShutdownSignalException, which can be caught and the app can go into a reconnect loop. But bringing down the interface doesn't cause any type of exception to happen, so the code will be waiting forever on consumer.nextDelivery().

我也尝试过使用这个调用的超时版本.例如

I've also tried using the timeout version of this call. e.g.

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("MyQueueOnNode2", noAck, consumer);
int timeout_ms = 30000;
while (true)
{
   QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout_ms);
   if (delivery == null)
   {
      if (channel.isOpen() == false)             // Seems to always return true
      { throw new ShutdownSignalException(); }
   }
   else
   {
     ... Process message - delivery.getBody()
   }
}

但似乎这总是返回 true(即使接口已关闭).我假设在连接上注册 ShutdownListener 会产生相同的结果,但还没有尝试过.

but appears that this always returns true (even though the interface is down). I assume registering for the ShutdownListener on the connection will yield the same results, but haven't tried that yet.

有没有办法配置某种心跳,或者你只需​​要编写自定义租约逻辑(例如我现在在这里")就可以让它工作?

Is there a way to configure some sort of heartbeat, or do you just have to write custom lease logic (e.g. "I'm here now") in order to get this to work?

推荐答案

一般来说,你最好在 rabbitmq-discuss 邮件列表上发布有关 rabbitmq 的问题.我们不倾向于跟踪在此之外提出的问题.

In general, you're much better off posting questions regarding rabbitmq on the rabbitmq-discuss mailing list. We don't tend to track questions being asked outside of this.

您可以配置心跳,但默认情况下它是关闭的.您也可以打开 TCP Keep Alive.在创建新连接之前调用 ConnectionFactory 上的 setRequestedHeartbeat,或者,子类 ConnectionFactory,覆盖 configureSocket 方法,并调用 socket.setKeepAlive(true).当网络中断时,两者都应该导致连接通知.

There is a heartbeat that you can configure, though it is off by default. You could also turn on TCP Keep Alive. Either call setRequestedHeartbeat on the ConnectionFactory before creating a new connection, or, subclass ConnectionFactory, override the configureSocket method, and call socket.setKeepAlive(true). Both should result in the connection noticing when the network dies.

相关文章