Python_并发编程(线程 进程 协程)
2021-06-19 11:04
标签:div 包含 closed class 退出 owa 死锁 __name__ coroutine 进程就是一个程序在一个数据集上的一次动态执行过程。 进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成; 数据集则是程序在执行过程中所需要使用的资源; 进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系 统感知进程存在的唯一标志。 进程的本质:本质上就是一段程序的运行过程(抽象的概念) 意义:为了能够同时运行多个任务的并发而不是一次只能干一件事情 线程的出现是为了降低上下文切换的消耗,提高系统的并发性,并突破一个进程只能干一样事的缺陷,
使到进程内并发成为可能。 线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序
计数器、寄存器集合和堆栈共同组成。 线程的引入减小了程序并发执行时的开销,提高了操作系统的并发
性能。线程没有自己的系统资源。 线程的本质:提高切换的效率,提高系统的并发性,并突破一个进程只能干一件事的缺陷 意义:使得进程内并发成为可能 二者之间的关系 1、一个程序至少有一个进程,一个进程至少有一个线程(进程可以理解为线程的容器) 2、进程是最小的资源单位,线程是最小的执行单位 3、进程的作用是一个资源管理 4、进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序运行效率 5、一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发执行 并发&并行 并发: 是指系统具有处理多个任务(动作)的能力 可通过CPU的切换实现 线程的调用方式 threading 模块建立在thread 模块之上。thread模块以低级、原始的方式来处理和控制线程,而threading 模块通过对thread进行二次封装, 提供了更方便的api来处理线程。 一、直接调用 二、继承式调用 三、threading.Thread的实例方法 join&aemon方法 join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。 setDaemon(True): 将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。 当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成 想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程 完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦 其他方法 四、同步锁(Lock) 多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?(join会造成串行,失去所线程的意义) 我们可以通过同步锁来解决这种问题 五、线程死锁和递归锁 在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。下面是一个死锁的例子: 解决办法:使用递归锁,将 队列Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递 queue队列的方法: 其他模式: 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。 什么是生产者消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。 一、进程的调用 直接调用 继承式调用 二、Process类 构造方法: Process([group [, target [, name [, args [, kwargs]]]]]) group: 线程组,目前还没有实现,库引用中提示必须是None; 实例方法: is_alive():返回进程是否在运行。 join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。 start():进程准备就绪,等待CPU调度 run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。 terminate():不管任务是否完成,立即停止工作进程 属性: daemon:和线程的setDeamon功能一样 name:进程名字。 pid:进程号。 三、进程间通讯 3.1进程队列Queue 3.2管道 Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。 四、进程同步 在创建多个进程的过程中,可能会出现多个进程争相输出资源(打印,显示等)的情况,前一个进程没来得及输出换行符,该资源就被下一个进程调用,导致两个进程打印在一行之中,为了避免这种情况,我们可以使用加锁 五、进程池 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中有两个方法: 协程,又称微线程,纤程。英文名Coroutine。 优点1: 协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。 优点2: 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。 因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。 greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator Python_并发编程(线程 进程 协程) 标签:div 包含 closed class 退出 owa 死锁 __name__ coroutine 原文地址:https://www.cnblogs.com/liutianyuan/p/9688638.html一、进程和线程
进程
线程
并行: 是指系统具有 同时 处理多个任务(动作)的能力
二者之间的关系:并行是并发的一个子集,并行的一定是并发,并发的不一定是并行Python的线程与threading模块
import threading
import time
def add():#定义每个线程要运行的函数
sum=0
for i in range(10000000):
sum+=i
time.sleep(1)
print("sum",sum)
def sayhi(num): # 定义每个线程要运行的函数
print("running on number:%s" % num)
time.sleep(3)
if __name__ ==‘__main__‘:
start=time.time()
t1=threading.Thread(target=add)#生成一个线程实例
t2=threading.Thread(target=sayhi,args=(1,))#生成另一个线程实例,args是参数
t1.start() #启动线程
t2.start()
print(t1.getName()) #获取线程名
print(t2.getName()) #获取线程名
import threading
import time
class Mythread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num = num
def run(self):#定义每个线程要运行的函数
print("running on number:%s" % self.num)
time.sleep(3)
if __name__ == ‘__main__‘:
t1 = Mythread(1)
t2 = Mythread(2)
t1.start()
t2.start()
print("ending......")
import threading
from time import ctime,sleep
import time
def ListenMusic(name):
print("Begin listening to %s. %s" % (name, ctime()))
sleep(2) # sleep等同于IO操作
print("end listening %s" % ctime())
def RecordBlog(title):
print("Begin recording the %s! %s" % (title, ctime()))
sleep(5)
print(‘end recording %s‘ % ctime())
l = []
t1 = threading.Thread(target=ListenMusic,args=("A",))
t2 = threading.Thread(target=RecordBlog,args=("B",))
l.append(t1)
l.append(t2)
if __name__ == ‘__main__‘:
for i in l :
# i.setDaemon(True) # 两个都只执行第一句然后结束
i.start()
# i.join() #线程一个一个执行
print ("all over %s" %ctime())
# run(): 线程被cpu调度后自动执行线程对象的run方法
# start():启动线程活动。
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。
threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
import time
import threading
def addNum():
global num #在每个线程中都获取这个全局变量
#num-=1
temp=num
#print(‘--get num:‘,num )
time.sleep(0.001)
num =temp-1 #对此公共变量进行-1操作
num = 100 #设定一个共享变量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: #等待所有线程执行完毕
t.join()
print(‘final num:‘, num )
#执行结果不固定,不为0
import time
import threading
R = threading.Lock()
####
def sub():
global num
R.acquire() #同步锁开始
temp = num - 1
time.sleep(0.01)
num = temp
R.release() #同步锁结束
num = 100 #设定一个共享变量
thread_list = []
for i in range(100):
t = threading.Thread(target=sub)
t.start()
thread_list.append(t)
for t in thread_list: #等待所有线程执行完毕
t.join()
print(‘final num:‘, num )
#运行结果为0
import threading,time
class myThread(threading.Thread):
def doA(self):
lockA.acquire()
print(self.name,"gotlockA",time.ctime())
time.sleep(3)
lockB.acquire()
print(self.name,"gotlockB",time.ctime())
lockB.release()
lockA.release()
def doB(self):
lockB.acquire()
print(self.name,"gotlockB",time.ctime())
time.sleep(2)
lockA.acquire()
print(self.name,"gotlockA",time.ctime())
lockA.release()
lockB.release()
def run(self):
self.doA()
self.doB()
if __name__=="__main__":
lockA=threading.Lock()
lockB=threading.Lock()
threads=[]
for i in range(5):
threads.append(myThread())
for t in threads:
t.start()
for t in threads:
t.join()
lockA=threading.Lock()
lockB=threading.Lock()
#--------------
lock=threading.RLock()
import threading
import time
class MyThread(threading.Thread):
def actionA(self):
r_lcok.acquire() #count=1
print(self.name,"gotA",time.ctime())
time.sleep(2)
r_lcok.acquire() #count=2
print(self.name, "gotB", time.ctime())
time.sleep(1)
r_lcok.release() #count=1
r_lcok.release() #count=0
def actionB(self):
r_lcok.acquire()
print(self.name, "gotB", time.ctime())
time.sleep(2)
r_lcok.acquire()
print(self.name, "gotA", time.ctime())
time.sleep(1)
r_lcok.release()
r_lcok.release()
def run(self):
self.actionA()
self.actionB()
if __name__ == ‘__main__‘:
# A=threading.Lock() 同步锁会造成死锁
# B=threading.Lock()
r_lcok=threading.RLock() #使用递归锁解决这个问题
L=[]
for i in range(5):
t=MyThread()
t.start()
L.append(t)
for i in L:
i.join()
print("ending....")
多线程利器------队列
创建一个“队列”对象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。
将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,
get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
Python Queue模块有三种队列及构造函数:
1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)
此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作
import queue
#先进后出
q=queue.LifoQueue()
q.put(34)
q.put(56)
q.put(12)
#优先级
# q=queue.PriorityQueue()
# q.put([5,100])
# q.put([7,200])
# q.put([3,"hello"])
# q.put([4,{"name":"alex"}])
while 1:
data=q.get()
print(data)
生产者消费者模型
import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
count = 0
while count #做十个包子
print("making........")
time.sleep(5) #正在做包子
q.put(count) #做好包子放入笼屉(队列)里
print(‘Producer %s has produced %s baozi..‘ %(name, count))
count +=1
q.join() #等到队列为空,再执行别的操作
print("ok......")
def Consumer(name):
count = 0
while count #吃包子
time.sleep(random.randrange(4)) #先等待包子
data = q.get() #拿包子 如果没有包子等待包子的到来
print("eating....")
time.sleep(3) #吃包子ing....
q.task_done()#在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
print(‘\033[32;1mConsumer %s has eat %s baozi...\033[0m‘ %(name, data))
count +=1
# q.join()
p1 = threading.Thread(target=Producer, args=(‘A君‘,))
c1 = threading.Thread(target=Consumer, args=(‘B君‘,))
# c2 = threading.Thread(target=Consumer, args=(‘C君‘,))
# c3 = threading.Thread(target=Consumer, args=(‘D君‘,))
p1.start()
c1.start()
# c2.start()
# c3.start()
多进程模块multiprocessing
multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
from multiprocessing import Process
import time
def f(name):
time.sleep(1)
print(‘hello‘, name,time.ctime())
if __name__ == ‘__main__‘:
p_list=[]
for i in range(3):
p = Process(target=f, args=(‘alvin‘,))
p_list.append(p)
p.start()
for i in p_list:
p.join()
print(‘end‘)
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self):
super(MyProcess, self).__init__()
#self.name = name
def run(self):
time.sleep(1)
print (‘hello‘, self.name,time.ctime())
if __name__ == ‘__main__‘:
p_list=[]
for i in range(3):
p = MyProcess()
p.start()
p_list.append(p)
for p in p_list:
p.join()
print(‘end‘)
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。from multiprocessing import Process, Queue
import queue
def f(q,n):
#q.put([123, 456, ‘hello‘])
q.put(n*n+1)
print("son process",id(q))
if __name__ == ‘__main__‘:
q = Queue() #try: q=queue.Queue()
print("main process",id(q))
for i in range(3):
p = Process(target=f, args=(q,i))
p.start()
print(q.get())
print(q.get())
print(q.get())
from multiprocessing import Process, Pipe
def f(conn):
conn.send([12, {"name":"yuan"}, ‘hello‘])
response=conn.recv()
print("response",response)
conn.close()
print("q_ID2:",id(child_conn))
if __name__ == ‘__main__‘:
parent_conn, child_conn = Pipe()
print("q_ID1:",id(child_conn))
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, ‘hello‘]"
parent_conn.send("儿子你好!")
p.join()
3.3 Managers
from multiprocessing import Process, Manager
def f(d, l,n):
d[n] = ‘1‘ #{0:"1"}
d[‘2‘] = 2 #{0:"1","2":2}
l.append(n) #[0,1,2,3,4, 0,1,2,3,4,5,6,7,8,9]
#print(l)
if __name__ == ‘__main__‘:
with Manager() as manager:
d = manager.dict()#{}
l = manager.list(range(5))#[0,1,2,3,4]
p_list = []
for i in range(10):
p = Process(target=f, args=(d,l,i))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print(l)
from multiprocessing import Process, Lock
import time
def f(l, i):
l.acquire() #加锁
time.sleep(1)
print(‘hello world %s‘ % i)
l.release() #释放锁
if __name__ == ‘__main__‘:
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
from multiprocessing import Process,Pool
import time,os
def Foo(i):
time.sleep(1)
print(i)
print("son",os.getpid())
return "HELLO %s"%i
def Bar(arg):
print(arg)
# print("hello")
# print("Bar:",os.getpid())
if __name__ == ‘__main__‘:
pool = Pool(5) #创建进程池 进程数为5
print("main pid",os.getpid())
for i in range(100): #开启100个任务
#pool.apply(func=Foo, args=(i,)) #同步接口
#pool.apply_async(func=Foo, args=(i,))
#回调函数: 就是某个动作或者函数执行成功后再去执行的函数
pool.apply_async(func=Foo, args=(i,),callback=Bar) #相当于threading.Thread实例化对象 func=target
# callback 是回调函数
pool.close()
pool.join() # 进程池中join与close调用顺序是固定的
print(‘end‘)
协程
import time
import queue
def consumer(name):
print(‘准备做包子‘)
while True:
new_baozi = yield #协程的核心
print("%s正在吃包子%s"%(name,new_baozi))
def producer():
n = 0
con.__next__() #执行函数到yield
con2.__next__()
while n:
time.sleep(1)
print("正在做包子%s 和 %s"%(n,n+1))
con.send(n) #把做的包子发送给yield
con2.send(n+1) #把做的包子发送给yield
n += 2
if __name__ == ‘__main__‘:
con = consumer(‘alex‘)
con2 = consumer(‘selina‘)
producer()
Greenlet
from greenlet import greenlet
def test1():
print(12)
gr2.switch() #切换到gr2 并保存状态
print(34)
gr2.switch() #切换到gr2 并保存状态
def test2():
print(56)
gr1.switch()#切换到gr1 并保存状态
print(78)
gr1 = greenlet(test1) #将test1封装为gr1
gr2 = greenlet(test2) #将test2封装为gr2
gr1.switch() #执行test1
#运行结果为
#12
#34
#56
#78