Python:线程、进程与协程(2)—

2023-01-31 05:01:10 python 线程 进程

    上一篇博文介绍了python线程、进程与协程的基本概念,通过这几天的学习总结,下面来讲讲Python的threading模块。首先来看看threading模块有哪些方法和类吧。

wKiom1gQv67C2z1tAAEA0zd2WHY584.png-wh_50


主要有:

Thread :线程类,这是用的最多的一个类,可以指定线程函数执行或者继承自它都可以实现子线程功能。

Timer:与Thread类似,但要等待一段时间后才开始运行,是Thread的子类。

Lock :原,是一个同步原语,当它锁住时不归某个特定的线程所有,这个可以对全局变量互斥时使用。

RLock :可重入锁,使单线程可以再次获得已经获得的锁,即可以被相同的线程获得多次。

Condition :条件变量,能让一个线程停下来,等待其他线程满足某个“条件”。

Event:事件对象,是线程间最简单的通信机制之一:线程可以激活在一个事件对象上等待的其他线程。

Semaphore:信号量对象,是个变量,管理一个内置的计数器,指定可同时访问资源或者进入临界区的线程数。

BoundedSemaphore :有界信号量对象,与semaphore类似,但不允许超过初始值;

ThreadError:线程错误信息类。


active_count()和activeCount():返回当前活着的Thread对象个数。

current_thread()和currentThread():返回当前的Thread对象,对应于调用者控制的线程。如果调用者控制的线程不是通过threading模块创建的,则返回一个只有有限功能的虚假线程对象。

enumerate():返回当前活着的Thread对象的列表。该列表包括守护线程、由current_thread()创建的虚假线程对象和主线程。它不包括终止的线程和还没有开始的线程。

settrace(func):为所有从threading模块启动的线程设置一个跟踪函数。在每个线程的run()方法调用之前,func将传递给sys.settrace()(该函数是设置系统的跟踪函数)。

setprofile(func):为所有从threading模块启动的线程设置一个profile函数。在每个线程的run()调用之前,func将传递给sys.setprofile()(这个函数用于设置系统的探查函数)。

stack_size([size]):返回创建新的线程时该线程使用的栈的大小,

可选的size参数指定后来创建的线程使用栈的大小,它必须是0(使用平台的或者配置的默认值)或不少于32,768(32kB)的正整数。


其它一些以"_"开头的,有些是引入其它模块的函数,然后起了个别名,比如_fORMat_exc,它的定义如下:

from traceback import format_exc as _format_exc


有些其实是类,比如RLock的定义如下:

def RLock(*args, **kwargs):
    """Factory function that returns a new reentrant lock.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it again
    without blocking; the thread must release it once for each time it has
    acquired it.

    """
    return _RLock(*args, **kwargs)

_RLock其实是个类,内部调用的类,RLock其实是个函数,返回一个类,感兴趣的可以去看看threading.py相关代码。

    由于篇幅比较长,不正之处欢迎批评指正,当然更希望大家有耐心、仔细看完,同时能实践一下示例并仔细体会。

(一)Thread对象

    这个类表示在单独的一个控制线程中运行的一个活动。有两种创建方法:创建线程要执行的函数,把这个函数传递进Thread对象里,让它来执行;而是从Thread类中继承,然后在子类中覆盖run()方法,在子类中不应该覆盖其它方法(__init__()除外),也就是只覆盖该类的__init__()和run()方法。


Thread类主要方法:

start():开始线程的活动。每个线程对象必须只能调用它一次。

run():表示线程活动的方法,可以在子类中覆盖这个方法。

join([timeout]):是用来阻塞当前上下文,直至该线程运行结束,一个线程可以被join()多次,timeout单位是秒。

name:一个字符串,只用于标识的目的。它没有语义。多个线程可以被赋予相同的名字。初始的名字通过构造函数设置。

getName()/setName():作用于name的两个函数,从字面就知道是干嘛的,一个是获取线程名,一个是设置线程名

