Python进程/线程/协程

2023-01-31 05:01:24 python 线程 进程

第1章 操作系统历史

1.1为什么要有操作系统?

程序员无法把所有的硬件操作细节全部了解到,管理这些硬件并且加以优化使用时非常繁琐的工作,这个繁琐的工作就是由操作系统来干的,有了它,程序员就从这些繁琐的工作中解脱了出来,只需要考虑自己的应用软件编写就可以了,应用软件直接使用操作系统提供的功能来间接使用硬件

1.2什么事操作系统?

操作系统就是一个协调,管理和控制计算机硬件资源和软件资源的控制程序

1.3操作系统历史:

Ø  第一代计算机:真空管和穿孔卡片

优点:程序员在申请的时间段内独享整个资源,可以即时的调试自己的程序(发现bug可以立即处理)

缺点:浪费计算机资源,一个时间段内只有一个人使用。同一时刻只有一个程序在内存中,被cpu调用执行,比方说10个程序执行,都是串行的

Ø  第二代计算机:晶体管和批处理系统

       有点:批处理,节省了机时

       缺点:整个流程需要人参与控制,将磁盘搬来搬去,计算的过程仍然是串行,调试程序不便

Ø  第三代计算机:即成电路芯片和多道程序设计

多道技术:

多道技术指的是多个程序,多道技术的事件是为了解决多个程序竞争或者说共享一个资源的有序调度问题,解决方式即多路复用,多路复用分为时间上的复用和空间复用   

       空间复用:将内存分为几部分,每个部分放入一个程序,这样,同一时间内存中就有了多道程序

       时间复用:当一个程序在等待io时,另一个程序可以使用cpu,如果内存中可以同时存放足够多的作业,则cpu的利用率可以接近100%(操作系统采用了多道技术后,可以控制进程的切换,后者说进程之间去争抢cpu的执行权限,这种切换不仅会在一个进程遇到IO时进行,一个进程占用cpu时间过长也会切换,或者说被操作系统夺走cpu的执行权限)

2.1什么是进程?

正在运行的程序就是进程,而负责执行任务的就是CPU

2.2串行

一个任务完完整整的执行完了,在执行下一个任务

2.3并发

一个任务执行的一段时间,就切换到另一个任务

单核cpu + 多道技术就可以实现并发(并行也属于并发)

       cpu切换条件:

1.     遇到IO(网络延迟/文件读写)

2.     优先级更高的任务也会优先切换

3.     如果一个任务长时间占用cpu也会切换

2.4并行

多核cpu同时执行多个任务

3.1函数方式开启进程

子进程的开启是比较耗费时间和资源的,所以子进程的开启不建议过多

from multiprocessingimport Process
def task(x):
    print('%s is 子进程开始' %(x))
    print('%s is 子进程结束' %(x))

if __name__ == '__main__':
    p = Process(target=task, kwargs={'x':'主进程'})
    p.start()
   

3.2类方式开启进程

from multiprocessingimport Process

class MyProcess(Process):
    def__init__(self, x=None):
        super(MyProcess,self).__init__()
        self.x=x
    defrun(self):
        print('%s is 子进程开始' % self )

        print('%s is 子进程结束' % self )

if __name__ == '__main__':
    p=MyProcess('子进程')
    p.start()

3.3子进程的两种状态:

所有的子进程在执行完毕之后,并不会立即消失,但会保留进程号,进程时间等,此时就会变成僵尸进程,僵尸进程是所有子进程都会经历的一个状态,并且父进程和子进程的执行是异步的

Ø  僵尸进程

父进程没死,子进程还在,但是父进程不发送wait()/waitpid()给子进程,此时子进程就是僵尸进程,过多的僵尸进程将会造成服务器运行缓慢

Ø  孤儿进程

父进程死了,子进程还在,此时子进程就是孤儿进程,会被init(0)进程接管,所以系统中存在过多的孤儿进程并不会对服务器造成伤害

3.4等待子进程执行完毕的方法

from multiprocessingimport Process

import time

x=100
def task():
    print('子进程runing')
    globalx
    x=0
    time.sleep(2)

if __name__ == '__main__':
    p=Process(target=task)
    p.start()
    p.join()   #等待子进程执行完毕
    print(x)

3.5如何串行执行多个进程?

from multiprocessingimport Process

import time

x=100
def task(self):
    print('%s runing' %self)
    time.sleep(2)

if __name__ == '__main__':

    l=[]
    forinrange(5):
        p=Process(target=task,args=('子进程%s' %i,))
        l.append(p)
        p.start()

    forinl:
        p.join()

p.pid : 获取进程id
os.getpid() : 获取进程id
os.getppid() : 获取父进程id
p.name:进程的名称
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.is_alive():如果p仍然运行,返回True

