REDIS 如何利用python 操作redis 集群 (投稿文章)

2021-08-13 00:00:00 创建 集群 连接 线程 连接池

注明: 此篇文章为投稿文字, 投稿人 闫树爽, (程序员, 目前从事REDIS ,MONGODB ,以及数据库运维自动化代码工作)


在NOSQL 数据库中的操作,与关系型数据库不同的是,会一门程序对于NOSQL的数据库操控是十分有利的,下面的内容是关于如何利用python程序来操作redis 集群的说明.


## 利用python操作redis集群


redis的cluster模式为大型应用中常用的方式,今天学习如何使用redis-py-cluster来操作redis集群


首先安装redis-py-cluster


```

pip install redis-py-cluster

```


基本用法


```python

from rediscluster import RedisCluster

from string import ascii_letters

import random


conn = RedisCluster(host="127.0.0.1", port=6379, password='password')#创建连接

for i in range(10000):

    key = ''.join(random.sample(ascii_letters, k=7))#创建一个随机字符串作为key

    # ex = random.randint(1, 60 * 60 * 24)

    conn.set(key, i, ex=500)

    print(i)

```


此时可以发现reidis中已经有了我们的数据


然而在现实使用中,redis的使用往往涉及高并发,每次都重新创建连接不是我们所建议的方式,我们可以使用连接池来创建连接,并通过多线程来进行访问。


redis-py-cluster的官方文档写的比较简单并没有给出详细的连接池使用方式,但是好在python能够查看源码,我们可以看到其中有一个ClusterConnectionPool类,这个从命名来看应该是连接池的。进源码去看这个类是继承自redis库的ConnectionPool。


直接连接尝试


```python

from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool

startup_nodes = [

    {'host': '10.50.132.92', 'port': 6379},

    {'host': '10.50.132.93', 'port': 6379},

    {'host': '10.50.132.94', 'port': 6379},


]

pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxxx")

```


## 处理报错


会发现报错(个别现象,原因下面说),如果未遇到错误可以跳过这小节


```

redis.exceptions.ResponseError: unknown command `CONFIG`, with args beginning with: `GET`, `cluster-require-full-coverage`, 

```


原因是因为我将redis中的config命令rename掉了,但是在默认情况下,会检查槽的完整性,所以会使用到config命令,但是config命令已经被我rename掉了啊,所以会报这个错误。解决方法有两种:


1、修改自己的代码,这个参数就是字面意思,跳过完整性检查,这样也就不会调用config命令了。


```python

pool = ClusterBlockingConnectionPool(startup_nodes=startup_nodes,password="xxxx",skip_full_coverage_check=True)

```


2、修改库的代码,这样的好处是还是能够检查完整行,但是缺点是在进行项目迁移的时候还要再改一遍源码,修改方法如下:


首先找到redis包中的client.py,一般在site-packages/redis/client.py


如果找不到可以从报错的地方快速找到redis包的安装路径


![](F:\文档\redis\python连接redis集群\1.PNG)


找到这个1239行


```python

    def config_get(self, pattern="*"):

        "Return a dictionary of configuration based on the ``pattern``"

        return self.execute_command('CONFIG GET', pattern)

```


修改这个CONFIG GET为rename后的命令就可以了


通过以上两种方法,可以正常使用连接池了


## 连接池的使用


这个连接池的使用方法如下


```python

from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool

startup_nodes = [

    {'host': '10.50.132.92', 'port': 6379},

    {'host': '10.50.132.93', 'port': 6379},

    {'host': '10.50.132.94', 'port': 6379},


]

pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxxx")

client = RedisCluster(connection_pool=pool)

```


这里的pool使用上面的方式就可以得到了一个client


## 实际使用


我们可以用多线程为例子,使用连接池来操作redis


