Python中如何实现基于队列的分布式任务处理系统的架构设计与实现
基于队列的分布式任务处理系统的架构设计可以采用Master-Worker模式。
Master节点负责接收任务,将任务队列中的任务分配给Worker节点处理,同时监控Worker节点的状态和任务进度,根据需求动态添加或删除Worker节点。
Worker节点负责从任务队列中获取任务,并处理完任务后将结果返回给Master节点。
代码演示:
- 安装依赖
pip install rq
- 编写任务函数
在本例中,定义了一个job.py文件,其中包含了一个任务函数,用于将字符串转换为大写并返回结果。
# job.py def upper_case(string): return string.upper()
- 编写Master节点代码
在本例中,Master节点为一个Flask应用程序,用于接收任务请求,并将任务添加到队列中。Master节点还可以动态添加或删除Worker节点。
# master_node.py from flask import Flask, request from rq import Queue, Connection from redis import Redis app = Flask(__name__) redis_conn = Redis(host='localhost', port=6379) @app.route('/task', methods=["POST"]) def add_job(): job_data = request.get_json() with Connection(redis_conn): queue = Queue() job = queue.enqueue('job.upper_case', job_data['string']) return {'job_id': job.get_id()} if __name__ == '__main__': app.run(debug=True)
- 编写Worker节点代码
在本例中,Worker节点为一个Python脚本,用于从队列中获取任务,并处理完任务后将结果返回给Master节点。
# worker_node.py from rq import Connection, Worker from redis import Redis redis_conn = Redis(host='localhost', port=6379) with Connection(redis_conn): worker = Worker(['default']) worker.work()
- 运行Master节点
执行以下命令启动Master节点。
python master_node.py
- 运行Worker节点
执行以下命令启动Worker节点。
python worker_node.py
- 发送任务请求
使用以下Python代码发送任务请求。
import requests import json job_data = {'string': 'pidancode.com'} headers = {'Content-Type': 'application/json'} response = requests.post('http://localhost:5000/task', data=json.dumps(job_data), headers=headers) job_id = response.json().get('job_id') print('Job ID:', job_id)
- 获取任务结果
使用以下Python代码获取任务结果。
from rq import Connection, Queue from redis import Redis redis_conn = Redis(host='localhost', port=6379) with Connection(redis_conn): queue = Queue() result = queue.fetch_job(job_id).result print('Result:', result)
此时,结果应该为“PIDANCODE.COM”。
相关文章