Apache骆驼,RabbitMQ如何发送消息/对象

2022-01-11 00:00:00 rabbitmq spring java apache-camel

我希望有人可以在这个问题上提供一些帮助.

I hope someone can provide some help on this matter.

我正在使用骆驼rabbitmq,出于测试目的,我试图向队列发送一条消息,我试图在rabbitmq界面中显示该消息,然后将其读回.

I am using camel rabbitmq and for testing purpose I am trying to send a message to the queue, which I'm trying to display in rabbitmq interface and then also read it back.

但是我不能让它工作.

我认为可行的是,我在 rabbitmq 管理界面的交换选项卡中创建了一个新的交换.在我的 java 代码中,我将消息发送到该交换.执行代码时,我可以在 Web 界面中看到一个峰值,表明已收到某些内容,但我看不到已收到的内容.当我尝试阅读时,我无法阅读并收到以下错误:<在路由中: Route(route2)[[From[rabbitmq://192.168.59.103:5672/rt... 因为 Route route2 没有输出处理器.您需要向路由添加输出,例如 to("log:foo").

What I believe works is that I created, in the exchange tab of rabbitmq management interface, a new exchange. In my java code I send the message to that exchange. When the code is executed, I can see a spike in the web interface showing that something has been received but I can't see what has been received. When I try to read, I can't read and get the following errror: < in route: Route(route2)[[From[rabbitmq://192.168.59.103:5672/rt... because of Route route2 has no output processors. You need to add outputs to the route such as to("log:foo").

谁能给我一个关于如何发送消息、在网络交互中查看并阅读的实际示例?任何显示此过程的教程也将不胜感激.

Can someone provide me a practical example on how to send a message,see it in the web interace and also read it? any tutorial showing this process will be also appreciated.

谢谢

=================第二部分

================= SECOND PART

我现在遇到的错误如下:

The error I'm getting now is the following:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - cannot redeclare exchange 'rhSearchExchange' in vhost '/' with different type, durable, internal or autodelete value, class-id=40, method-id=10), null, ""}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 47 more

我有以下设置:

我收到此错误,我认为我的 URI 有问题,我必须定义一些我缺少的额外参数我的交换是直接类型的我的队列是持久型我的 uri 是:rabbitmq://192.168.59.105:5672/rhSearchExchange?username=guest&password=guest&routingKey=rhSearchQueue

I get this error, I believe I’m doing something wrong with the URI and I have to define some extra parameters that I’m missing My exchange is of direct type My queue is of durable type And my uri is : rabbitmq://192.168.59.105:5672/rhSearchExchange?username=guest&password=guest&routingKey=rhSearchQueue

对此有何意见?

谢谢

推荐答案

所以我昨天能够解决这个问题,我遇到了与您相同(或至少相似)的问题.

So I was able to figure this out yesterday, I had the same (or at least similar) problems you were having.

您在 RabbitMQ URI 中拥有的选项必须与创建您的交换所使用的选项完全匹配.例如,在我的配置中,我有一个名为 tasks 的交换器,它是一种直接类型,是持久的,并且没有配置为自动删除.请注意,rabbitmq 骆驼组件中自动删除选项的默认值为 true.此外,我想使用路由键 camel 获取消息.这意味着我的 rabbitmq URI 需要看起来像:

The options you have in the RabbitMQ URI must exactly match the options that your exchange was created with. For example, in my configuration, I had an exchange called tasks that was a direct type, was durable, and was not configured to autodelete. Note that the default value for the autodelete option in the rabbitmq camel component is true. Additionally, I wanted to get the messages with the routing key camel. That means my rabbitmq URI needed to look like:

rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel

此外,我想从一个名为 task_queue 的现有队列中读取数据,而不是让 rabbitmq camel 组件声明它自己的队列.因此,我还需要添加一个额外的查询参数,所以我的 rabbitmq URI 是

Additionally, I wanted to read from an existing queue, called task_queue rather than have the rabbitmq camel component declare it's own queue. Therefore, I also needed to add an additional query parameter, so my rabbitmq URI was

rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel&queue=task_queue

此配置对我有用.下面,我从配置交换和队列并发送消息的代码中添加了一些 Java 代码片段,以及我的 Camel Route 配置.

This configuration worked for me. Below, I added some Java code snippets from the code that configures the exchange and queue and sends a message, and my Camel Route configuration.

rabbitConnFactory = new ConnectionFactory();
rabbitConnFactory.setHost("localhost");
final Connection conn = rabbitConnFactory.newConnection();
final Channel channel = conn.createChannel();

// declare a direct, durable, non autodelete exchange named 'tasks'    
channel.exchangeDeclare("tasks", "direct", true); 
// declare a durable, non exclusive, non autodelete queue named 'task_queue'
channel.queueDeclare("task_queue", true, false, false, null); 
// bind 'task_queue' to the 'tasks' exchange with the routing key 'camel'
channel.queueBind("task_queue", "tasks", "camel"); 

发送消息:

channel.basicPublish("tasks", "camel", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello, world!".getBytes());

骆驼路线:

@Override
public void configure() throws Exception {
    from("rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel&queue=task_queue")
        .to("mock:result");
}

我希望这会有所帮助!

相关文章