```python

from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool

import threading

from string import ascii_letters

import random


startup_nodes = [

    {'host': '10.50.132.92', 'port': 6379},

    {'host': '10.50.132.93', 'port': 6379},

    {'host': '10.50.132.94', 'port': 6379},


]

pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxxx",max_connections=10)

threads=[]

lock = threading.Lock()

def test_thread(thread_id):

    client = RedisCluster(connection_pool=pool)

    for i in range(100):

        key = ''.join(random.sample(ascii_letters, k=7))

        client.set(key,random.randint(1,100),ex=100)

        lock.acquire()

        print(f"Thread-{thread_id}:processed {i+1} times")

        lock.release()

#创建线程

for i in range(10):

    threads.append(threading.Thread(target=test_thread,args=(i,)))

#开启线程

for th in threads:

    th.start() 

```


此处我们的线程池里面放10个连接,然后开启10个线程,为了显示的不太难看,我们在print的地方加了个锁,我们期待这个样能够正常的10个线程同时向redis里面写数据,但是实际却发现


![](F:\文档\redis\python连接redis集群\2.PNG)


会显示有太多的连接,猜测应该是连接池中的连接不够,所以我们调大max_connections参数为50,发现可以正常使用了,但是为什么呢?


熟悉redis的同学可能一下就猜测到了原因。因为我们的集群模式,key键是要根据hash值来分配的,具体连接到那个我们插入之间是不知道的,所以在连接创建之前,客户端也是不知道的,所以显示出这个。在初始化函数中果然找到了相应的参数max_connections_per_node。我们将它设置为True。


这样测试,我们将max_connections设置为10,同时将max_connections_per_node设置为True


这样一来每个节点都有10个连接,我们可以开启30个线程,如下


```python

from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool

import threading

from string import ascii_letters

import random


startup_nodes = [

    {'host': '10.50.132.92', 'port': 6379},

    {'host': '10.50.132.93', 'port': 6379},

    {'host': '10.50.132.94', 'port': 6379},


]

pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxx",max_connections=10,skip_full_coverage_check=True,max_connections_per_node=True)

threads=[]

lock = threading.Lock()

def test_thread(thread_id):

    client = RedisCluster(connection_pool=pool)

    for i in range(100):

        key = ''.join(random.sample(ascii_letters, k=7))

        client.set(key,random.randint(1,100),ex=100)

        lock.acquire()

        print(f"Thread-{thread_id}:processed {i+1} times")

        lock.release()

#创建线程

for i in range(30):

    threads.append(threading.Thread(target=test_thread,args=(i,)))

#开启线程

for th in threads:

    th.start()

```


但是~~~,还是不行,跑了一段时间以后有的线程会报错


![](F:\文档\redis\python连接redis集群\3.PNG)


简单思考下,因为这样虽然开启了这么多的pool,但是对于30个线程来说,仍然有可能引发冲突,因为我们设置的是每个节点10个连接


但是30个线程在同一时间可能有11个访问到了同一个节点,所以会产生这个错误。所以这里应该改成max_connections=30


但是如果我们连接池只想创建20个呢?我们需要自己加锁判断吗?这样是否太麻烦了?


of course not~


在上面大家应该注意到,我在引入的时候还另外引入了一个ClusterBlockingConnectionPool,这个注释中写的是


```

Thread-safe blocking connection pool for Redis Cluster::

```


这是一个线程安全的连接池,继承自ClusterConnectionPool,


使用我们仅需要将ClusterConnectionPool替换为ClusterBlockingConnectionPool即可。


至此已经可以正常使用redis连接池。




## 时间对比:


多线程连接池方式30个线程每个线程插入100个共3000次数据耗时5.1秒


单线程访问插入1000个数据用时49秒,如果3000个应该在150秒左右,大概差了不到30倍,符合我们开的30个线程


## 空闲超时


在没有经过测试之前,我有一个担忧就是服务端设置了空闲超时,我们这个python客户端会被服务端断开,那么单机还好,因为每次是单独的创建连接,那么连接池呢?


我们可以在test_thread中加入一个延时,然后过一段时间让连接空闲并超时以后再访问。经过测试,还是能连接的。




相关文章