ident:线程的ID,如果线程还未启动则为None,它是一个非零的整数当一个线程退出另外一个线程创建时,线程的ID可以重用,即使在线程退出后,其ID仍然可以访问。

is_alive()/isAlive():判断线程是否还活着。

daemon:一个布尔值,指示线程是(True)否(False)是一个守护线程。它必须在调用start()之前设置,否则会引发RuntimeError。它的初始值继承自创建它的线程;主线程不是一个守护线程,所以在主线程中创建的所有线程默认daemon = False。

    何为守护线程?举个例子,在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,当主线程完成想退出时,会检验子线程是否完成。 对于普通线程,如果子线程的任务没有结束,主线程不会退出,整个程序也不会退出;对于守护线程,即使子线程任务还没有结束,如果主线程退出该线程也会退出。

isDaemon()/setDaemon():作用与daemon的函数,一个是判断是不是守护线程,一个是设置守护线程。


下面通过简单的例子来展示Thread的使用方法,希望对你理解能有帮助,当然更希望你能动手操作下,了解上面方法的用处。

将函数传递到Thread对象

#coding=utf-8
import threading
import datetime
import time
def thread_fun(num):
    time.sleep(num)
    now = datetime.datetime.now()
    print "线程名:%s ,now is %s"\
          %( threading.currentThread().getName(), now)

def main(thread_num):
    thread_list = list()
    # 先创建线程对象
    for i in range(0, thread_num):
        thread_name = "thread_%s" %i
        thread_list.append(threading.Thread(target = thread_fun, name = thread_name, args = (2,)))

    # 启动所有线程
    for thread in thread_list:
        thread.setName("Good")#修改线程名
        print thread.is_alive()#判断线程是否是活的
        print thread.ident
        thread.start()
        print thread.ident
        print thread.is_alive()
        thread.join()
        print thread.is_alive()

if __name__ == "__main__":
    main(3)


运行结果如下:

wKiom1gSCvmRS7xMAABlhUlLmGs922.png-wh_50

上面的例子只是展示了几个简单的Thread类的方法,其它方法大家可以自己动手试试,体会下。这里再讲下threading.Thread()

它的原型是:

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={})

group应该为None;被保留用于未来实现了ThreadGroup类时的扩展。

target是将被run()方法调用的可调用对象。默认为None,表示不调用任何东西。

name是线程的名字。默认情况下,以“Thread-N”的形式构造一个唯一的名字,N是一个小的十进制整数。

args是给调用目标的参数元组。默认为()。

kwargs是给调用目标的关键字参数的一个字典。默认为{}。


继承自threading.Thread类:

继承的话,主要是重定义__init__()和run()。

#coding=utf-8
import threading

class MyThread(threading.Thread):
    def __init__(self,group=None,target=None,name=None,args=(),kwargs=None):
        threading.Thread.__init__(self,group,target,name,args,kwargs)

        self.name = "Thread_%s"%threading.active_count()

    def run(self):
        print  "I am %s" %self.name


if __name__ == "__main__":

    for thread in range(0, 5):
        t = MyThread()
        t.start()

运行结果就不贴出来了。

(二)Lock互斥锁

    如果多个线程访问同时同一个资源,那么就有可能对这个资源的安全性造成破坏,还好Python的threading模块中引入了互斥锁对象,可以保证共享数据操作的完整性。每个对象都对应于一个可称为” 互斥锁” 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象这时候可以使用threading模块提供的Lock类。它主要有两个方法:acquire()和 release()

acquire(blocking=True, timeout=-1)

    获取锁,设置为locked状态,blocking参数表示是否阻塞当前线程等待,timeout表示阻塞时的等待时间 。如果成功地获得lock,则acquire()函数返回True,否则返回False,timeout超时时如果还没有获得lock仍然返回False。

release()

    释放锁,设置为unlocked状态,当在未锁定的锁上调用时,会引发ThreadError。没有返回值。

