Scrapy 与分布式爬虫:使用 Redis 实现分布式爬取
Scrapy 是一个高效、快速、灵活的 Python 网络爬虫框架。但在处理大规模数据时,单一节点的爬虫往往会受到限制。为了提高效率,可以采用分布式爬虫的方式。
Redis 是一个开源的内存数据结构存储系统,可用作数据库、缓存和消息代理。在分布式爬虫中,用 Redis 作为分布式队列,存储待爬取的 URl,多个爬虫节点从队列中取出 URl 进行爬取。
这里我们详细介绍使用 Redis 实现 Scrapy 分布式爬虫的步骤。
- 安装 Redis
首先要安装 Redis,可以在 Redis 官网下载安装包,或使用包管理工具直接安装。
- 编写 Redis 分布式队列
在 Scrapy 项目中新建一个名为 redis_queue.py 的 Python 文件,用于实现 Redis 分布式队列功能。
import redis from scrapy.utils.misc import load_object class RedisQueue(object): """redis分布式队列""" def __init__(self, queue_key, **redis_kwargs): """ :param queue_key: 队列名 """ self.__db = redis.Redis(**redis_kwargs) self.key = queue_key self.load_lua_scripts() def __len__(self): """返回队列长度""" return self.__db.llen(self.key) def push(self, obj): """将对象推入队列""" self.__db.lpush(self.key, obj) def pop(self, block=False, timeout=None): """ 从队列中弹出一项 :param block: 是否阻塞 :param timeout: 最长等待时间 """ if block: item = self.__db.brpop(self.key, timeout=timeout) if item: item = item[1] else: item = self.__db.rpop(self.key) return item def clear(self): """清空队列""" self.__db.delete(self.key) def load_lua_scripts(self): """加载 Redis Lua 脚本""" try: path = 'redis_lua_scripts/lua_pop_and_push.lua' with open(path) as f: script = f.read() sha = self.__db.script_load(script) # 加载脚本 self.pop_and_push = self.__db.register_script(sha) except FileNotFoundError as e: raise Exception('Can\'t find Redis lua script file. [%s]' % e) def pop_and_push_spider(self, from_key, spider_name): """操作 Lua 脚本,实现从一个列表弹出元素并 push 到另一个列表""" self.pop_and_push(args=[from_key, spider_name, self.key])
该文件中定义了 RedisQueue 类,实现了通过 Redis 实现内存型队列的功能。其中:
- __init__ 方法连接到 Redis 数据库,并设置队列名。
- push 方法将元素推入队列。
- pop 方法弹出一项元素,如果指定了 block=True,则为阻塞模式,等待元素从队列中取出。
- clear 方法清空队列。
- load_lua_scripts 方法用于加载 Redis Lua 脚本,即将运行 Lua 脚本的 SHA1 加载到 Redis 服务器,并创建执行脚本的 Python 方法。
- pop_and_push_spider 方法是定义好的 Lua 脚本,在有多个爬虫节点时,用于将处于等待状态的爬虫节点轮流从 Redis 中取出数据。
- 在 Scrapy 爬虫项目中设置配置
在 settings.py 文件中添加几行代码,以使用 Redis 作为分布式队列:
# 使用自定义的去重类 DUPEFILTER_CLASS DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" # 使用自定义的调度器类 SCHEDULER SCHEDULER = "scrapy_redis.scheduler.Scheduler" # 不清除 Redis queues、dupefilters 和 stats SCHEDULER_PERSIST = True # 是否按优先级调度请求 SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' # 配置 redis 连接 REDIS_URL = 'redis://:@localhost:6379' # 使用指定的 RedisKey 調度队列 SCHEDULER_QUEUE_KEY = '%(spider)s:request_queue'
其中,设置 DUPEFILTER_CLASS 和 SCHEDULER 为 scrapy_redis 内置的类,分别用于去重和调度。
设置 SCHEDULER_PERSIST 为 True,使得分布式队列和去重集不被清空,可以实现断点续爬的功能。
设置 SCHEDULER_QUEUE_CLASS 为 PriorityQueue,使得爬取请求可以按照优先级来调度,更加灵活高效。
配置 REDIS_URL 为 Redis 数据库连接地址。
配置 SCHEDULER_QUEUE_KEY,将队列名设置为当前爬虫名,以区分不同的爬虫。
- 配置 Scrapy 爬虫
在 Spiders 中的爬虫文件中进行配置,需要添加如下代码:
# 导入 RedisQueue from ..redis_queue import RedisQueue class PidancodeSpider(scrapy.Spider): name = 'pidancode' allowed_domains = ['pidancode.com'] start_urls = ['http://www.pidancode.com/'] # 新增代码 custom_settings = { 'DOWNLOAD_DELAY': 1, 'RANDOM_DELAY': 0.5, 'CONCURRENT_REQUESTS': 16, # 并发请求数 'CONCURRENT_REQUESTS_PER_DOMAIN': 8, # 每个域名并发请求数 'ITEM_PIPELINES': { 'pidancode.pipelines.PidancodePipeline': 300 }, 'DUPEFILTER_CLASS': "scrapy_redis.dupefilter.RFPDupeFilter", 'SCHEDULER': "scrapy_redis.scheduler.Scheduler", 'SCHEDULER_PERSIST': True, 'SCHEDULER_QUEUE_CLASS': 'scrapy_redis.queue.PriorityQueue', 'REDIS_URL': 'redis://:@localhost:6379', 'SCHEDULER_QUEUE_KEY': f'{name}:request_queue', } # 再增代码外 redis_queue = RedisQueue(f'{name}:request_queue') ... def start_requests(self): """从 Redis 队列中读取请求""" while True: url = self.redis_queue.pop(block=True, timeout=10) if url: yield scrapy.Request(url=url.decode(), dont_filter=True) else: break def parse(self, response): """爬取页面数据""" # 解析数据代码 # 将新的待爬取页面推入队列 for link in links: self.redis_queue.push(link)
其中,
- 在 custom_settings 中设置了跟 redis 相关的配置。
- 在 Spider 中初始化一个 RedisQueue 对象,用于从队列中获取待爬取的 URL。
- start_requests 方法从 Redis 队列中读取 URL,用于生成 Scrapy.Request 对象。
- parse 方法中将新的 URL 推入队列,以保证爬虫可以真正实现分布式爬虫的功能。
- 运行 Scrapy 爬虫
在运行爬虫之前,需要先将待爬取的 URL 加入到 Redis 队列中。可以手动执行如下命令(需在 Redis 客户端中执行):
LPUSH pidancode:request_queue http://www.pidancode.com/
然后运行爬虫:
scrapy crawl pidancode
如果在多个节点上运行这个命令,爬虫会从 Redis 队列中获取 URL,以实现分布式爬取。
至此,我们就成功地实现了使用 Redis 实现 Scrapy 分布式爬虫的功能。
相关文章