Scrapy 与分布式爬虫:使用 Redis 实现分布式爬取

2023-04-17 00:00:00 scrapy 分布式 爬虫

Scrapy 是一个高效、快速、灵活的 Python 网络爬虫框架。但在处理大规模数据时,单一节点的爬虫往往会受到限制。为了提高效率,可以采用分布式爬虫的方式。

Redis 是一个开源的内存数据结构存储系统,可用作数据库、缓存和消息代理。在分布式爬虫中,用 Redis 作为分布式队列,存储待爬取的 URl,多个爬虫节点从队列中取出 URl 进行爬取。

这里我们详细介绍使用 Redis 实现 Scrapy 分布式爬虫的步骤。

  1. 安装 Redis

首先要安装 Redis,可以在 Redis 官网下载安装包,或使用包管理工具直接安装。

  1. 编写 Redis 分布式队列

在 Scrapy 项目中新建一个名为 redis_queue.py 的 Python 文件,用于实现 Redis 分布式队列功能。

import redis
from scrapy.utils.misc import load_object

class RedisQueue(object):
    """redis分布式队列"""

    def __init__(self, queue_key, **redis_kwargs):
        """
        :param queue_key: 队列名
        """
        self.__db = redis.Redis(**redis_kwargs)
        self.key = queue_key
        self.load_lua_scripts()

    def __len__(self):
        """返回队列长度"""
        return self.__db.llen(self.key)

    def push(self, obj):
        """将对象推入队列"""
        self.__db.lpush(self.key, obj)

    def pop(self, block=False, timeout=None):
        """
        从队列中弹出一项
        :param block: 是否阻塞
        :param timeout: 最长等待时间
        """
        if block:
            item = self.__db.brpop(self.key, timeout=timeout)
            if item:
                item = item[1]
        else:
            item = self.__db.rpop(self.key)
        return item

    def clear(self):
        """清空队列"""
        self.__db.delete(self.key)

    def load_lua_scripts(self):
        """加载 Redis Lua 脚本"""
        try:
            path = 'redis_lua_scripts/lua_pop_and_push.lua'
            with open(path) as f:
                script = f.read()
            sha = self.__db.script_load(script)  # 加载脚本
            self.pop_and_push = self.__db.register_script(sha)
        except FileNotFoundError as e:
            raise Exception('Can\'t find Redis lua script file. [%s]' % e)

    def pop_and_push_spider(self, from_key, spider_name):
        """操作 Lua 脚本,实现从一个列表弹出元素并 push 到另一个列表"""
        self.pop_and_push(args=[from_key, spider_name, self.key])

该文件中定义了 RedisQueue 类,实现了通过 Redis 实现内存型队列的功能。其中:

  • __init__ 方法连接到 Redis 数据库,并设置队列名。
  • push 方法将元素推入队列。
  • pop 方法弹出一项元素,如果指定了 block=True,则为阻塞模式,等待元素从队列中取出。
  • clear 方法清空队列。
  • load_lua_scripts 方法用于加载 Redis Lua 脚本,即将运行 Lua 脚本的 SHA1 加载到 Redis 服务器,并创建执行脚本的 Python 方法。
  • pop_and_push_spider 方法是定义好的 Lua 脚本,在有多个爬虫节点时,用于将处于等待状态的爬虫节点轮流从 Redis 中取出数据。
  1. 在 Scrapy 爬虫项目中设置配置

在 settings.py 文件中添加几行代码,以使用 Redis 作为分布式队列:

# 使用自定义的去重类 DUPEFILTER_CLASS
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# 使用自定义的调度器类 SCHEDULER
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# 不清除 Redis queues、dupefilters 和 stats
SCHEDULER_PERSIST = True

# 是否按优先级调度请求
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'

# 配置 redis 连接
REDIS_URL = 'redis://:@localhost:6379'

# 使用指定的 RedisKey 調度队列
SCHEDULER_QUEUE_KEY = '%(spider)s:request_queue'

其中,设置 DUPEFILTER_CLASS 和 SCHEDULER 为 scrapy_redis 内置的类,分别用于去重和调度。

设置 SCHEDULER_PERSIST 为 True,使得分布式队列和去重集不被清空,可以实现断点续爬的功能。

设置 SCHEDULER_QUEUE_CLASS 为 PriorityQueue,使得爬取请求可以按照优先级来调度,更加灵活高效。

配置 REDIS_URL 为 Redis 数据库连接地址。

配置 SCHEDULER_QUEUE_KEY,将队列名设置为当前爬虫名,以区分不同的爬虫。

  1. 配置 Scrapy 爬虫

在 Spiders 中的爬虫文件中进行配置,需要添加如下代码:

# 导入 RedisQueue
from ..redis_queue import RedisQueue

class PidancodeSpider(scrapy.Spider):
    name = 'pidancode'
    allowed_domains = ['pidancode.com']
    start_urls = ['http://www.pidancode.com/']

    # 新增代码
    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'RANDOM_DELAY': 0.5,
        'CONCURRENT_REQUESTS': 16,  # 并发请求数
        'CONCURRENT_REQUESTS_PER_DOMAIN': 8,  # 每个域名并发请求数
        'ITEM_PIPELINES': {
            'pidancode.pipelines.PidancodePipeline': 300
        },
        'DUPEFILTER_CLASS': "scrapy_redis.dupefilter.RFPDupeFilter",
        'SCHEDULER': "scrapy_redis.scheduler.Scheduler",
        'SCHEDULER_PERSIST': True,
        'SCHEDULER_QUEUE_CLASS': 'scrapy_redis.queue.PriorityQueue',
        'REDIS_URL': 'redis://:@localhost:6379',
        'SCHEDULER_QUEUE_KEY': f'{name}:request_queue',
    }

    # 再增代码外
    redis_queue = RedisQueue(f'{name}:request_queue')
    ...

    def start_requests(self):
        """从 Redis 队列中读取请求"""
        while True:
            url = self.redis_queue.pop(block=True, timeout=10)
            if url:
                yield scrapy.Request(url=url.decode(), dont_filter=True)
            else:
                break

    def parse(self, response):
        """爬取页面数据"""
        # 解析数据代码

        # 将新的待爬取页面推入队列
        for link in links:
            self.redis_queue.push(link)

其中,

  • 在 custom_settings 中设置了跟 redis 相关的配置。
  • 在 Spider 中初始化一个 RedisQueue 对象,用于从队列中获取待爬取的 URL。
  • start_requests 方法从 Redis 队列中读取 URL,用于生成 Scrapy.Request 对象。
  • parse 方法中将新的 URL 推入队列,以保证爬虫可以真正实现分布式爬虫的功能。
  1. 运行 Scrapy 爬虫

在运行爬虫之前,需要先将待爬取的 URL 加入到 Redis 队列中。可以手动执行如下命令(需在 Redis 客户端中执行):

LPUSH pidancode:request_queue http://www.pidancode.com/

然后运行爬虫:

scrapy crawl pidancode

如果在多个节点上运行这个命令,爬虫会从 Redis 队列中获取 URL,以实现分布式爬取。

至此,我们就成功地实现了使用 Redis 实现 Scrapy 分布式爬虫的功能。

相关文章