4.1进程属性的使用

from multiprocessingimport Process

import time
import os

x=100
def task(x):
    print('%s runing %s' %(x, os.getppid() ))
    time.sleep(2)

if __name__ == '__main__':
    p=Process(target=task,name='进程',args=('子进程',))
    p.start()
    print(p.name)
    print(p.pid)
    print(p.is_alive())

进程间的数据是物理隔离的

5.1硬盘级别:互斥(模拟一个抢票功能,利用互斥锁保证数据的完整性)

import JSON
import os
from multiprocessing import Process,Lock
import time

def check():
    time.sleep(1)
    withopen('a.txt','r',encoding='utf-8') asf:
        dic=(json.load(f))
    print('%s查看剩余的票数%s' %(os.getpid(),dic['count']))

def getTicket():
    time.sleep(2)
    withopen('a.txt',mode='r',encoding='utf-8') as f:
        dic=json.load(f)

    ifdic['count'] > 0:
        dic['count'] -=1
        time.sleep(2)
        withopen('a.txt',mode='w',encoding='utf-8') as f:
            json.dump(dic,f)
        print('%s 购买成功' %os.getpid())
    else:
        print('购买失败')

def task(mutex):
    check()
    mutex.acquire()
    getTicket()
    mutex.release()

if __name__ == '__main__':
    mutex=Lock()
    forinrange(5):
        p=Process(target=task, args=(mutex, ))
        p.start()

Ø  join方法

等待所有的子进程执行完毕,程序会变成串行的

Ø  mutex方法

互斥锁会将共享的数据操作变成串行,降低了效率,但是保证了数据的一致性

5.2内存级别:IPC机制

栈:先进后出

队列:先进来的先出去

Ø  队列的基本操作:

from multiprocessingimport Queue

q=Queue(3)  #代表只能放三条数据
q.put('jiang')
q.put('hello')

print(q.get(block=True,timeout=2))
print(q.get(block=True,timeout=2))
print(q.get(block=True,timeout=2))

Ø  生产者消费者模型:(非常重要的概念,不同语言都可以实现其自己的生产消费模型)

import random
from multiprocessingimport Queue,Process

import time


