从 Java 与 Django/Celery 互操作

2022-01-11 00:00:00 python django celery rabbitmq java

我们公司有一个基于 Python 的网站和一些基于 Python 的工作节点,它们通过 Django/Celery 和 RabbitMQ 进行通信.我有一个基于 Java 的应用程序,它需要向基于 Celery 的工作人员提交任务.我可以很好地从 Java 向 RabbitMQ 发送工作,但是基于 Celery 的工作人员永远不会接受工作.通过查看两种类型的作业提交的数据包捕获,存在差异,但我无法理解如何解释它们,因为其中很多是二进制文件,我找不到有关解码的文档.这里有没有人有 Java/RabbitMQ 和 Celery 一起工作的参考或经验?

Our company has a Python based web site and some Python based worker nodes which communicate via Django/Celery and RabbitMQ. I have a Java based application which needs to submit tasks to the Celery based workers. I can send jobs to RabbitMQ from Java just fine, but the Celery based workers are never picking up the jobs. From looking at the packet captures of both types of job submissions, there are differences, but I cannot fathom how to account for them because a lot of it is binary that I cannot find documentation about decoding. Does anyone here have any reference or experience with having Java/RabbitMQ and Celery working together?

推荐答案

我找到了解决方案.RabbitMQ 的 Java 库是指交换/队列/路由键.在 Celery 中,队列名称实际上是映射到 Java 库中引用的交换.默认情况下,Celery 的队列只是celery".如果您的 Django 设置使用以下语法定义了一个名为myqueue"的队列:

I found the solution. The Java library for RabbitMQ refers to exchanges/queues/routekeys. In Celery, the queue name is actually mapping to the exchange referred to in the Java library. By default, the queue for Celery is simply "celery". If your Django settings define a queue called "myqueue" using the following syntax:

CELERY_ROUTES = {
    'mypackage.myclass.runworker'      : {'queue':'myqueue'},
}

那么基于 Java 的代码需要做如下的事情:

Then the Java based code needs to do something like the following:

        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = null ;
        try {
            connection = factory.newConnection(mqHost, mqPort);
        } catch (IOException ioe) {
            log.error("Unable to create new MQ connection from factory.", ioe) ;
        }

        Channel channel = null ;
        try {
            channel = connection.createChannel();
        } catch (IOException ioe) {
            log.error("Unable to create new channel for MQ connection.", ioe) ;
        }

        try {
            channel.queueDeclare("celery", false, false, false, true, null);
        } catch (IOException ioe) {
            log.error("Unable to declare queue for MQ channel.", ioe) ;
        }

        try {
            channel.exchangeDeclare("myqueue", "direct") ;
        } catch (IOException ioe) {
            log.error("Unable to declare exchange for MQ channel.", ioe) ;
        }

        try {
            channel.queueBind("celery", "myqueue", "myqueue") ;
        } catch (IOException ioe) {
            log.error("Unable to bind queue for channel.", ioe) ;
        }

            // Generate the message body as a string here.

        try {
            channel.basicPublish(mqExchange, mqRouteKey, 
                new AMQP.BasicProperties("application/json", "ASCII", null, null, null, null, null, null, null, null, null, "guest", null, null),
                messageBody.getBytes("ASCII"));
        } catch (IOException ioe) {
            log.error("IOException encountered while trying to publish task via MQ.", ioe) ;
        }

事实证明,这只是术语上的差异.

It turns out that it is just a difference in terminology.

相关文章