REDIS 如何利用python 操作redis 集群 (投稿文章)
注明: 此篇文章为投稿文字, 投稿人 闫树爽, (程序员, 目前从事REDIS ,MONGODB ,以及数据库运维自动化代码工作)
在NOSQL 数据库中的操作,与关系型数据库不同的是,会一门程序对于NOSQL的数据库操控是十分有利的,下面的内容是关于如何利用python程序来操作redis 集群的说明.
## 利用python操作redis集群
redis的cluster模式为大型应用中常用的方式,今天学习如何使用redis-py-cluster来操作redis集群
首先安装redis-py-cluster
```
pip install redis-py-cluster
```
基本用法
```python
from rediscluster import RedisCluster
from string import ascii_letters
import random
conn = RedisCluster(host="127.0.0.1", port=6379, password='password')#创建连接
for i in range(10000):
key = ''.join(random.sample(ascii_letters, k=7))#创建一个随机字符串作为key
# ex = random.randint(1, 60 * 60 * 24)
conn.set(key, i, ex=500)
print(i)
```
此时可以发现reidis中已经有了我们的数据
然而在现实使用中,redis的使用往往涉及高并发,每次都重新创建连接不是我们所建议的方式,我们可以使用连接池来创建连接,并通过多线程来进行访问。
redis-py-cluster的官方文档写的比较简单并没有给出详细的连接池使用方式,但是好在python能够查看源码,我们可以看到其中有一个ClusterConnectionPool类,这个从命名来看应该是连接池的。进源码去看这个类是继承自redis库的ConnectionPool。
直接连接尝试
```python
from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool
startup_nodes = [
{'host': '10.50.132.92', 'port': 6379},
{'host': '10.50.132.93', 'port': 6379},
{'host': '10.50.132.94', 'port': 6379},
]
pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxxx")
```
## 处理报错
会发现报错(个别现象,原因下面说),如果未遇到错误可以跳过这小节
```
redis.exceptions.ResponseError: unknown command `CONFIG`, with args beginning with: `GET`, `cluster-require-full-coverage`,
```
原因是因为我将redis中的config命令rename掉了,但是在默认情况下,会检查槽的完整性,所以会使用到config命令,但是config命令已经被我rename掉了啊,所以会报这个错误。解决方法有两种:
1、修改自己的代码,这个参数就是字面意思,跳过完整性检查,这样也就不会调用config命令了。
```python
pool = ClusterBlockingConnectionPool(startup_nodes=startup_nodes,password="xxxx",skip_full_coverage_check=True)
```
2、修改库的代码,这样的好处是还是能够检查完整行,但是缺点是在进行项目迁移的时候还要再改一遍源码,修改方法如下:
首先找到redis包中的client.py,一般在site-packages/redis/client.py
如果找不到可以从报错的地方快速找到redis包的安装路径
![](F:\文档\redis\python连接redis集群\1.PNG)
找到这个1239行
```python
def config_get(self, pattern="*"):
"Return a dictionary of configuration based on the ``pattern``"
return self.execute_command('CONFIG GET', pattern)
```
修改这个CONFIG GET为rename后的命令就可以了
通过以上两种方法,可以正常使用连接池了
## 连接池的使用
这个连接池的使用方法如下
```python
from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool
startup_nodes = [
{'host': '10.50.132.92', 'port': 6379},
{'host': '10.50.132.93', 'port': 6379},
{'host': '10.50.132.94', 'port': 6379},
]
pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxxx")
client = RedisCluster(connection_pool=pool)
```
这里的pool使用上面的方式就可以得到了一个client
## 实际使用
我们可以用多线程为例子,使用连接池来操作redis
```python
from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool
import threading
from string import ascii_letters
import random
startup_nodes = [
{'host': '10.50.132.92', 'port': 6379},
{'host': '10.50.132.93', 'port': 6379},
{'host': '10.50.132.94', 'port': 6379},
]
pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxxx",max_connections=10)
threads=[]
lock = threading.Lock()
def test_thread(thread_id):
client = RedisCluster(connection_pool=pool)
for i in range(100):
key = ''.join(random.sample(ascii_letters, k=7))
client.set(key,random.randint(1,100),ex=100)
lock.acquire()
print(f"Thread-{thread_id}:processed {i+1} times")
lock.release()
#创建线程
for i in range(10):
threads.append(threading.Thread(target=test_thread,args=(i,)))
#开启线程
for th in threads:
th.start()
```
此处我们的线程池里面放10个连接,然后开启10个线程,为了显示的不太难看,我们在print的地方加了个锁,我们期待这个样能够正常的10个线程同时向redis里面写数据,但是实际却发现
![](F:\文档\redis\python连接redis集群\2.PNG)
会显示有太多的连接,猜测应该是连接池中的连接不够,所以我们调大max_connections参数为50,发现可以正常使用了,但是为什么呢?
熟悉redis的同学可能一下就猜测到了原因。因为我们的集群模式,key键是要根据hash值来分配的,具体连接到那个我们插入之间是不知道的,所以在连接创建之前,客户端也是不知道的,所以显示出这个。在初始化函数中果然找到了相应的参数max_connections_per_node。我们将它设置为True。
这样测试,我们将max_connections设置为10,同时将max_connections_per_node设置为True
这样一来每个节点都有10个连接,我们可以开启30个线程,如下
```python
from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool
import threading
from string import ascii_letters
import random
startup_nodes = [
{'host': '10.50.132.92', 'port': 6379},
{'host': '10.50.132.93', 'port': 6379},
{'host': '10.50.132.94', 'port': 6379},
]
pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxx",max_connections=10,skip_full_coverage_check=True,max_connections_per_node=True)
threads=[]
lock = threading.Lock()
def test_thread(thread_id):
client = RedisCluster(connection_pool=pool)
for i in range(100):
key = ''.join(random.sample(ascii_letters, k=7))
client.set(key,random.randint(1,100),ex=100)
lock.acquire()
print(f"Thread-{thread_id}:processed {i+1} times")
lock.release()
#创建线程
for i in range(30):
threads.append(threading.Thread(target=test_thread,args=(i,)))
#开启线程
for th in threads:
th.start()
```
但是~~~,还是不行,跑了一段时间以后有的线程会报错
![](F:\文档\redis\python连接redis集群\3.PNG)
简单思考下,因为这样虽然开启了这么多的pool,但是对于30个线程来说,仍然有可能引发冲突,因为我们设置的是每个节点10个连接
但是30个线程在同一时间可能有11个访问到了同一个节点,所以会产生这个错误。所以这里应该改成max_connections=30
但是如果我们连接池只想创建20个呢?我们需要自己加锁判断吗?这样是否太麻烦了?
of course not~
在上面大家应该注意到,我在引入的时候还另外引入了一个ClusterBlockingConnectionPool,这个注释中写的是
```
Thread-safe blocking connection pool for Redis Cluster::
```
这是一个线程安全的连接池,继承自ClusterConnectionPool,
使用我们仅需要将ClusterConnectionPool替换为ClusterBlockingConnectionPool即可。
至此已经可以正常使用redis连接池。
## 时间对比:
多线程连接池方式30个线程每个线程插入100个共3000次数据耗时5.1秒
单线程访问插入1000个数据用时49秒,如果3000个应该在150秒左右,大概差了不到30倍,符合我们开的30个线程
## 空闲超时
在没有经过测试之前,我有一个担忧就是服务端设置了空闲超时,我们这个python客户端会被服务端断开,那么单机还好,因为每次是单独的创建连接,那么连接池呢?
我们可以在test_thread中加入一个延时,然后过一段时间让连接空闲并超时以后再访问。经过测试,还是能连接的。
相关文章