下面的代码是没有设锁时,100个线程访问一个资源,从运行结果来看对资源的完整性造成了一定的影响。

#coding=utf-8
import threading
import time
num = 0
class MyThread(threading.Thread):
    def __init__(self,target=None,name=None,args=(),kwargs=None):
        threading.Thread.__init__(self,target=target,name=name,args=args,kwargs=kwargs)
    def run(self):
        global num
        time.sleep(1)#这句不能少,要不然看不到对公共资源完整性的破坏,当然也可以将该句放在
        #num += 1的后面,有兴趣可以试试
        num += 1
        self.setName("Thread-%s" % num)
        print("I am %s, set counter:%s" % (self.name, num))
if __name__ == "__main__":
    for i in range(0, 100):
        my_thread = MyThread()
        my_thread.start()

运行结果如下:

wKiom1gT7kKT-EAiAAA_jGAy97o879.png-wh_50

看看部分数据的完整性遭到了“破坏”

再看看使用互斥锁的例子

#coding=utf-8
import threading
import time
num = 0
lock = threading.Lock()
class MyThread(threading.Thread):
    def __init__(self,target=None,name=None,args=(),kwargs=None):
        threading.Thread.__init__(self,target=target,name=name,args=args,kwargs=kwargs)
    def run(self):
        global num
        time.sleep(1)
        if lock.acquire():
            num += 1
            self.setName("Thread-%s" % num)
        print("I am %s, set counter:%s" % (self.name, num))
        lock.release()
if __name__ == "__main__":
    for i in range(0, 100):
        my_thread = MyThread()
        my_thread.start()

运行结果大家可以动手试试。

同步阻塞

    当一个线程调用Lock对象的acquire()方法获得锁时,这把锁就进入“locked”状态。因为每次只有一个线程可以获得锁,所以如果此时另一个线程试图获得这个锁,该线程就会变为“block“同步阻塞状态。直到拥有锁的线程调用锁的release()方法释放锁之后,该锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行状态。


死锁

是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程


(三)可重入锁RLock

用法和Lock用法一样,只RLock支持嵌套,而Lock不支持嵌套;RLock允许在同一线程中被多次acquire(如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐),而Lock却不允许这种情况,否则会出现死锁。看下面例子:

#coding=utf-8
import threading
import time
num = 0
lock = threading.Lock()
class MyThread(threading.Thread):
    def __init__(self,target=None,name=None,args=(),kwargs=None):
        threading.Thread.__init__(self,target=target,name=name,args=args,kwargs=kwargs)
    def run(self):
        global num
        time.sleep(1)
        if lock.acquire():
            num += 1
            self.setName("Thread-%s" % num)
            print("I am %s, set counter:%s" % (self.name, num))
            if lock.acquire():
                num += 1
                self.setName("Thread-%s" % num)
                print("I am %s, set counter:%s" % (self.name, num))
            lock.release()

        lock.release()
if __name__ == "__main__":
    for i in range(0, 100):
        my_thread = MyThread()
        my_thread.start()

上面的例子使用Lock锁嵌套使用,运行后就出现了死锁,运行不下去,“卡住了”。

wKioL1gUED2j0n46AAAeil9hLHI239.png-wh_50

这个时候就出现了RLock,它支持在同一线程中可以多次请求同一资源。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。所以将上面的代码中

lock = threading.Lock()

改为

lock = threading.RLock()

就不会出现死锁情况。

(四)条件变量Condition

        除了互斥锁外,对复杂线程同步问题,Python提供了Condition对象来支持解决。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

除了acquire和release方法外还有如下方法:

wait([timeout]):  

  w释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。

notify():

  唤醒一个挂起的线程(如果存在挂起的线程),该方法不会释放所占用的琐。

notify_all()/notifyAll()

  唤醒所有挂起的线程(如果存在挂起的线程),这些方法不会释放所占用的琐。

