并发编程-多线程
2020-12-13 16:36
from threading import Thread from multiprocessing import Process import time #多线程 def task(name): print(f‘{name} is running‘) time.sleep(1) print(f‘{name} is gone‘) if __name__ == ‘__main__‘: t1=Thread(target=task,args=(‘liyr‘,)) t1.start() print(‘==主线程‘) #打印结果 # liyr is running # ==主线程 # liyr is gone #多进程(永远是主进程先打印) def task(name): print(f‘{name} is running‘) time.sleep(1) print(f‘{name} is gone‘) if __name__ == ‘__main__‘: t1=Process(target=task,args=(‘liyr‘,)) t1.start() print(‘==主线程‘) #打印结果 # ==主线程 # liyr is running # liyr is gone
from threading import Thread from multiprocessing import Process import time,os # 多线程 def task(name): print(os.getpid()) if __name__ == ‘__main__‘: t1=Thread(target=task,args=(‘liyr‘,)) t2=Thread(target=task,args=(‘liyr‘,)) t1.start() t2.start() print(f‘==主线程:{os.getpid()}‘) #打印结果 # 15692 # 15692 # ==主线程:15692 #多进程(子进程依赖于主进程) def task(name): print(os.getpid()) print(f‘=主线程:{os.getppid()}‘) if __name__ == ‘__main__‘: t1=Process(target=task,args=(‘liyr‘,)) t2=Process(target=task,args=(‘liye‘,)) t1.start() t2.start() print(f‘==主线程:{os.getpid()}‘) #打印结果 # ==主线程:8496 # 15600 # 14712 # =主线程:8496 # =主线程:8496
from threading import Thread x = 3 def task(name): global x x = 100 if __name__ == ‘__main__‘: t1 = Thread(target=task, args=(‘liyr‘,)) t1.start() t1.join() print(f‘==主线程:{x}‘) #同一个进程内的资源对于这个进程的多个线程来说是共享的
六.守护线程
对于主线程来说,运行完毕指的是主线程所在进程内所有非守护线程全部运行完毕,主线程才运行完毕
如果守护线程的生命周期小于其他线程,则会先结束,否则要等待其他非守护线程和主线程结束之后结束
from threading import Thread import time def task(name): print(f‘{name} is running‘) time.sleep(3) print(f‘{name} is gone‘) if __name__ == ‘__main__‘: t1=Thread(target=task,args=(‘li‘,)) # t2=Thread(target=task,args=(‘liyr‘,)) t1.daemon=True t1.start()#线程的开启速度要比进程快很多 # t2.start() print(‘==主线程‘) # 守护线程 等待非守护子线程以及主线程结束之后,结束. from threading import Thread import time def foo(): print(123) # 1 time.sleep(1) print("end123") # 4 def bar(): print(456) # 2 time.sleep(3) print("end456") # 5 t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------") # 3 ‘‘‘ 123 456 main------- end123 end456 ‘‘‘
七.线程的其他方法
Thread实例对象的方法 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。 from threading import Thread from threading import currentThread from threading import enumerate from threading import activeCount def task(name): print(666) if __name__ == ‘__main__‘: t1 = Thread(target=task, args=(‘liyr‘,),name=‘呵呵呵‘) t1.start() print(t1.isAlive())#判断线程是否活着 t1.setName(‘哈哈哈‘)#设置一个线程名 print(t1.getName())#获取线程名 print(t1.name)#获取线程名(重要) print(currentThread())#返回当前的线程对象 print(enumerate())#获取当前进程的所有的线程对象列表 print(activeCount())#获取当前活跃的线程数量(重要) print(f‘==主线程‘)
八.互斥锁
1.互斥锁
互斥锁是控制同步带来的竞争,保证数据的安全性的一种措施
from threading import Thread from threading import Lock import time import random x = 100 def task(lock): lock.acquire() # time.sleep(random.randint(1,2)) global x temp = x time.sleep(0.01) temp = temp - 1 x = temp lock.release() if __name__ == ‘__main__‘: mutex = Lock() l1 = [] for i in range(100): t = Thread(target=task,args=(mutex,)) l1.append(t) t.start() time.sleep(3) print(f‘主线程{x}‘)
2.死锁
死锁是指两个线程(进程)在运行过程中由于争夺资源而造成的一种阻塞状态。
产生死锁的原因:1.资源竞争 2.进程间推进顺序不合法
如图:线程A持有锁,线程B持有锁b,下一步线程A等待获取锁b,线程B等待获取锁a,两者都未释放,造成死锁现象。
from threading import Thread,Lock import time lock_A = Lock() lock_B = Lock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): lock_A.acquire() print(f"{self.name}抢到了A锁") lock_B.acquire() print(f"{self.name}抢到了B锁") lock_B.release() print(f"{self.name}释放了B锁") lock_A.release() print(f"{self.name}释放了A锁") def f2(self): lock_B.acquire() print(f"{self.name}抢到了B锁") time.sleep(0.1) lock_A.acquire() print(f"{self.name}抢到了A锁") lock_A.release() print(f"{self.name}释放了A锁") lock_B.release() print(f"{self.name}释放了B锁") if __name__ == ‘__main__‘: for i in range(3): t = MyThread() t.start()
3.递归锁
递归锁可以解决死锁现象,当业务需要多个锁时,优先考虑递归锁.指向同一把锁:lock_a = lock_b = Rlock()
1
|
mutexA = mutexB = threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
|
from threading import Thread,RLock import time lock_A = lock_B = RLock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): lock_A.acquire() print(f"{self.name}抢到了A锁") lock_B.acquire() print(f"{self.name}抢到了B锁") lock_B.release() print(f"{self.name}释放了B锁") lock_A.release() print(f"{self.name}释放了A锁") def f2(self): lock_B.acquire() print(f"{self.name}抢到了B锁") time.sleep(0.1) lock_A.acquire() print(f"{self.name}抢到了A锁") lock_A.release() print(f"{self.name}释放了A锁") lock_B.release() print(f"{self.name}释放了B锁") if __name__ == ‘__main__‘: for i in range(3): t = MyThread() t.start()
4.信号量
信号量也是一把锁,用来控制并发数量
1
2
3
4
5
6
|
Semaphore管理一个内置的计数器, 每当调用acquire()时内置计数器 - 1 ;
调用release() 时内置计数器 + 1 ;
计数器不能小于 0 ;当计数器为 0 时,acquire()将阻塞线程直到其他线程调用release()。
与进程池是完全不同的概念,进程池Pool( 4 ),最大只能产生 4 个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程 / 进程
|
from threading import Thread, Semaphore, current_thread import time import random sem = Semaphore(5) def task(): sem.acquire() print(f‘{current_thread().name} 厕所ing‘) time.sleep(random.randint(1,3)) sem.release() if __name__ == ‘__main__‘: for i in range(20): t = Thread(target=task,) t.start() #同一时间内,只有指定数目的线程数运行任务,当其中的某几个运行完了,立马有相应的线程再来执行。
九.GIL全局锁(the Global Interpreter Lock)
1.GIL全局锁
首先需要明确的一点是GIL
并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念,Python完全可以不依赖于GIL。
GIL本质就是一把互斥锁,互斥锁的本质都一样,都是将并发变成串行,保证数据的安全。Jpython,pypy都没有GIL锁。
1.为什么使用GIL全局锁 1.当时都是单核时代,而且cpu价格非常贵. 2.如果不加全局解释器锁, 开发Cpython解释器的程序员就会在源码内部各种主动加锁,解锁,非常麻烦,各种死锁现象等等.他为了省事儿,直接进入解释器时给线程加一个锁. 2.优缺点 优点:保证了Cpython解释器的数据资源的安全 缺点:单个进程的多线程不能使用多核 3.GIL全局锁是针对单个进程中线程的,同一时间内,只能有一个线程获取这把锁。 4.所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码 5.一遇到IO,GIL锁就释放。
6.为什么有了GIL锁还要自己加锁,GIL锁不是已经保证了同一进程下线程是串行吗?
这个问题主要还是IO,一旦线程中有了IO,如果不加锁,GIL会自动释放,此时又有其他线程抢到,那么线程执行的任务中数据就不安全。
如图:python解释器分为虚拟机和编译器两部分,一个线程中的代码作为参数传给python解释器,由编译器将其转为c语言识别的字节码,然后由虚拟机将字节码转为机器语言,操作系统调用cpu来执行。
from threading import Thread import time x = 100 def task(): global x temp = x x = temp - 1 if __name__ == ‘__main__‘: for i in range(100): t = Thread(target=task,) t.start() print(x) #打印结果:0 #因为GIL锁,同一时间内只有一个线程执行任务,所以这是串行
from threading import Thread import time x = 100 def task(): global x temp = x time.sleep(0.001) x = temp - 1 if __name__ == ‘__main__‘: for i in range(100): t = Thread(target=task,) t.start() print(x) # 打印结果:94,95...不定,因为有阻塞(time.sleep()模仿),在0.001s之内可能有好几个线程拿到x值,剩下的再进行减1
2.GIL和Lock锁的区别
1
2
3
4
5
6
7
|
相同点: 都是互斥锁
不同点: 1.GIL 保护python解释器内部数据资源的安全
2.GIL 上锁、释放锁无需手动操作
3. 自己代码中定义的互斥锁保护进程中的资源数据的安全.
4. 自己定义的互斥锁必须自己手动上锁,释放锁.
|
3.验证计算密集型IO密集型的效率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#分析: 我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是: 方案一:开启四个进程 方案二:一个进程下,开启四个线程 #单核情况下,分析结果: 如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜 如果四个任务是I / O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜
#多核情况下,分析结果: 如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜 如果四个任务是I / O密集型,再多的核也解决不了I / O问题,方案二胜
#结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。 |
# import os # print(os.cpu_count()) # 计算密集型用多进程 # from multiprocessing import Process # import time # def task(): # i = 0 # for j in range(100000000): # i+=1 # # if __name__ == ‘__main__‘: # l = [] # start_time = time.time() # for i in range(4): # p = Process(target=task) # l.append(p) # p.start() # for j in l: # j.join() # print(time.time() - start_time) # 计算密集型用多线程 # from threading import Thread # import time # def task(): # i = 0 # for j in range(100000000): # i+=1 # # if __name__ == ‘__main__‘: # l = [] # start_time = time.time() # for i in range(4): # p = Thread(target=task) # l.append(p) # p.start() # for j in l: # j.join() # print(time.time() - start_time) # io密集型用多进程 # from multiprocessing import Process # import time # def task(): # time.sleep(3) # if __name__ == ‘__main__‘: # l = [] # start_time = time.time() # for i in range(4): # p = Process(target=task) # l.append(p) # p.start() # for j in l: # j.join() # print(time.time() - start_time) # io密集型用多线程 # from threading import Thread # import time # def task(): # time.sleep(3) # # if __name__ == ‘__main__‘: # l = [] # start_time = time.time() # for i in range(4): # p = Thread(target=task) # l.append(p) # p.start() # for j in l: # j.join() # print(time.time() - start_time)
十.多线程实现socket通信
from threading import Thread import socket phone = socket.socket() phone.bind((‘127.0.0.1‘,8888)) phone.listen(4) def communite(conn): while 1: try: ret = conn.recv(1024) print(ret.decode("utf-8")) msg = input(">>>") conn.send(msg.encode("utf-8")) except Exception: break conn.close() while 1: conn,addr = phone.accept() t = Thread(target=communite,args=(conn,)) t.start() # communite(conn) phone.close()
import socket client = socket.socket() client.connect((‘127.0.0.1‘,8888)) while 1: msg = input(">>>") client.send(msg.encode("utf-8")) ret = client.recv(1024) print(ret.decode("utf-8")) client.close()
十一..进程池、线程池
我们不能无休止的添加线程(进程),这样需要用到线程池(进程池),是容纳的最多的线程数(进程数)。
1.提交任务的两种方式: 同步调用(提交任务,等待任务执行完再继续向下执行) 异步调用(提交任务完成,就立马向下执行,不再等待。一般和回调配合使用,异步处理IO多的,回调处理IO少的) 2.任务执行的三种方式: 阻塞:(程序运行时,遇到IO,程序挂起,cpu被切走) 阻塞 非阻塞:(程序没有遇到IO,或遇到IO但通过某种手段让程序继续执行) 就绪 执行 3.回调函数:按顺序接受每个任务的结果,进行下一步的处理。
concurrent.futures 模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor:进程池:提供异步调用 基本方法: 1.submit() #异步提交任务 2.map(func,*iterables,timeout=None) 取代for循环的submit操作 3.shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收资源再继续 wait=False,立即返回,并不会等到池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前 4.result() #取得submit的结果 5.add_done_callback(fn) #回调函数
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor import os import time import random # print(os.cpu_count()) def task(n): print(f‘{os.getpid()} 接客‘) time.sleep(random.randint(1,3)) if __name__ == ‘__main__‘: # 开启进程池 (并行(并行+并发)) # p = ProcessPoolExecutor() # 默认不写,进程池里面的进程数与cpu个数相等 # # # p.submit(task,1) # # p.submit(task,1) # # p.submit(task,1) # # p.submit(task,1) # # p.submit(task,1) # # p.submit(task,1) # # p.submit(task,1) # for i in range(20): # p.submit(task,i) # # 开启线程池 (并发) t = ThreadPoolExecutor() # 默认不写, cpu个数*5 线程数 # t = ThreadPoolExecutor(100) # 100个线程 for i in range(20): t.submit(task,i)
import time import os import threading from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) print(‘%s打印的:‘%(threading.get_ident()),n) return n*n tpool = ThreadPoolExecutor(max_workers=5) #默认一般起线程的数据不超过CPU个数*5 # tpool = ProcessPoolExecutor(max_workers=5) #进程池的使用只需要将上面的ThreadPoolExecutor改为ProcessPoolExecutor就行了,其他都不用改 #异步执行 t_lst = [] for i in range(5): t = tpool.submit(func,i) #提交执行函数,返回一个结果对象,i作为任务函数的参数 def submit(self, fn, *args, **kwargs): 可以传任意形式的参数 t_lst.append(t) # # print(t.result()) #这个返回的结果对象t,不能直接去拿结果,不然又变成串行了,可以理解为拿到一个号码,等所有线程的结果都出来之后,我们再去通过结果对象t获取结果 tpool.shutdown() #起到原来的close阻止新任务进来 + join的作用,等待所有的线程执行完毕 print(‘主线程‘) for ti in t_lst: print(‘>>>>‘,ti.result()
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import threading import os,time,random def task(n): print(‘%s is runing‘ %threading.get_ident()) time.sleep(random.randint(1,3)) return n**2 if __name__ == ‘__main__‘: executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) s = executor.map(task,range(1,5)) #map取代了for+submit print([i for i in s])
十二:阻塞,非阻塞,异步,同步
阻塞(站在程序运行的角度):程序遇到IO立马会停止(挂起),CPU马上切换,等到IO结束之后,再执行
非阻塞(站在程序运行的角度):程序没有IO或者遇到IO后可以通过某种手段让CPU去执行其他的任务,尽可能的占用CPU(协程)
异步(站在程序运行的角度):所有的任务同时发出,我就继续执行下一个代码,等结果#所有的任务同时发出,我就继续执行下一个代码,等结果 #异步执行任务的方式一:将所有的任务结果统一回收 # 异步调用返回值如何接收? 未解决. from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time import random import os def task(i): print(f‘{os.getpid()}开始任务‘) time.sleep(random.randint(1,3)) print(f‘{os.getpid()}任务结束‘) return i if __name__ == ‘__main__‘: # 异步调用 pool = ProcessPoolExecutor() l1 = [] for i in range(10): obj = pool.submit(task,i) l1.append(obj) pool.shutdown(wait=True) # shutdown: 让我的主进程等待进程池中所有的子进程都结束任务之后,在执行. 有点类似与join. # shutdown: 在上一个进程池没有完成所有的任务之前,不允许添加新的任务. # 一个任务是通过一个函数实现的,任务完成了他的返回值就是函数的返回值. print(l1) for i in l1: print(i.result()) print(‘===主‘) # 统一回收结果: 我不能马上收到任何一个已经完成的任务的返回值,我只能等到所有的任务全部结束统一回收.
同步(站在程序运行的角度):任务发布出去之后,自任务开始运行,直到这个任务最终结束之后,给我一个结果,我再发布下个任务
#任务发布出去之后,自任务开始运行,直到这个任务最终结束之后,给我一个结果,我再发布下个任务 # 2. 同步调用 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time import random import os def task(i): print(f‘{os.getpid()}开始任务‘) time.sleep(random.randint(1,3)) print(f‘{os.getpid()}任务结束‘) return i if __name__ == ‘__main__‘: # 同步调用 pool = ProcessPoolExecutor() for i in range(10): obj = pool.submit(task,i) # obj是一个动态对象,返回的当前的对象的状态,有可能运行中,可能(就绪阻塞),还可能是结束了. # obj.result() 必须等到这个任务完成后,返回了结果之后,在执行下一个任务. print(f‘任务结果:{obj.result()}‘) pool.shutdown(wait=True) # shutdown: 让我的主进程等待进程池中所有的子进程都结束任务之后,在执行. 有点类似与join. # shutdown: 在上一个进程池没有完成所有的任务之前,不允许添加新的任务. # 一个任务是通过一个函数实现的,任务完成了他的返回值就是函数的返回值. print(‘===主‘)
异步+回调函数(不是万能的)