Python 协程
2021-07-05 17:07
标签:对象 消息循环 装饰器 获得 forever mat ica name 3.2 不用for,占内存小,一边循环一边计算——时间换空间 next函数调用,到最后一个,报 生成: Step 1 注意:此时g是一个generator, next调用从上次yield后开始 1 定义:为非抢占式多任务产生子程序,可以暂停执行——像generator一样 关键字: -> start 分析:动脑子 协程终止:向上冒泡,发送哨符值,让协程退出 yield from:相当于加一个通道(协程与主线程间) 委派生成器:包含yield from的生成器函数 {‘boys_2‘: Res(count=9, average=40.422222222222224), ‘boys_1‘: Res(count=9, average=1.3888888888888888)} 解释: 步骤:创建消息循环(解决异步IO,有中转:相当于信箱,消息queue)-> 导入协程-> 关闭 更简洁,不用装饰器 介绍: 例: 注:查+理解 类似线程池 用multiprocessing实现真正并行计算——运行多个解释器 concurrent.furtures.Executor 例子: map(fn, *iterables, timeout=None): map和submit用一个就行 Future Python 协程 标签:对象 消息循环 装饰器 获得 forever mat ica name 3.2 原文地址:https://www.cnblogs.com/TuerLueur/p/9595731.html迭代器
isinstancle()
生成器
StopIteration
异常
g = (x * x for x in range(10)) # 中括号是列表生成器,小括号是生成器
def odd():
print('step 1')
yield 1
print('step 2')
yield(3)
print('step 3')
yield(5)
g = odd()
one = next(g)
print(one)
two = next(g)
print(two)
three = next(g)
print(three)
1
Step 2
2
Step 3
3
def fib(max):
n, a, b = 0, 0, 1
while n
1
2
3
5协程
yield
和send
def simple_coroutine(a):
print('-> start')
b = yield a
print('-> recived', a, b)
c = yield a + b
print('-> recived', a, b, c)
# runc
sc = simple_coroutine(5)
aa = next(sc) # 预激
print(aa)
bb = sc.send(6) # 5, 6
print(bb)
cc = sc.send(7) # 5, 6, 7
print(cc)
5
-> recived 5 6
11
-> recived 5 6 7def gen():
for c in 'AB':
yield c
print(list(gen()))
def gen_new():
yield from 'AB'
print(list(gen_new()))
from collections import namedtuple
ResClass = namedtuple('Res', 'count average')
# 子生成器
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total / count
return ResClass(count, average)
# 委派生成器
def grouper(storages, key):
while True:
# 获取averager()返回的值
storages[key] = yield from averager()
# 客户端代码
def client():
process_data = {
'boys_2': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys_1': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46]
}
storages = {}
for k, v in process_data.items():
# 获得协程
coroutine = grouper(storages, k)
# 预激协程
next(coroutine)
# 发送数据到协程
for dt in v:
coroutine.send(dt)
# 终止协程
coroutine.send(None)
print(storages)
# run
client()
client()
函数开始,for k, v 循环内,每次创建一个新的grouper实例coroutinenext(coroutine)
预激协程,进入while True循环,调用averager()
,yield from处暂停for dt in v
结束后,grouper实例仍暂停,所以storages[key]的赋值还未完成coroutine.send(None)
后,term变为None,averager子生成器中止,抛出StopIteration,并将返回的数据包含在异常对象的value中,yield from 直接抓取 StopItration ,将异常对象的 value 赋值给 storages[key]asyncio
import threading
import asyncio
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
print('Starting......(%s)' % threading.currentThread())
yield from asyncio.sleep(3)
print('Done......(%s)' % threading.currentThread())
print('Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
async & await
import threading
import asyncio
async def hello():
print('Hello world! (%s)' % threading.currentThread())
print('Starting......(%s)' % threading.currentThread())
await asyncio.sleep(3)
print('Done......(%s)' % threading.currentThread())
print('Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
aiohttp
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'
Index
')
async def hello(request):
await asyncio.sleep(0.5)
text = 'hello, %s!
' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time
def return_future(msg):
time.sleep(3)
return msg
# 创建一个线程池
pool = ThreadPoolExecutor(max_workers=2)
# 往线程池加入2个task
f1 = pool.submit(return_future, 'hello')
f2 = pool.submit(return_future, 'world')
print(f1.done())
time.sleep(3)
print(f2.done())
print(f1.result())
print(f2.result())
import time
import re
import os
import datetime
from concurrent import futures
data = ['1', '2']
def wait_on(argument):
print(argument)
time.sleep(2)
return "ok"
ex = futures.ThreadPoolExecutor(max_workers=2)
for i in ex.map(wait_on, data):
print(i)
Executor.submit
创建from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed
import requests
URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com', ]
def task(url, timeout=10):
return requests.get(url, timeout=timeout)
with Pool(max_workers=3) as executor:
future_tasks = [executor.submit(task, url) for url in URLS]
for f in future_tasks:
if f.running():
print('%s is running' % str(f))
for f in as_completed(future_tasks):
try:
ret = f.done()
if ret:
f_ret = f.result()
print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
except Exception as e:
f.cancel()
print(str(e))