同时要主要这四个方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。

    Condition其实是一把高级的锁,它内部维护着一个锁对象(默认是RLock),当然你也可以通过它的构造函数传递一个Lock/RLock对象给它。我们可以看看它的__init__()函数的定义:

def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self.__waiters = []

是不是一目了然。条件变量的经典问题就是生产者与消费者的例子了。假设某代工厂的两个车间,一个车间生产某种配件,另一个车间需要这些配件进行组装产品,那么前者就是生产者(Producer),后者就是消费者(Consumer),当配件数低于1000就需要生产,多余5000个可以暂停生产,生产者一秒中可生产200个,消费者的策略是剩余数量少于800就不组装了,一秒钟可消费100个。代码如下

#coding=utf-8
import threading
import time
left_num = 500#假设初始剩余数为500
con = threading.Condition()

class Producer(threading.Thread):
    """生产者"""
    def __init__(self, target=None, name=None, args=(), kwargs=None):
        threading.Thread.__init__(self, target=target, name=name, args=args, kwargs=kwargs)
    def run(self):
        global left_num
        is_product = True
        while True:
            if con.acquire():
                if left_num > 5000:#大于5000可以暂停生产
                    is_product = False#不用继续生产
                    con.wait()#该“生产线”挂起
                elif left_num < 1000:
                    is_product = True#继续生产
                    left_num += 200
                    msg = self.name+' produce 200, left_num=%s'%left_num
                    print msg
                    con.notify()
                else:
                    #对于剩余数量处在中间的,就要分情况讨论了,一旦开启“生产”就要等
                    #到生产到指定数目才能停止,仔细想想很好理解的
                    if is_product:
                        left_num += 200
                        msg = self.name + ' produce 200, left_num=%s' % left_num
                        print msg
                        con.notify()
                    else:
                        con.wait()
                con.release()
                time.sleep(1)

class Consumer(threading.Thread):
    """消费者"""
    def __init__(self, target=None, name=None, args=(), kwargs=None):
        threading.Thread.__init__(self, target=target, name=name, args=args, kwargs=kwargs)
    def run(self):
        global left_num
        while True:
            if con.acquire():
                if left_num < 800:#少于800不组装
                    con.wait()
                else:
                    left_num -= 100
                    msg = self.name+' consume 100, left_num=%s'%left_num
                    print msg
                    con.notify()
                con.release()
                time.sleep(1)


def test():
    for i in range(1):
        p = Producer()
        p.start()
    for i in range(1):
        c = Consumer()
        c.start()
if __name__ == '__main__':
    test()

        大家可以把代码复制下来运行看看,仔细体会下,当累计剩余数大于5000后,就停止了“生产",之后只有消费者在运作,当剩余数低于1000,生产者”生成线"重启生产,直到累计剩余数大于5000,后又停止生产,以此类推。示例中展示的都是生产者与消费者都只有一条”流水线”,实际上,生产者有多条”流水线",消费者也有多条"流水线“,其实都一样,将上面代码的for循环的范围改下就可以了。还有一点,Condition可以接受一个Lock/RLock对象,大家可以试试创建一个Lock/RLock传递给Condition,如下:

lock = threading.Lock()
con = threading.Condition(lock)


效果是一样的,大家可以动手试试。


    

(五)事件对象Event

        事件对象是线程间最简单的通信机制之一:线程可以激活在一个事件对象上等待的其他线程。类似于一个线程向其它多个线程“发号施令”的模式,其它线程都会持有一个Event的对象,这些线程都会等待这个事件的“发生”,如果此事件一直不发生,那么这些线程将会阻塞,直至事件的“发生”。

主要方法有:

set():

将内部标志设置为true。 所有等待它成为真正的线程都被唤醒。如果内部标志 false,当程序执行 event.wait() 方法时就会阻塞;如果值为True,event.wait() 方法

clear():

将内部标志重置为false,随后调用wait()的线程将阻塞,直到调用set(),再次将内部标志设置为true。

wait([timeout]):

