SparkStreaming, RabbitMQ and MQTT in Python using pika

2022-01-11 00:00:00 python apache-spark mqtt rabbitmq pika

Problem description

Just to make things tricky, I'd like to consume messages from a RabbitMQ queue. I know RabbitMQ has a plugin for MQTT (https://www.rabbitmq.com/mqtt.html).

However, I cannot get an example working in which Spark consumes a message produced with pika.

For example, I am using the simple wordcount.py program from the Spark Streaming programming guide (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html) to see whether it receives messages from a producer written like this:

import sys

import pika


def sendJson(payload):
    # Connect to the local broker over AMQP (pika's default port, 5672).
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Durable queue bound to 'analytics_exchange'; the exchange is assumed
    # to exist already, since it is not declared here.
    channel.queue_declare(queue='analytics', durable=True)
    channel.queue_bind(exchange='analytics_exchange',
                       queue='analytics')

    channel.basic_publish(exchange='analytics_exchange', routing_key='analytics', body=payload)
    connection.close()


if __name__ == "__main__":
    with open(sys.argv[1], 'r') as json_file:
        sendJson(json_file.read())

The Spark Streaming consumer is as follows:

import sys
import operator

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")


#RabbitMQ

"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""


brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
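
For reference, each batch of that pipeline splits every received message on spaces, pairs each word with 1 and sums the pairs per word. The plain-Python sketch below mirrors that logic without Spark or a broker, so the expected output can be checked locally; the function name word_counts and the sample batch are illustrative assumptions, not part of the original program.

```python
from collections import defaultdict
from typing import Dict, Iterable


def word_counts(lines: Iterable[str]) -> Dict[str, int]:
    """Mimic flatMap(split) -> map((word, 1)) -> reduceByKey(+) for one batch."""
    counts: Dict[str, int] = defaultdict(int)
    for line in lines:                # each MQTT message payload is one "line"
        for word in line.split(" "):  # flatMap(lambda line: line.split(" "))
            counts[word] += 1         # map to (word, 1), then reduceByKey(x + y)
    return dict(counts)


if __name__ == "__main__":
    print(word_counts(["hello world", "hello spark"]))
```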

However, unlike the simple wordcount example, I cannot get this to work; it fails with the following error:

16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)

So my questions are: what should the arguments to MQTTUtils.createStream(ssc, brokerUrl, topic) be in order to listen to the queue, are there any fuller examples, and how do these settings map onto RabbitMQ's concepts?


I am running my consumer code with: ./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py

As suggested in one comment, I have updated the producer code to use TCP parameters:

import os

import pika

url_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)

The Spark Streaming code is now:

import json

brokerUrl = "tcp://127.0.0.1:5672"
topic = "#" #all messages

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()
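
Note that flatMap iterates over whatever json.loads returns: for a JSON array that is the array's elements, but for a JSON object it is the object's keys, so len(rec) would then measure key-name lengths rather than records. A plain-Python sketch of one batch of the updated pipeline (the function name and sample payloads are hypothetical) makes the difference visible:

```python
import json
from typing import Iterable


def total_record_lengths(payloads: Iterable[str]) -> int:
    """Mimic flatMap(json.loads) -> map(len) -> reduce(+) for one batch."""
    records = [rec for line in payloads for rec in json.loads(line)]  # flatMap
    lengths = [len(rec) for rec in records]                            # map
    return sum(lengths)                                                # reduce


if __name__ == "__main__":
    # JSON array of objects: flatMap yields the objects, len() counts their keys.
    print(total_record_lengths(['[{"a": 1, "b": 2}, {"c": 3}]']))  # 3
    # JSON object: iterating a dict yields its keys, len() measures key names.
    print(total_record_lengths(['{"user": 1, "id": 2}']))  # 6
```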


Solution

It looks like you are using the wrong port number. Assuming that:

  • you have a local instance of RabbitMQ running with default settings, you have enabled the MQTT plugin (rabbitmq-plugins enable rabbitmq_mqtt) and you have restarted the RabbitMQ server
  • you include spark-streaming-mqtt when running spark-submit / pyspark (either with --packages, or with --jars / --driver-class-path)

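you can connect over TCP with tcp://localhost:1883. Also remember that the MQTT plugin uses the amq.topic exchange: MQTT topics are mapped to routing keys on amq.topic (with the MQTT "/" separator translated to "."), so messages published with pika must go to amq.topic with a matching routing key for an MQTT subscriber to receive them.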
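
The two broker URLs in the question fail for different reasons: localhost:5672 has no tcp:// scheme at all, while tcp://127.0.0.1:5672 has the right form but points at RabbitMQ's AMQP listener (the port pika uses) instead of the MQTT listener on 1883. A small pure-Python check along those lines is sketched below; the helper name and the set of accepted schemes are assumptions for illustration, not something Spark or Paho provides.

```python
from typing import List
from urllib.parse import urlsplit

# Assumed defaults for a stock RabbitMQ install with rabbitmq_mqtt enabled.
AMQP_PORT = 5672  # AMQP 0-9-1 listener, used by pika
MQTT_PORT = 1883  # MQTT listener, used by MQTTUtils.createStream


def check_mqtt_broker_url(url: str) -> List[str]:
    """Return a list of problems with a broker URL meant for MQTTUtils.createStream."""
    problems: List[str] = []
    parts = urlsplit(url)
    if parts.scheme not in ("tcp", "ssl"):
        problems.append("expected a tcp:// (or ssl://) scheme, e.g. tcp://localhost:%d" % MQTT_PORT)
        return problems  # without a scheme, host and port are not parsed reliably
    if not parts.hostname:
        problems.append("missing host name")
    if parts.port == AMQP_PORT:
        problems.append("port %d is the AMQP port; the MQTT plugin listens on %d"
                        % (AMQP_PORT, MQTT_PORT))
    return problems


if __name__ == "__main__":
    for candidate in ("localhost:5672", "tcp://127.0.0.1:5672", "tcp://localhost:1883"):
        print(candidate, check_mqtt_broker_url(candidate))
```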

Quick start:

  • create a Dockerfile with the following content:

FROM rabbitmq:3-management

RUN rabbitmq-plugins enable rabbitmq_mqtt

  • build the Docker image:

    docker build -t rabbit_mqtt .
    

  • start the image and wait until the server is ready:

    docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt 
    

  • create producer.py with the following content:

    import time

    import pika


    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    # exchange_type replaces the older `type` keyword, which pika 1.x no longer accepts
    channel.exchange_declare(exchange='amq.topic',
                             exchange_type='topic', durable=True)

    for i in range(1000):
        channel.basic_publish(
            exchange='amq.topic',  # amq.topic as exchange
            routing_key='hello',   # Routing key used by producer
            body='Hello World {0}'.format(i)
        )
        time.sleep(3)

    connection.close()
    

  • start the producer:

    python producer.py
    

    and open the management console at http://127.0.0.1:15672/#/exchanges/%2F/amq.topic to check that messages are arriving.

  • create consumer.py with the following content:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.mqtt import MQTTUtils
    
    sc = SparkContext()
    ssc = StreamingContext(sc, 10)
    
    mqttStream = MQTTUtils.createStream(
        ssc, 
        "tcp://localhost:1883",  # Note both port number and protocol
        "hello"                  # The same routing key as used by producer
    )
    mqttStream.count().pprint()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
    

  • download the dependencies (adjust the Scala version and the Spark version to match your Spark build):

    mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1
    

  • make sure SPARK_HOME and PYTHONPATH point to the correct directories.

  • submit consumer.py with the following command (adjusting the versions as before):

    spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py
    

  • If you followed all the steps, you should see the number of received Hello World messages printed in the Spark driver output for each 10-second batch.
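
For the "wait until the server is ready" step, one option is to poll the ports mapped by docker run until they accept TCP connections. A minimal standard-library sketch follows; the function name, timeout and poll interval are assumptions, and an open port only shows that a listener is up, not that RabbitMQ has finished starting every plugin.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once host:port accepts a TCP connection, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on that port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, call wait_for_port("localhost", 5672) before running producer.py (AMQP) and wait_for_port("localhost", 1883) before submitting consumer.py (MQTT).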

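The reason the consumer subscribes to hello is that the MQTT plugin bridges MQTT topics and AMQP routing keys on amq.topic, translating MQTT's "/" separator to AMQP's ".". The sketch below is a simplified model of that bridge, not the plugin's code: it assumes the MQTT single-level wildcard "+" corresponds to AMQP's "*" and that "#" stays "#", and it implements standard AMQP topic matching ("*" matches exactly one word, "#" matches zero or more). The function names are hypothetical. It shows why routing key hello reaches the hello subscription, and why the question's topic "#" matches every routing key on amq.topic (but nothing published only to analytics_exchange).

```python
from functools import lru_cache


def mqtt_to_amqp(topic: str) -> str:
    """Translate an MQTT topic filter to an AMQP topic binding key (simplified model)."""
    return topic.replace("/", ".").replace("+", "*")


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """AMQP topic-exchange matching: '*' = exactly one word, '#' = zero or more words."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    @lru_cache(maxsize=None)
    def match(i: int, j: int) -> bool:
        if i == len(pattern):
            return j == len(words)
        if pattern[i] == "#":
            # '#' either matches no further words, or swallows one more word.
            return match(i + 1, j) or (j < len(words) and match(i, j + 1))
        if j == len(words):
            return False
        return pattern[i] in ("*", words[j]) and match(i + 1, j + 1)

    return match(0, 0)


if __name__ == "__main__":
    print(topic_matches(mqtt_to_amqp("hello"), "hello"))          # True
    print(topic_matches(mqtt_to_amqp("#"), "analytics"))          # True
    print(topic_matches(mqtt_to_amqp("sensors/+/temp"), "sensors.kitchen.temp"))  # True
```
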
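
The "adjust the versions" steps and the SPARK_HOME / PYTHONPATH step can also be scripted. The sketch below builds the --packages coordinate from a Scala and Spark version and lists the PYTHONPATH entries a Spark distribution usually needs (its python/ directory plus the bundled py4j-*-src.zip). The function names are hypothetical, and the directory layout is an assumption about a standard Spark download.

```python
import glob
import os
from typing import List


def mqtt_package(scala_version: str, spark_version: str) -> str:
    """Maven coordinate for spark-submit --packages, e.g. ('2.11', '1.6.1')."""
    return "org.apache.spark:spark-streaming-mqtt_{0}:{1}".format(scala_version, spark_version)


def pyspark_paths(spark_home: str) -> List[str]:
    """PYTHONPATH entries: SPARK_HOME/python plus any bundled py4j source zip."""
    python_dir = os.path.join(spark_home, "python")
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*-src.zip")))
    return [python_dir] + py4j_zips


if __name__ == "__main__":
    print("spark-submit --packages", mqtt_package("2.11", "1.6.1"), "consumer.py")
    spark_home = os.environ.get("SPARK_HOME")
    if spark_home:
        print("PYTHONPATH=" + os.pathsep.join(pyspark_paths(spark_home)))
```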