def produter(q,name,food):
    forinrange(4):
        res='%s%s'%(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('%s 生产了%s' %(name,res))
    q.put(None)

def consumer(q,name):
    while True:
        res=q.get()
        ifres == None:break
        
time.sleep(random.randint(1,3))
        print('%s 消费力%s' %(name,res))

if __name__ == '__main__':
    q=Queue()
    p=Process(target=produter,args=(q,'姜伯洋','包子'))
    c=Process(target=consumer,args=(q,'tom'))
    p.start()
    c.start()

Ø  开启多个生产者和消费这进程

import random
from multiprocessingimport Queue,Process

import time

def produter(q,name,food):
    forinrange(4):
        res='%s%s'%(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('%s 生产了%s' %(name,res))
    q.put(None)

def consumer(q,name):
    while True:
        res=q.get()
        ifres == None:break
        
time.sleep(random.randint(1,3))
        print('%s 消费力%s' %(name,res))

if __name__ == '__main__':
    q=Queue()
    p1=Process(target=produter,args=(q,'姜伯洋','包子'))
    p2=Process(target=produter,args=(q,'a','饺子'))
    p3=Process(target=produter,args=(q,'b','饺子'))

    c1=Process(target=consumer,args=(q,'tom'))
    c2=Process(target=consumer,args=(q,'tom1'))

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    q.put(None)
    q.put(None)

6.1为什么要有线程

进程的缺点:

1.     非常消耗资源

2.     如果开了过多的进程,cpu在上下文的切换非常耗时

线程的出现就是为了解决进程的以上两个缺点:

1.     线程的实体基本不拥有系统资源,只拥有可独立运行的资源即可,开销少

2.     线程是轻量级的,cpu切换速度非常快

6.2什么是线程?

1.     线程=====>轻量级的进程

2.     一个进程至少有一个线程

3.     线程是具体来执行任务的

6.3开启线程的两种方式

Ø  函数方式开启线程:

from threading import Thread
import time

def task(agrs):
    print('%s is running' % agrs)
    time.sleep(1)
    print('%s is done' % agrs)

if __name__ == '__main__':
    t=Thread(target=task,args=('子线程',))
    t.start()
    print('主进程')

Ø  类方式开启线程:

from threading import Thread
import time

class MyThread(Thread):
    def task(args):
        print('%s is running' % args)
        time.sleep(1)
        print('%s is done' % args)

if __name__ == '__main__':
    t=Thread(target=MyThread.task,args=('子线程',))

    t.start()
    print('主进程')

6.4进程和线程的速度对比

from threading import Thread
from multiprocessing import Process
import time

def task(agrs):
    print('%s is running' %agrs)
    time.sleep(1)
    print('%s is done' %agrs)
if __name__ == '__main__':
    t=Thread(target=task,args=('子线程',))
    p=Process(target=task,args=('子进程',))
    t.start()
    p.start()
    print('主进程')

进程和线程的使用场景:

线程:IO密集型操作(例如操作数据库,读写文件)

进程:CPU 密集型操作(例如大量计算,充分利用多核CPU)

6.5同一进程下,线程数据是共享的

from threading import Thread
import time

x=100
def task():
    global x
    x=0
if __name__ == '__main__':
    t=Thread(target=task,)
    t.start()
    print(x)

6.6线程的属性:

t.join()
print(t.is_alive())
print(t.getName())
print(t.name)
print('主',active_count())
print(enumerate())

6.7计算线程的执行时间(开启10个线程,统计总的运行时间)

from threading import Thread
import time

def task(agrs):
    print('%s is running' %agrs)
    time.sleep(1)
    print('%s is done' %agrs)
if __name__ == '__main__':
    start=time.time()
    l=[]
    for in range(10):
        t = Thread(target=task, args=('子线程',))
        t.start()
        l.append(t)
        
    for in l:
        t.join()
    stop=time.time()
    print(stop-start)

6.8守护线程

进程中的所有非守护线程执行完毕,守护线程才结束

from threading import Thread,current_thread
import time

def task():
    print('%s is running'%current_thread().name)
    time.sleep(3)
    print('%s is done'%current_thread().name)#因为是守护线程,所以其他非守护线程运行结束,守护线程也随之结束,所以,done并没有打印

if __name__ == '__main__':
    t=Thread(target=task,name='守护进程')
    t.daemon=True
    
t.start()
    print('主进程')

6.9线程的互斥锁

 

 

7.1开启进程池的方法

from multiprocessingimport Pool,Process
import time,os

def task(i):
    i+=1

if __name__ == '__main__':
    p=Pool(os.cpu_count())
    start=time.time()
    p.map(task,range(2000))
    p.close()
    p.join()
    stop=time.time()
    print(stop-start)

    start=time.time()
    l=[]
    forinrange(2000):
        p=Process(target=task,args=(i,))
        l.append(p)
        p.start()
    forinl:
        p.join()
    stop=time.time()
    print(stop-start)

7.2开启线程池的方法

from concurrent.futuresimport ThreadPoolExecutor,ProcessPoolExecutor

def task(i):
    print(i)

if __name__ == '__main__':
    t=ThreadPoolExecutor(20)#在线程池中开启20个线程来处理任务
    l=[]
    forinrange(10):
        res=t.submit(task,i)#相当于start方法
        l.append(res)
    t.shutdown()#停止提交任务
    forinl:
        t.result()

 

8.1什么事协程?

就是一个协程

8.2为什么要使用协程?

1.     单线程实现并发的效果

2.     我们可以在应用程序里控制多个任务的切换+保存状态

 

协程的优缺点:

优点:

1.     应用程序级别速度要远远高于操作系统的切换

缺点:

1.     多个任务一旦有一个阻塞没有切换,整个线程都将阻塞现在原地

2.     该线程内的其他任务都不能执行了

所以一旦引入协程的概念,就需要检测单线程下所有的IO行为

实现遇到IO就切换,少一个都不行,因为一旦一个任务造成了阻塞,整个现车给你就阻塞了,其他任务即便是可以计算,但是也无法运行了

协程的使用场景:

程序遇到IO的时候,使用协程可以节省时间

串行执行

import time

def func1():
    forinrange(100000):
        i + 1

def func2():
    forinrange(100000):
        i + 1

start=time.time()
func1()
func2()
stop=time.time()
print(stop-start)

基于yield并发执行

import time

def func1():
    while True:
        yield

def 
func2():
    g=func1()
    forinrange(100000):
        i+1
        next(g)

start=time.time()
func2()
stop=time.time()
print(stop -start)

gevent模块

import time
from gevent import monkey,spawn
monkey.patch_all()#监控所有IO事件

def eat(name):
    print('%s is eat 1' %name)
    time.sleep(3)
    print('%s is eat 2' %name)

def play(name):
    print('%s is play 1' %name)
    time.sleep(1)
    print('%s is play 2' %name)

start=time.time()
g1=spawn(eat,'eGon')
g2=spawn(play,'jiang')
g1.join()
g2.join()
stop=time.time()
print(stop-start)
print(g1)
print(g2)

 

 

 


相关文章