用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数立即返回。当timeout参数存在而不是None时,它应该是一个浮点数,以秒为单位指定操作的超时(或其分数)。

is_set()/isSet():

判断内部标准的值,为true则返回true,为false则返回false。

看下面一个简单的例子吧

#coding=utf-8
import threading
import time

def worker(event,name):
    print('Thread %s Waiting for Redis ready...'%name)
    event.wait()
    print('redis ready, and connect to redis server and Thread %s do some work at %s'% (name,time.ctime()))
    time.sleep(1)

readis_ready = threading.Event()
t1 = threading.Thread(target=worker, args=(readis_ready,"t1"), name='t1')
t1.start()

t2 = threading.Thread(target=worker, args=(readis_ready,"t2"), name='t2')
t2.start()

print('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
time.sleep(3) 
readis_ready.set()

运行结果如下:

wKiom1gVlEPBrfzoAABWqqM8-z4186.png-wh_50


     t1和t2线程开始的时候都阻塞在等待redis服务器启动的地方,一旦主线程确定了redis服务器已经正常启动,那么会触发redis_ready事件,各个工作线程就会去连接redis去做响应的工作。


(六)信号量Semaphore/BoundedSemaphore

    Semaphore是一个变量,控制着对公共资源或者临界区的访问,维护着一个计数器,指定可同时访问资源或者进入临界区的线程数。每次有一个线程获得信号量时,计数器-1。若计数器为0,其他线程就停止访问信号量,直到另一个线程释放信号量,同时计数器不能为负数。

    Semaphore对象接受一个正整数参数,默认情况是1,即threading.Semaphore([value])

  主要方法是acquire()和release()。调用acquire()时,获取信号量并且计数器减1;调用release()时释放信号量,并且计数器加1。

    BoundedSemaphore 与Semaphore的唯一区别在于前者将在调用release()时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。下面有个简单的例子:

#coding=utf-8
import threading
import time

semaphore = threading.Semaphore(5)#信号量
def func():
    if semaphore.acquire():
        time.sleep(2)
        print (threading.currentThread().getName() + ' get semaphore')
        semaphore.release()

for i in range(10):
  t1 = threading.Thread(target=func)
  t1.start()

开了十个线程,但使用semaphore限制了最多有5个进程同时执行,结果就不贴了,大家可以比较下使用semaphore和不使用semaphore时的效果,比较下就知道区别了。


(七)Timer 对象

    这个类表示一个动作应该在一个特定的时间之后运行,也就是一个定时器。Timer是Thread的子类, 因此也可以使用函数创建自定义线程。Timers通过调用它们的start()方法作为线程启动。timer可以通过调用cancel()方法(在它的动作开始之前)停止。timer在执行它的动作之前等待的时间间隔可能与用户指定的时间间隔不完全相同

threading.Timer(interval, function, args=[], kwargs={})

创建一个timer,在interval秒过去之后,它将以参数args和关键字参数kwargs运行function 。

cancel()

停止timer,并取消timer动作的执行。这只在timer仍然处于等待阶段时才工作。

这里贴出Timer的源码,其实用法和Thread对象一样,就多了一个参数interval.

def Timer(*args, **kwargs):
    """Factory function to create a Timer object.

    Timers call a function after a specified number of seconds:

        t = Timer(30.0, f, args=[], kwargs={})
        t.start()
        t.cancel()     # stop the timer's action if it's still waiting

    """
    return _Timer(*args, **kwargs)

class _Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=[], kwargs={})
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=[], kwargs={}):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()


另外,Lock、RLock、Condition、Semaphore和BoundedSemaphore对象里定义了__enter__()和__exit__()函数,而这两个真是上下文管理协议的必要条件(有兴趣可以参看之前博文 Python:with语句和上下文管理器对象),这就说明,在with语句中可以使用这些对象。比如最简单的

import threading
some_rlock = threading.RLock()
with some_rlock:
    print "some_rlock is locked while this executes"

有兴趣的可以去看看threading.py里面的源码。


相关文章