Python Day10
死锁
所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象
如下就是死锁:
from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print('%s 拿到了A锁' %self.name)
mutexB.acquire()
print('%s 拿到了B锁' % self.name)
mutexB.release() #1
mutexA.release() #0
def f2(self):
mutexB.acquire()
print('%s 拿到了B锁' % self.name)
time.sleep(0.1) #线程Thread-1执行到这里发生了阻塞,此时线程Thread-2已经运行起来了,因此抢到了A锁
mutexA.acquire() #线程Thread-1再想抢A锁时A锁已经在线程Thread-2手里
print('%s 拿到了A锁' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t=MyThread()
t.start()
运行结果:
Thread-1 拿到了A锁
Thread-1 拿到了B锁
Thread-1 拿到了B锁
Thread-2 拿到了A锁
递归锁
解决方法,递归锁,在python中为了支持在同一线程中多次请求同一资源,Python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。
直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。
二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次
from threading import Thread,Lock,RLock
import time
mutexA=mutexB=RLock() #这里实际还是一把锁,只是这样写下面的代码就不用变动了
#一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print('%s 拿到了A锁' %self.name)
mutexB.acquire()
print('%s 拿到了B锁' % self.name)
mutexB.release() #1
mutexA.release() #0
def f2(self):
mutexB.acquire()
print('%s 拿到了B锁' % self.name)
time.sleep(0.1)
mutexA.acquire()
print('%s 拿到了A锁' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t=MyThread()
t.start()
信号量也是一把锁,可以指定信号量为5(也可以理解为门口挂着的钥匙数量),对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行
如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所
公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的(挂着的钥匙数量有限),这便是信号量的大小
from threading import Thread,Semaphore,current_thread
import time,random
def func():
with sm: #等于sm.acquire()+sm.release()
print('%s get sm' %current_thread().getName())
time.sleep(random.randint(1,3))
if __name__ == '__main__':
sm=Semaphore(5)
for i in range(20):
t=Thread(target=func)
t.start()
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()
互斥锁与信号量推荐博客
线程的一个关键特性是每个线程都是独立运行且状态不可预测。
如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。
为了解决这些问题,我们需要使用threading库中的Event对象。
对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。
在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。
一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。
如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
方法
from threading import Event
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。
示例代码
例如,有多个工作线程尝试链接Mysql,我们想要在链接前确保mysql服务正常才让那些工作线程去连接Mysql服务器,如果连接不成功,都会去尝试重新连接。
那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作
from threading import Thread,Event,current_thread
import time
event = Event() #信号初始值为False
def check():
print('checking MySQL...')
time.sleep(5)
event.set() #设置event的状态值为True
def conn():
count = 1
while not event.isSet(): #如果event.isSet()返回值为False则成立
if count > 3:
raise TimeoutError('超时') #判断计数大于3则主动退出进程并报错
print('%s wait to connect MySQL 连接%s次' %(current_thread().getName(),count))
event.wait(2) #如果event.isSet()==False将阻塞线程,同时设置超时事件为2秒,即2秒后event的状态值为False也会往下执行代码
count += 1
print('%s connect MySQL' %current_thread().getName())
if __name__ == '__main__':
t1 = Thread(target=check)
t2 = Thread(target=conn,name='线程2')
t3 = Thread(target=conn,name='线程3')
t4 = Thread(target=conn,name='线程4')
t1.start()
t2.start()
t3.start()
t4.start()
运行结果:
checking MySQL...
线程2 wait to connect MySQL 连接1次
线程3 wait to connect MySQL 连接1次
线程4 wait to connect MySQL 连接1次
线程2 wait to connect MySQL 连接2次
线程4 wait to connect MySQL 连接2次
线程3 wait to connect MySQL 连接2次
线程2 wait to connect MySQL 连接3次
线程4 wait to connect MySQL 连接3次
线程3 wait to connect MySQL 连接3次
线程3 connect MySQL
线程2 connect MySQL
线程4 connect MySQL
定时器,指定n秒后执行某操作
示例代码:
from threading import Timer
def hello(name):
print('hello, world %s' %name)
t = Timer(3,hello,args=('dzm',))
#def __init__(self, interval, function, args=None, kwargs=None)
#第一个参数代表等待几秒,第二个参数是函数名,后面是传参(元祖形式)
t.start()
应用场景:验证码定时器
queue is especially useful in threaded programming when infORMation must be exchanged safely between multiple threads.
有三种不同的用法
队列:先进先出
import queue
q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
#q.put_nowait(4) #不阻塞,队列满了直接报错
#q.put(4,block=False) #与q.put_nowait(4)相同
q.put(5,block=True,timeout=3) #阻塞,等待3秒
print(q.get())
print(q.get())
print(q.get())
#print(q.get())
堆栈:last in fisrt out 后进先出
import queue
q=queue.LifoQueue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
执行结果:
3
2
1
优先级队列:存储数据时可设置优先级的队列
import queue
q=queue.PriorityQueue(3)
q.put((10,'a')) #数字越小优先级越高,括号里可以是元祖也可以是列表
q.put((-3,'b'))
q.put((100,'c'))
print(q.get())
print(q.get())
print(q.get())
执行结果:
(-3, 'b')
(10, 'a')
(100, 'c')
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
基本方法
1、submit(fn, *args, **kwargs)
异步提交任务2、map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作3、shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前4、result(timeout=None)
取得结果5、add_done_callback(fn)
回调函数
可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数
提交任务的两种方式
同步调用:提交完任务后,就在原地等待,等待任务执行完毕,拿到任务的返回值,才能继续下一行代码,导致程序串行执行
异步调用+回调机制:提交完任务后,不在原地等待,任务一旦执行完毕就会触发回调函数的执行,程序是并发执行的
进程的执行状态
同步调用是一种提交任务的方式,阻塞是指程序遇到I/O时进入的一种状态
阻塞
非阻塞
进程池
示例代码1:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,random,os
def task(n):
print('%s is running' %os.getpid())
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
pool = ProcessPoolExecutor(2) #指定最大进程数,默认不要超过cpu核数的两倍
for i in range(5):
pool.submit(task,i)
#把任务提交到pool,pool.submit会得到一个对象,对象下面有个result()方法,result就是拿到执行结果
#但在这一步执行result()方法后会使程序变成串行执行(同步调用)
print('主')
执行结果:
主
18264 is running
544 is running
544 is running
544 is running
18264 is running
示例代码2:异步调用 + 回调机制
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time, random, os
def task(n):
print('%s is running' % os.getpid())
time.sleep(random.randint(1, 3))
return n ** 2
def handel(res):
res = res.result() #在这一步执行result()方法取得结果
print('handle res %s' %res)
if __name__ == '__main__':
# 异步调用 + 回调机制
pool = ProcessPoolExecutor(2) # 指定最大进程数,默认不要超过cpu核数的两倍
for i in range(5):
obj=pool.submit(task, i) #得到一个对象
obj.add_done_callback(handel) #回调函数
pool.shutdown(wait=True) # 不允许再往进程池内提交任务,等进程池内所有进程都运行完毕再执行下一行代码,wait默认等于True
print('主')
运行结果:
15124 is running
9212 is running
15124 is running
handle res 0
15124 is running
handle res 4
9212 is running
handle res 1
handle res 9
handle res 16
主
线程池
示例代码:
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import requests
def get(url):
print('%s GET %s' % (current_thread().getName(), url))
response = requests.get(url)
if response.status_code == 200:
return {'url': url, 'content': response.text}
def parse(res):
res=res.result() #得到返回结果
print('parse:[%s] res:[%s]' %(res['url'],len(res['content'])))
if __name__ == '__main__':
pool=ThreadPoolExecutor(2)
urls=[
'https://www.baidu.com',
'Https://www.python.org',
'http://www.oldboyedu.com',
'http://www.oldboyedu.com',
'http://www.oldboyedu.com',
'http://www.oldboyedu.com',
'http://www.oldboyedu.com',
'http://www.oldboyedu.com',
'http://www.oldboyedu.com',
'http://www.oldboyedu.com',
'http://www.oldboyedu.com',
'http://www.oldboyedu.com',
]
for url in urls:
pool.submit(get,url).add_done_callback(parse) #异步调用 + 回调机制
pool.shutdown
执行结果:
ThreadPoolExecutor-0_0 GET https://www.baidu.com
ThreadPoolExecutor-0_1 GET https://www.python.org
parse:[https://www.baidu.com] res:[2443]
ThreadPoolExecutor-0_0 GET http://www.oldboyedu.com
parse:[http://www.oldboyedu.com] res:[89173]
ThreadPoolExecutor-0_0 GET http://www.oldboyedu.com
parse:[http://www.oldboyedu.com] res:[89173]
ThreadPoolExecutor-0_0 GET http://www.oldboyedu.com
parse:[http://www.oldboyedu.com] res:[89173]
ThreadPoolExecutor-0_0 GET http://www.oldboyedu.com
parse:[http://www.oldboyedu.com] res:[89173]
ThreadPoolExecutor-0_0 GET http://www.oldboyedu.com
parse:[https://www.python.org] res:[48869]
ThreadPoolExecutor-0_1 GET http://www.oldboyedu.com
parse:[http://www.oldboyedu.com] res:[89173]
ThreadPoolExecutor-0_0 GET http://www.oldboyedu.com
parse:[http://www.oldboyedu.com] res:[89173]
ThreadPoolExecutor-0_1 GET http://www.oldboyedu.com
parse:[http://www.oldboyedu.com] res:[89173]
ThreadPoolExecutor-0_0 GET http://www.oldboyedu.com
parse:[http://www.oldboyedu.com] res:[89173]
ThreadPoolExecutor-0_1 GET http://www.oldboyedu.com
parse:[http://www.oldboyedu.com] res:[89173]
parse:[http://www.oldboyedu.com] res:[89173]
submit的实现机制
pool=ThreadPoolExecutor(2)
上面的pool是通过ThreadPoolExecutor实例化出来的
先拿到最大线程数量(设置的线程数,如这里的2),默认是cpu个数乘以5
拿到一个队列
拿到一把锁ThreadPoolExecutor下submit方法
加锁,submit时同一时间只能一个一个提交
判断如果shutdown等于True那么报错
把任务封装成了一个对象
把对象丢队列里面去了实际在第一次提交任务时就造好了线程,这些线程都去队列里面拿任务
submit的本质是把任务提交到队列里去,让线程去队列里拿任务,这算是线程queue的一个使用场景pool.shutdown(wait=True)做了什么?
把shutdown变为True,默认是False
map方法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
executor=ThreadPoolExecutor(max_workers=3)
# for i in range(11):
# future=executor.submit(task,i)
executor.map(task,range(1,12)) #map取代了for+submit
协程介绍
本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态
cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),
一种情况是该任务发生了阻塞,
另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它
其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。
对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,
这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,
从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。
协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。
为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:
1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。
2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换
一句话说明什么是协程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
需要强调的是:
1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
对比操作系统控制线程的切换,用户在单线程内控制协程的切换
优点如下:
1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点如下:
1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
总结协程特点:
必须在只有一个单线程里实现并发
修改共享数据不需加锁
用户程序里自己保存多个控制流的上下文栈
附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
gevent模块
安装
pip3 install gevent
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。
Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
g2=gevent.spawn(func2)
g1.join() #等待g1结束
g2.join() #等待g2结束
#或者上述两步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,Socket模块之前
或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头
示例代码:
from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat():
print('eat food 1')
time.sleep(2)
print('eat food 2')
def play():
print('play 1')
time.sleep(1)
print('play 2')
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2]) #列表、元祖的形式都可以
print('主')
运行结果:
eat food 1
play 1
play 2
eat food 2
主
相关文章