Python入门学习-DAY37-进程池与线程池、协程、gevent模块

2021-07-03 14:04

阅读:772

标签:OLE   程序   for   serve   基本   join()   while   技术   parse   

一、进程池与线程池

基本使用:

  进程池和线程池操作一样

提交任务的两种方式:

同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的

异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的

同步调用

技术分享图片技术分享图片
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import  time,random,os

def task():
    print(%s is running%os.getpid())
    i=random.randint(1,3)
    time.sleep(i)
    return i


if __name__ == __main__:
    p=ProcessPoolExecutor(4)

    l=[]
    for i in range(10):
        res = p.submit(task).result()#等待任务执行完毕,返回结果
        print(res)
    print()
View Code

异步调用

技术分享图片技术分享图片
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import  time,random,os

def task():
    print(%s is running%os.getpid())
    i=random.randint(1,3)
    time.sleep(i)
    return i


if __name__ == __main__:
    p=ProcessPoolExecutor(4)

    l=[]
    for i in range(10):
        future=p.submit(task)#只替提交任务
        l.append(future)
    p.shutdown(wait=True)#关闭进程池入口,并在原地等待所有进程任务执行完毕
    for i in l:
        print(i.result())
    print()
View Code

异步 + 回调函数

技术分享图片技术分享图片
from concurrent.futures import ProcessPoolExecutor
import time,os
import requests


def get(url):
    print(%s GET %s %(os.getpid(),url))
    time.sleep(3)
    response=requests.get(url)
    if response.status_code == 200:
        res=response.text
    else:
        res=下载失败
    return res

def parse(future):
    time.sleep(1)
    res=future.result()
    print(%s 解析结果为%s %(os.getpid(),len(res)))

if __name__ == __main__:
    urls=[
        https://www.baidu.com,
        https://www.sina.com.cn,
        https://www.tmall.com,
        https://www.jd.com,
        https://www.python.org,
        https://www.openstack.org,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,

    ]

    p=ProcessPoolExecutor(9)

    start=time.time()
    for url in urls:
        future=p.submit(get,url)
        future.add_done_callback(parse)
        #parse会在任务运行完毕后自动触发,然后接收一个参数future对象,回调函数的执行是在主进程里,而线程中的回调函数是由空闲的线程来执行

    p.shutdown(wait=True)

    print(,time.time()-start)
    print(,os.getpid())
View Code

 

基于线程池的套接字通讯

服务端

技术分享图片技术分享图片
from concurrent.futures import ThreadPoolExecutor
import socket
from threading import current_thread
IP=127.0.0.1
PORT=8085
ADDRESS=(IP,PORT)
BUFFSIZE=1024
t = ThreadPoolExecutor(4)


def communicate(conn,addr):
    while True:
        try:
            data=conn.recv(BUFFSIZE)
            if not data:
                print(%s客户端断开....%addr)
                break
            print(>>>>%s  端口:%s 线程:%s%(data.decode(utf-8),addr[1],current_thread().name))
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()

if __name__ == __main__:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(ADDRESS)
    server.listen(2)
    print(current_thread().name)
    while True:
        conn,addr=server.accept()
        t.submit(communicate, conn,addr)
View Code

客户端

技术分享图片技术分享图片
import socket
IP=127.0.0.1
PORT=8085
ADDRESS=(IP,PORT)
BUFFSIZE=1024

client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(ADDRESS)

while True:
    msg=input(>>>>).strip()
    if len(msg)==0:continue
    if msg==q:break
    client.send(msg.encode(utf-8))
    data = client.recv(BUFFSIZE)
    print(data.decode(utf-8))

client.close()
View Code

 

 

二、协程

1. 目标:

在线程下实现并发
并发(多个任务看起来是同时执行就是并发):切换+保存状态

2. 协程:

协程是单线程实现并发
注意:协程是程序员意淫出来的东西,操作系统里只有进程和线程的概念(操作系统调度的是线程)

在单线程下实现多个任务间遇到IO就切换就可以降低单线程的IO时间,从而最大限度地提升单线程的效率

串行执行

技术分享图片技术分享图片
import time

def func1():
    for i in range(10000000):
        i+1

def func2():
    for i in range(10000000):
        i+1

start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)#1.9774692058563232s
View Code

基于yield并发执行

技术分享图片技术分享图片
import time
def func1():
    while True:
        print(func1)
        yield

def func2():
    g=func1()
    for i in range(1000):
        print(func2)
        i+1
        next(g)

start=time.time()
func2()
stop=time.time()
print(stop-start)#0.014994382858276367s
View Code

 

三、gevent模块

1.使用

 

from gevent import monkey;monkey.patch_all()#用来识别IO阻塞,必须放到文件头
from gevent import spawn,joinall
import time


def foo1(name):
    print(%s  play1%name)
    time.sleep(2)#模拟IO操作,遇到IO切换任务
    print(%s  play2%name)


def foo2(name):
    print(%s  eat1%name)
    time.sleep(3)#模拟IO操作,遇到IO切换任务
    print(%s  eat2%name)

f1=spawn(foo1,egon)#提交任务
f2=spawn(foo2,egon)#提交任务

joinall([f1,f2])#主线程等待任务完成
print()

#结果:
  #egon play1
  #egon eat1
  #egon play2
  #egon eat2
  #

 

2.基于gevent的套接字通信

服务端

技术分享图片技术分享图片
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import socket
from threading import current_thread
IP=127.0.0.1
PORT=8086
ADDRESS=(IP,PORT)
BUFFSIZE=1024

def communicate(conn,addr):
    while True:
        try:
            data=conn.recv(BUFFSIZE)
            if not data:
                print(%s客户端断开....%addr)
                break
            conn.send(data.upper())
        except ConnectionResetError:
            break


def server():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(ADDRESS)
    server.listen(2)
    print(current_thread().name)
    while True:
        conn,addr=server.accept()
        spawn(communicate,conn,addr)

if __name__ == __main__:
    s1=spawn(server)
    s1.join()
View Code

多个客户端并发

技术分享图片技术分享图片
import socket
from threading import Thread,current_thread

IP = 127.0.0.1
PORT = 8086
ADDRESS = (IP, PORT)
BUFFSIZE = 1024
def client():


    client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    client.connect(ADDRESS)
    n=0
    while True:
        msg=%s say hello %s %(current_thread().name,n)
        n+=1
        client.send(msg.encode(utf-8))
        data=client.recv(BUFFSIZE)
        print(data.decode(utf-8))

if __name__ == __main__:
    for i in range(500):
        t=Thread(target=client)
        t.start()
View Code

 

Python入门学习-DAY37-进程池与线程池、协程、gevent模块

标签:OLE   程序   for   serve   基本   join()   while   技术   parse   

原文地址:https://www.cnblogs.com/xvchengqi/p/9621989.html


评论


亲,登录后才可以留言!