Python Day9
s.recvfrom() 接收UDP数据
s.sendto() 发送UDP数据
UDP(user datagram protocol,用户数据报协议)是无连接的,面向消息的,提供高效率服务。
不会使用块的合并优化算法, 由于UDP支持的是一对多的模式,所以接收端的skbuff(套接字缓冲区)采用了链式结构来记录每一个到达的UDP包
在每个UDP包中就有了消息头(消息来源地址,端口等信息),这样,对于接收端来说,就容易进行区分处理了。
即面向消息的通信是有消息保护边界的。
udp的recvfrom是阻塞的,一个recvfrom(x)必须对唯一一个sendinto(y),收完了x个字节的数据就算完成
若是y>x数据就丢失,这意味着udp根本不会粘包,但是会丢数据,不可靠
udp是无链接的,先启动哪一端都不会报错
udp发送数据,对端是不会返回确认信息的,因此不可靠
服务端:
from Socket import *
server = socket(AF_INET, SOCK_DGRAM) #创建一个服务器的套接字
server.bind(('127.0.0.1', 8080)) #绑定服务器套接字
while True:
data, client_addr = server.recvfrom(1024)
server.sendto(data.upper(),client_addr)
server.close()
客户端:
from socket import *
client=socket(AF_INET,SOCK_DGRAM)
while True:
msg=input('>>: ').strip()
if not msg:continue
client.sendto(msg.encode('utf-8'),('127.0.0.1',8080))
data,server_addr=client.recvfrom(1024)
print(data.decode('utf-8'))
client.close()
进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。
程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。
multiprocessing模块介绍
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内
Process类的介绍
创建进程的类:
Process是一个类
class Process(object):
def init(self, group=None, target=None, name=None, args=(), kwargs={}):
group是保留的,target指定起一个进程要做什么事,name是指定进程的名字,args和kwargs是为前面任务传参的方式
参数介绍:
args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,2,'eGon',)
kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
name为子进程的名称
方法介绍:
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
属性介绍:
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.name:进程的名称
p.pid:进程的pid
p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
开启进程的两种方式
方式一:
from multiprocessing import Process
import time
def task(name):
print('%s is running' %name)
time.sleep(3) #模拟任务运行了一段时间后结束
print('%s is done' %name)
if __name__ == '__main__': #注意,在windows下开进程的代码必须写到main下面,这个windows开进程的原理有关
p=Process(target=task,args=('alex',)) #注意需要加逗号,因为这里是元祖
p.start() #上面实例化出一个对象,这个start只是向操作系统发出一个请求,操作系统会把父进程里面的数据拷贝到子进程
print('主')
结果:
主
alex is running
(等了三秒)
alex is do
方式二:
Process既然是一个类,那我们可以继承它写一个自己的类
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super(MyProcess,self).__init__()
self.name=name
注意,自定义类这种方式里面必须写一个run方法,实例化后调用的start()实际就是run()方法
def run(self): #这个run取代了方式一种的task方法
print('%s is running' %self.name)
time.sleep(3) #模拟任务运行了一段时间后结束
print('%s is done' %self.name)
if __name__ == '__main__':
p=MyProcess('进程1')
p.start() #p.start()调用的就是类中的run()
print('主')
运行结果与方式一一样
join方法
主进程等待子进程结束
from multiprocessing import Process
import time
import os
def task(n):
print('%s is running' %os.getpid()) #查看pid
time.sleep(n)
print('%s is doen' %os.getpid())
if __name__ == '__main__':
start_time = time.time()
p1 = Process(target=task, args=(1,))
p2 = Process(target=task, args=(2,))
p3 = Process(target=task, args=(3,))
p1.start()
p1.join() #主线程等待p1终止再执行下面的内容
p2.start()
p2.join() #主线程等待p2终止再执行下面的内容
p3.start()
p3.join() #主线程等待p3终止再执行下面的内容
stop_time = time.time()
print('主', (stop_time - start_time))
结果:
is running
is doen
is running
is doen
is running
is doen
主 6.226877450942993
进程对象的其它属性或方法
from multiprocessing import Process
import os,time
def task():
print('子进程pid: %s 父进程ppid: %s' %(os.getpid(),os.getppid())) #ppid是父进程pid
if __name__ == '__main__':
p = Process(target=task,name='进程1')
p.start()
#p.terminate() #关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
time.sleep(0.5)
print('父进程pid: %s' %os.getpid())
print('子进程pid: %s' %p.pid) #打印进程pid,跟os.getpid一样
print(p.name) #可以打印子进程名称
#p.name='xxxx' #也可以直接设置name属性
结果:
子进程pid: 6692 父进程ppid: 7652
父进程pid: 7652
子进程pid: 6692
进程1
僵尸进程与孤儿进程
僵尸进程(有害)
僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。
这种进程称之为僵死进程。
任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。
这是每个子进程在结束时都要经过的阶段。
小知识:
在linux系统中使用ps命令查看的话,状态为Z的进程就是僵尸进程 ps aux |grep Z
使用top命令 zombie代表当前有多少个僵尸进程
解决方法:
- 杀死父进程
- 对开启的子进程应该记得使用join,join会回收僵尸进程,join方法中调用了wait,告诉系统释放僵尸进程
- 使用signal(SIGCHLD, SIG_IGN)处理僵尸进程 https://blog.51cto.com/dzm911/2069063
孤儿进程(无害)
一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。
当子进程执行的任务在父进程代码运行完毕后就没有存在的必要了,
那该子进程就应该被设置为守护进程
守护进程要在子进程开启前进行设置(再start之前)
守护进程里面没办法再开子进程
设置成守护进程后,主进程一旦运行完代码的最后一行,守护进程就会跟着结束,不会管主进程是否还等待非守护子进程结束
示例代码:
from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(1)
print("end123") #这行永远不可能打印
def bar():
print(456)
time.sleep(3)
print("end456")
if __name__ == '__main__':
p1=Process(target=foo)
p2=Process(target=bar)
p1.daemon=True
p1.start()
p2.start()
print("main-------") #这行执行结束后p1(守护进程)就会跟着结束
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,
而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理
与join的区别
join是把整体变成了串行,互斥锁可以只把需要的地方变成串行
代码示例:
这里使用抢票进行模拟,进程之间通过文件进行通信,保证只有一个进程能够抢到票
from multiprocessing import Process,Lock
import JSON
import time
import random
import os
def search():
time.sleep(random.random()) #默认(0,1)取大于0且小于1之间的小数
dic=json.load(open('db.txt','r',encoding='utf-8'))
print('%s 查看剩余票数: %s' %(os.getpid(),dic['count']))
def get():
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
if dic['count'] > 0:
dic['count']-=1
time.sleep(random.randint(1,3))
json.dump(dic,open('db.txt','w',encoding='utf-8'))
print('%s 购票成功' %os.getpid())
def task(mutex):
mutex.acquire() #加锁
search()
get()
mutex.release() #释放锁
#只有锁被释放后其它进程才能抢到锁并运行这段代码
# with mutex: #另外一种写法,等同于mutex.acquire()
# search()
# get()
if __name__ == '__main__':
mutex = Lock() #实例化一把锁
for i in range(10):
p=Process(target=task,args=(mutex,)) #别忘了args后面要跟一个数组
p.start()
总结:
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
- 效率低(共享数据基于文件,而文件是硬盘上的数据)
- 需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
- 队列和管道都是将数据存放于内存中
- 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
创建队列的类(底层就是以管道和锁定的方式实现):
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
参数介绍:
maxsize是队列中允许最大项数,省略则无大小限制。
主要方法:
- q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。
如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。
如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。- q.get方法可以从队列读取并且删除一个元素,get方法有两个可选参数:blocked和timeout。
如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。
如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
示例代码:
from multiprocessing import Queue
q=Queue(3)
q.put('first')
q.put(2)
q.put({'count':3})
# q.put('fourth',block=False) #等同于q.put_nowait('fourth')
# q.put('fourth',block=True,timeout=3) #block代表阻塞(blocked)
print(q.get())
print(q.get())
print(q.get())
# print(q.get(block=False)) #等同于q.get_nowait()
print(q.get(block=True,timeout=3)) #在等待时间内没有取到任何元素,会抛出Queue.Empty异常
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
生产者消费者模型
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。
在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。
同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
代码示例
from multiprocessing import Process,Queue
import time
import random
def producer(name,food,q): #生产者
for i in range(3):
res='%s%s' %(food,i)
time.sleep(random.randint(1,3))
q.put(res) #生产者把生产的内容放入队列
print('厨师[%s]生产了<%s>' %(name,res))
def consumer(name,q): #消费者
while True:
res=q.get()
if res is None:break #如果接收到None,则break掉整个循环
time.sleep(random.randint(1,3))
print('吃货[%s]吃了<%s>' % (name, res))
if __name__ == '__main__':
#队列
q=Queue()
#生产者们
p1=Process(target=producer,args=('egon1','泔水',q))
#消费者们
c1=Process(target=consumer,args=('管廷威',q))
p1.start()
c1.start()
p1.join() #生产者结束运行后执行下面的代码
q.put(None) #给队列中发送一个None
print('主')
生产者消费者模型总结
程序中有两类角色
一类负责生产数据(生产者)
一类负责处理数据(消费者)引入生产者消费者模型为了解决的问题是:
平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度如何实现:
生产者<-->队列<——>消费者
生产者消费者模型实现类程序的解耦和
JoinableQueue与守护进程应用
上面主动向队列发特定关键词当信号,再让消费者判断数据内容的方式很low,因为有几个消费者就需要发送几次结束信号。
multiprocessing模块提供了类似原理的类JoinableQueue,即等待队列完成
JoinableQueue([maxsize]):
这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
参数介绍:
maxsize是队列中允许最大项数,省略则无大小限制。
方法介绍:
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
示例代码
from multiprocessing import Process,JoinableQueue
import time
import random
def producer(name,food,q):
for i in range(3):
res='%s%s' %(food,i)
time.sleep(random.randint(1,3))
q.put(res)
print('厨师[%s]生产了<%s>' %(name,res))
def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('吃货[%s]吃了<%s>' % (name, res))
q.task_done() #消费者每消费一个就给q.join()发信号,告诉它管道内的数据少了一个
if __name__ == '__main__':
#队列
q=JoinableQueue()
#生产者们
p1=Process(target=producer,args=('egon1','泔水',q))
p2=Process(target=producer,args=('egon2','骨头',q))
#消费者们
c1=Process(target=consumer,args=('管廷威',q))
c2=Process(target=consumer,args=('oldboy',q))
c3=Process(target=consumer,args=('oldgirl',q))
c1.daemon=True #把三个消费者都设置成守护进程,主进程结束后跟着结束
c2.daemon=True #这里提供了一个守护进程的应用场景
c3.daemon=True
p1.start()
p2.start()
c1.start()
c2.start()
c3.start()
p1.join()
p2.join()
#此时生产者已经完成任务,q.join获取当前管道中的数据数量
q.join()
#等管道内数据为0后,执行下一行代码,这也意味着消费者取走了管道内的所有数据,此时消费者也完成了使命
print('主')
在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程
所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位
多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。
没有子线程的概念,在一个进程内多个线程是平等的,下文中的“主线程”仅为了区分
开线程资源消耗小
Http://www.cnblogs.com/linhaifeng/articles/7428877.html
开线程的两种方式
第一种:
from threading import Thread
import time
import random
def piao(name):
print('%s is piao start' % name)
time.sleep(random.randint(1, 3))
print('%s is piao end' % name)
if __name__ == '__main__':
t1 = Thread(target=piao, args=('alex',)) # 实例化一个t1线程,args后面是传参,以元祖形式,所以要有个一个逗号
t1.start()
print('主线程')
第二种(自己写一个类):
from threading import Thread
import time
import random
class MyThread(Thread):
def __init__(self, name): # 在初始化时定制
super().__init__() #继承父类的init
self.name = name
def run(self):
print('%s is piao start' % self.name)
time.sleep(random.randint(1, 3))
print('%s is piao end' % self.name)
if __name__ == '__main__':
t1 = MyThread('alex')
t1.start() #start执行的就是自定义函数中的run方法
print('主线程')
多线程实现并发的套接字通信
相关内容请参考上一篇文章day8
服务端:
from threading import Thread,current_thread
from socket import *
def comunicate(conn):
print('子线程:%s' %current_thread().getName()) #查看当前线程名字
while True:
try:
data=conn.recv(1024)
if not data:break #针对linux系统,linux系统中客户端单方便断开连接,服务端不会报错会一直收空
conn.send(data.upper())
except ConnectionResetError: #在windows上客户端单方面断开链接会报错,这里将报错处理掉
break
conn.close()
def server(ip,port):
print('主线程:%s' %current_thread().getName()) #查看当前线程名字
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip,port))
server.listen(5)
while True:
conn, addr = server.accept() #主线程负责建立连接,获得conn
print(addr)
# comunicate(conn)
t=Thread(target=comunicate,args=(conn,))
t.start()
server.close()
if __name__ == '__main__':
server('127.0.0.1', 8081)
客户端:
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
data=client.recv(1024)
print(data.decode('utf-8'))
client.close()
这里有个问题,现在的实现效果意味着每连接一个客户端都要起一个线程,然而开线程(或进程)是要消耗系统资源的,不能无限开
这种实现方式是有弊端的,解决办法要用到池的概念,看后面的文章day10
线程相关的其他方法
Thread实例对象的方法
isAlive(): 返回线程是否活动的
getName(): 返回线程名
setName(): 设置线程名
守护线程
主线程代表了这个进程的生命周期,进程内所有的线程结束后主线程才能结束
主线程执行完毕后(运行完最后一行代码)要等其他线程执行完才能结束
详细解释:
主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
- 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
守护线程在主线程执行完最后一行代码后立即终止,即守护xxx会等待主xxx运行完毕后被销毁
迷惑人的例子
from threading import Thread
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
if __name__ == '__main__':
t1=Thread(target=foo)
t2=Thread(target=bar)
t1.daemon=True #t1是守护线程
t1.start()
t2.start() #t2是非守护线程,主线程要等它结束后才能结束
print("main-------")
结果:
123
456
main------- #虽然执行到最后一行代码了,但t2这个非守护线程还没结束,主线程要等待
end123 #等待t2的过程中守护线程t1已经执行完了
end456
GIL是Cpython解释器的特性
在CPython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势
结论:
多线程用于IO密集型,如socket,爬虫,WEB
多进程用于计算密集型,如金融分析
paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作
值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实
pip3 install paramiko #在python3中安装
http://www.cnblogs.com/linhaifeng/articles/6817679.html#_label2
使用
sshClient
用于连接远程服务器并执行基本命令
-
基于用户名密码连接:
import paramiko #创建SSH对象 ssh = paramiko.SSHClient() #允许连接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #连接服务器 ssh.connect(hostname='10.0.0.101', port=22, username='root', passWord='123456') #执行命令 stdin, stdout, stderr = ssh.exec_command('df -h') #获取命令结果 result = stdout.read() print(result.decode('utf-8')) #关闭连接 ssh.close()
- 基于公钥密钥连接:
客户端文件名:id_rsa
服务端必须有文件名:authorized_keys(在用ssh-keygen时,必须制作一个authorized_keys,可以用ssh-copy-id来制作)
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')
#创建SSH对象
ssh = paramiko.SSHClient()
#允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#连接服务器
ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key)
#执行命令
stdin, stdout, stderr = ssh.exec_command('df')
#获取命令结果
result = stdout.read()
print(result.decode('utf-8'))
#关闭连接
ssh.close()
SFTPClient
用于连接远程服务器并执行上传下载
- 基于用户名密码上传下载
import paramiko
transport = paramiko.Transport(('120.92.84.249',22))
transport.connect(username='root',password='xxx')
sftp = paramiko.SFTPClient.from_transport(transport)
#将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/id_rsa', '/etc/test.rsa')
#将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path')
transport.close()
- 基于公钥密钥上传下载
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')
transport = paramiko.Transport(('120.92.84.249', 22))
transport.connect(username='root', pkey=private_key )
sftp = paramiko.SFTPClient.from_transport(transport)
#将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/id_rsa', '/tmp/a.txt')
#将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path')
transport.close()
相关文章