Python入门学习-DAY37-进程池与线程池、协程、gevent模块
2021-07-03 14:04
标签:OLE 程序 for serve 基本 join() while 技术 parse 进程池和线程池操作一样 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的 异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的 在线程下实现并发 协程是单线程实现并发 在单线程下实现多个任务间遇到IO就切换就可以降低单线程的IO时间,从而最大限度地提升单线程的效率 Python入门学习-DAY37-进程池与线程池、协程、gevent模块 标签:OLE 程序 for serve 基本 join() while 技术 parse 原文地址:https://www.cnblogs.com/xvchengqi/p/9621989.html一、进程池与线程池
基本使用:
提交任务的两种方式:
同步调用
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,random,os
def task():
print(‘%s is running‘%os.getpid())
i=random.randint(1,3)
time.sleep(i)
return i
if __name__ == ‘__main__‘:
p=ProcessPoolExecutor(4)
l=[]
for i in range(10):
res = p.submit(task).result()#等待任务执行完毕,返回结果
print(res)
print(‘主‘)
异步调用
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,random,os
def task():
print(‘%s is running‘%os.getpid())
i=random.randint(1,3)
time.sleep(i)
return i
if __name__ == ‘__main__‘:
p=ProcessPoolExecutor(4)
l=[]
for i in range(10):
future=p.submit(task)#只替提交任务
l.append(future)
p.shutdown(wait=True)#关闭进程池入口,并在原地等待所有进程任务执行完毕
for i in l:
print(i.result())
print(‘主‘)
异步 + 回调函数
from concurrent.futures import ProcessPoolExecutor
import time,os
import requests
def get(url):
print(‘%s GET %s‘ %(os.getpid(),url))
time.sleep(3)
response=requests.get(url)
if response.status_code == 200:
res=response.text
else:
res=‘下载失败‘
return res
def parse(future):
time.sleep(1)
res=future.result()
print(‘%s 解析结果为%s‘ %(os.getpid(),len(res)))
if __name__ == ‘__main__‘:
urls=[
‘https://www.baidu.com‘,
‘https://www.sina.com.cn‘,
‘https://www.tmall.com‘,
‘https://www.jd.com‘,
‘https://www.python.org‘,
‘https://www.openstack.org‘,
‘https://www.baidu.com‘,
‘https://www.baidu.com‘,
‘https://www.baidu.com‘,
]
p=ProcessPoolExecutor(9)
start=time.time()
for url in urls:
future=p.submit(get,url)
future.add_done_callback(parse)
#parse会在任务运行完毕后自动触发,然后接收一个参数future对象,回调函数的执行是在主进程里,而线程中的回调函数是由空闲的线程来执行
p.shutdown(wait=True)
print(‘主‘,time.time()-start)
print(‘主‘,os.getpid())
基于线程池的套接字通讯
服务端
from concurrent.futures import ThreadPoolExecutor
import socket
from threading import current_thread
IP=‘127.0.0.1‘
PORT=8085
ADDRESS=(IP,PORT)
BUFFSIZE=1024
t = ThreadPoolExecutor(4)
def communicate(conn,addr):
while True:
try:
data=conn.recv(BUFFSIZE)
if not data:
print(‘%s客户端断开....‘%addr)
break
print(‘>>>>%s 端口:%s 线程:%s‘%(data.decode(‘utf-8‘),addr[1],current_thread().name))
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
if __name__ == ‘__main__‘:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(ADDRESS)
server.listen(2)
print(current_thread().name)
while True:
conn,addr=server.accept()
t.submit(communicate, conn,addr)
客户端
import socket
IP=‘127.0.0.1‘
PORT=8085
ADDRESS=(IP,PORT)
BUFFSIZE=1024
client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(ADDRESS)
while True:
msg=input(‘>>>>‘).strip()
if len(msg)==0:continue
if msg==‘q‘:break
client.send(msg.encode(‘utf-8‘))
data = client.recv(BUFFSIZE)
print(data.decode(‘utf-8‘))
client.close()
二、协程
1. 目标:
并发(多个任务看起来是同时执行就是并发):切换+保存状态2. 协程:
注意:协程是程序员意淫出来的东西,操作系统里只有进程和线程的概念(操作系统调度的是线程)串行执行
import time
def func1():
for i in range(10000000):
i+1
def func2():
for i in range(10000000):
i+1
start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)#1.9774692058563232s
基于yield并发执行
import time
def func1():
while True:
print(‘func1‘)
yield
def func2():
g=func1()
for i in range(1000):
print(‘func2‘)
i+1
next(g)
start=time.time()
func2()
stop=time.time()
print(stop-start)#0.014994382858276367s
三、gevent模块
1.使用
from gevent import monkey;monkey.patch_all()#用来识别IO阻塞,必须放到文件头
from gevent import spawn,joinall
import time
def foo1(name):
print(‘%s play1‘%name)
time.sleep(2)#模拟IO操作,遇到IO切换任务
print(‘%s play2‘%name)
def foo2(name):
print(‘%s eat1‘%name)
time.sleep(3)#模拟IO操作,遇到IO切换任务
print(‘%s eat2‘%name)
f1=spawn(foo1,‘egon‘)#提交任务
f2=spawn(foo2,‘egon‘)#提交任务
joinall([f1,f2])#主线程等待任务完成
print(‘主‘)
#结果:
#egon play1
#egon eat1
#egon play2
#egon eat2
#主
2.基于gevent的套接字通信
服务端
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import socket
from threading import current_thread
IP=‘127.0.0.1‘
PORT=8086
ADDRESS=(IP,PORT)
BUFFSIZE=1024
def communicate(conn,addr):
while True:
try:
data=conn.recv(BUFFSIZE)
if not data:
print(‘%s客户端断开....‘%addr)
break
conn.send(data.upper())
except ConnectionResetError:
break
def server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(ADDRESS)
server.listen(2)
print(current_thread().name)
while True:
conn,addr=server.accept()
spawn(communicate,conn,addr)
if __name__ == ‘__main__‘:
s1=spawn(server)
s1.join()
多个客户端并发
import socket
from threading import Thread,current_thread
IP = ‘127.0.0.1‘
PORT = 8086
ADDRESS = (IP, PORT)
BUFFSIZE = 1024
def client():
client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(ADDRESS)
n=0
while True:
msg=‘%s say hello %s‘ %(current_thread().name,n)
n+=1
client.send(msg.encode(‘utf-8‘))
data=client.recv(BUFFSIZE)
print(data.decode(‘utf-8‘))
if __name__ == ‘__main__‘:
for i in range(500):
t=Thread(target=client)
t.start()
上一篇:算法竞赛训练指南2.1 计数方法
下一篇:操作系统-并发-线程-进程
文章标题:Python入门学习-DAY37-进程池与线程池、协程、gevent模块
文章链接:http://soscw.com/essay/101292.html