线程Queue、定时器、进程池和线程池、同步异步
2020-12-13 14:12
标签:方式 说明 lse 导致 end erro add 同步异步 call 目录 queue模块下提供了几个阻塞队列,这些队列主要用于实现线程通信。在queue模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。 关于这三个队列类的简单介绍如下: 这三个队列类的属性和方法基本相同, 它们都提供了如下属性和方法: 下面看具体代码 Thread 类有一个 Timer子类,该子类可用于控制指定函数在特定时间内执行一次。例如如下程序: 需要说明的是,Timer 只能控制函数在指定时间内执行一次,如果要使用 Timer 控制函数多次重复执行,则需要再执行下一次调度。 用池的功能限制进程数或线程数。 为什么要限制:当并发的任务数量远远大于计算机所能承受的范围,即无法一次性开启过多的任务数量,就应该考虑去限制进程数或线程数,从而保证服务器不崩。 系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好的提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。 线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。 此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。 线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。 如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。 Exectuor 提供了如下常用方法: 程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。 Future 提供了如下方法: 使用线程池来执行线程任务: 上述代码就是只有0,1,2,3四个线程来执行20次任务,同一时间只能有四个任务执行,只有能一个线程执行完一个任务,空出来了,才能执行下一个任务。 使用进程池来执行进程任务 不要和信号量用混了,线程池里面始终没有产生新的线程,比如ThreadPoolExecutor(4),所有的任务始终是由这4个线程去执行。 是提交任务的两种方式 提交了一个任务,必须等任务执行完 (拿到返回值)才能执行下一行代码 提交了一个任务,不要等任务执行完,可以直接执行下一行代码 服务端 客户端 线程Queue、定时器、进程池和线程池、同步异步 标签:方式 说明 lse 导致 end erro add 同步异步 call 原文地址:https://www.cnblogs.com/zhuangyl23/p/11553183.html
线程Queue、定时器、进程池和线程池、多线程socket通信
一、Queue队列实现线程通信
# 先进先出
import queue
q = queue.Queue()
q.put('123')
q.put('qweqwe')
print(q.get()) # 先拿到的是123
print(q.get())
# print(q.get()) # 两个值都取光了,队列为空,这里再取就会阻塞,直到队列中有值
q.task_done() # 结束任务
# q.join()
# 先进后出
import queue
q = queue.LifoQueue()
q.put('杨蓬蓬吃饭')
q.put('杨蓬蓬上厕所')
q.put('杨蓬蓬睡觉')
print(q.get()) # 后进先出,先取到杨蓬蓬睡觉
print(q.get())
print(q.get()) # 先进后出,最后取到杨蓬蓬吃饭
# 优先级
import queue
q = queue.PriorityQueue() # 可以根据优先级取数据
# 通常这个元组的第一个值是int类型,第一个值越小,优先级越高
q.put((50,'吃饭'))
q.put((80,'睡觉'))
q.put((1,'敲代码'))
print(q.get()) # 1最小,先取到敲代码 (1, '敲代码')
print(q.get()) # (50, '吃饭')
print(q.get()) # (80, '睡觉')
二、线程定时器(Timer)
from threading import Thread,Timer
import time
def task():
print('线程执行了')
time.sleep(2)
print('线程结束了')
t = Timer(4,task) # 指定4s后开启一个线程执行task
t.start()
三、进程池和线程池
from concurrent.futures import ThreadPoolExecutor
from threading import currentThread
import time
def task(i):
time.sleep(1.5)
print(f"{currentThread().name}在执行任务{i+1}")
return i**2
if __name__ == '__main__':
fu_list = []
pool = ThreadPoolExecutor(4) #规定线程池有4个线程
for i in range(20): #模拟20个线程,task要做20次,4个线程负责做这个事情
future = pool.submit(task,i) #for循环一次,提交一次
fu_list.append(future) #先把提交的数据意义放到这个列表里面
pool.shutdown() # 关闭池的入口,不让你往里面再放东西
for fu in fu_list: #依次循环列表里面的值
print(fu.result()) #打印返回值
-----------------------------------------------------------------------------
ThreadPoolExecutor-0_0在执行任务1
ThreadPoolExecutor-0_2在执行任务3
ThreadPoolExecutor-0_1在执行任务2
ThreadPoolExecutor-0_3在执行任务4
ThreadPoolExecutor-0_0在执行任务5
ThreadPoolExecutor-0_1在执行任务7
ThreadPoolExecutor-0_2在执行任务6
ThreadPoolExecutor-0_3在执行任务8
ThreadPoolExecutor-0_0在执行任务9
ThreadPoolExecutor-0_2在执行任务11
ThreadPoolExecutor-0_1在执行任务10
ThreadPoolExecutor-0_3在执行任务12
ThreadPoolExecutor-0_0在执行任务13
ThreadPoolExecutor-0_2在执行任务14
ThreadPoolExecutor-0_1在执行任务15
ThreadPoolExecutor-0_3在执行任务16
ThreadPoolExecutor-0_0在执行任务17
ThreadPoolExecutor-0_2在执行任务18
ThreadPoolExecutor-0_1在执行任务19
ThreadPoolExecutor-0_3在执行任务20
0
1
4
9
16
25
36
49
64
81
100
121
144
169
196
225
256
289
324
361
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process
import time
def task(i):
time.sleep(1)
print(f"{current_process().name}在执行任务{i+1}")
time.sleep(1)
return i**2
if __name__ == '__main__':
fu_list = []
pool = ProcessPoolExecutor(4) #规定进程池有是个线程
for i in range(20): #模拟20个进程,task要做20次,4个进程负责做这个事情
future = pool.submit(task,i) #for循环一次,提交一次
fu_list.append(future) #先把提交的数据意义放到这个列表里面
pool.shutdown() # 关闭池的入口,不让你往里面再放东西
for fu in fu_list:
print(fu.result())
-----------------------------------------------------------------------------
Process-1在执行任务1
Process-2在执行任务2
Process-3在执行任务3
Process-4在执行任务4
Process-1在执行任务5
Process-2在执行任务6
Process-3在执行任务7
Process-4在执行任务8
Process-1在执行任务9
Process-2在执行任务10
Process-3在执行任务11
Process-4在执行任务12
Process-1在执行任务13
Process-2在执行任务14
Process-3在执行任务15
Process-4在执行任务16
Process-1在执行任务17
Process-2在执行任务18
Process-3在执行任务19
Process-4在执行任务20
0
1
4
9
16
25
36
49
64
81
100
121
144
169
196
225
256
289
324
361
四、同步和异步
4.1、同步
import os
import time
import random
from multiprocessing import Process
def work(n):
print(f'{n}: {os.getpid()} is running' )
time.sleep(random.randint(1,3))
print('%s:%s is done' %(n,os.getpid()))
if __name__ == '__main__':
for i in range(3): #这种就是同步了
p=Process(target=work,args=(i,))
p.start()
-----------------------------------------------------------------------------
1: 7504 is running
0: 15736 is running
2: 17896 is running
2:17896 is done
1:7504 is done
0:15736 is done
4.2 、异步
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process
import time
def task(i):
time.sleep(1)
print(f"{current_process().name}在执行任务{i+1}")
time.sleep(1)
return i**2
def parse(future):
print(future.result())
if __name__ == '__main__':
fu_list = []
pool = ProcessPoolExecutor(4) #规定进程池有是个线程
for i in range(20): #模拟20个进程,task要做20次,4个进程负责做这个事情
future = pool.submit(task,i) #for循环一次,提交一次
future.add_done_callback(parse)
# 为当前任务绑定了一个函数,在当前任务执行结束的时候会触发这个函数,
# 会把future对象作为参数传给函数
# 这个称之为回调函数,处理完了回来就调用这个函数.
-----------------------------------------------------------------------------
Process-1在执行任务1
Process-2在执行任务2
Process-3在执行任务3
Process-4在执行任务4
1
0
4
9
Process-2在执行任务5
Process-1在执行任务6
Process-3在执行任务7
Process-4在执行任务8
16
25
36
49
Process-2在执行任务9
Process-1在执行任务10
Process-3在执行任务11
Process-4在执行任务12
64
81
100
121
Process-2在执行任务13
Process-1在执行任务14
Process-3在执行任务15
Process-4在执行任务16
144
169
196
225
Process-2在执行任务17
Process-1在执行任务18
Process-3在执行任务19
Process-4在执行任务20
256
289
324
361
五、多线程socket升级
import socket
from threading import Thread
def talk(conn):
while True:
try:
msg = conn.recv(1024)
if len(msg) == 0: break
conn.send(msg.upper())
except connectionResetError
print('客户端关闭了一个链接')
break
conn.close()
def serve_demo():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('192.168.11.199', 8010))
server.listen(5)
while True:
conn, addr = server.accept()
t = Thread(target=talk, args(conn,))
t.start()
if __name__ == '__main__':
server_demo()
import socket
from threading import Thread
def client_demo():
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('192.168.11.199', 8010))
while True:
msg = f'{currentThread().name}'
if len(msg) == 0: break
client.send(msg.encode('utf-8'))
feedback = client.recv(1024)
print(feedback.decode('utf-8'))
client.close()
if __name__ == '__main__':
for i in range(5):
t = Thread(target=client_demo)
t.start()