Python asyncio coroutines
2021-06-10 09:05
Original source: https://www.cnblogs.com/wztshine/p/14460847.html

1. Basic coroutines
Declare a coroutine function with async def. A quick reference for the helpers used below:

async def test():                    # declares a coroutine function named test
asyncio.ensure_future(obj)           # wraps a coroutine object into a future (Task)
asyncio.gather(coroutines/futures)   # bundles coroutine objects or futures into a single future
asyncio.wait([task1, task2])         # waits for futures/coroutines to finish; returns a coroutine
import asyncio

async def test():                  # async def defines a coroutine function
    print('1')
    await asyncio.sleep(2)         # simulates IO; await accepts coroutine objects, Futures, and Tasks
    print('3')

t = test()                         # does not run test(); t is just a coroutine object
loop = asyncio.get_event_loop()    # create the event loop
loop.run_until_complete(t)         # put the coroutine object on the event loop and run it
# loop.close()   # optional; but after close() you can no longer call loop.run_until_complete(t) again
# asyncio.run(t) # Python 3.7+; roughly equivalent to the three lines above
2. Running multiple coroutine functions
import asyncio

async def test():
    print('1')
    await asyncio.sleep(2)         # simulates IO
    print('3')

async def test2():
    print('2')

t = test()                         # coroutine objects; nothing runs yet
t2 = test2()
loop = asyncio.get_event_loop()
all = asyncio.gather(t, t2)        # gather bundles several coroutine objects/futures into one future
loop.run_until_complete(all)       # run_until_complete actually accepts a future
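Not part of the original post, but worth noting: gather() also collects the coroutines' return values, in the order the awaitables were passed in. A minimal sketch (add() is just an illustrative coroutine):

import asyncio

async def add(a, b):
    await asyncio.sleep(0.1)   # simulated IO
    return a + b

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(add(1, 2), add(3, 4)))
print(results)   # [3, 7]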
3. Future
import asyncio

async def test():
    await asyncio.sleep(1)     # simulates IO

def done_callback(futu):
    print('Done')

loop = asyncio.get_event_loop()
futu = asyncio.ensure_future(test())    # turn the coroutine object into a future (Task)
futu.add_done_callback(done_callback)   # runs once the future has its result
loop.run_until_complete(futu)
4. Task
import asyncio

async def test():
    print('1')
    await asyncio.sleep(2)     # simulates IO
    print('3')
    return 'Finished.'

async def test2():
    # creating the tasks schedules test() on the event loop immediately;
    # at this point the loop holds [test2(), test(), test()]
    task1 = asyncio.create_task(test())
    task2 = asyncio.create_task(test())
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)

asyncio.run(test2())   # first creates the event loop, which starts out holding test2()
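A quick check (not from the original post) that the two tasks really overlap: since both are scheduled before the first await, the two 2-second sleeps run concurrently and the total elapsed time is about 2 seconds, not 4. work() and main() are illustrative names:

import asyncio
import time

async def work():
    await asyncio.sleep(2)

async def main():
    t0 = time.monotonic()
    task1 = asyncio.create_task(work())
    task2 = asyncio.create_task(work())
    await task1
    await task2
    print('elapsed: {:.1f}s'.format(time.monotonic() - t0))   # ~2.0s

asyncio.run(main())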
5. Combining coroutines with threads and processes
import asyncio
import functools
import requests

async def crawler(url):
    print('Start crawling:', url)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'
    }
    # BaseEventLoop.run_in_executor() lets a coroutine run a blocking third-party call,
    # such as requests.get(); its arguments and keyword arguments are bound with functools.partial
    future = asyncio.get_event_loop().run_in_executor(
        None, functools.partial(requests.get, url, headers=headers))
    response = await future
    print('Response received:', url)
    # save the downloaded response body to a local file
    with open(url[-6:], 'wb') as f:
        f.write(response.content)

url_list = ['https://w.wallhaven.cc/full/57/wallhaven-57lzd8.jpg',
            'https://w.wallhaven.cc/full/zm/wallhaven-zm5lyg.jpg',
            'https://w.wallhaven.cc/full/28/wallhaven-28ey2g.jpg',
            'https://w.wallhaven.cc/full/p8/wallhaven-p8gvvp.jpg']

tasks = [crawler(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
Appendix:
1. Future, Task and Event loop

Event Loop

On any platform, when we want to do something asynchronously, it usually involves an event loop. An event loop is a loop that can register tasks to be executed, execute them, delay or even cancel them, and handle different events related to these operations. Generally, we schedule multiple async functions to the event loop. The loop runs one function; while that function waits for IO, the loop pauses it and runs another. When the first function completes IO, it is resumed. Thus two or more functions can co-operatively run together. This is the main goal of an event loop.

The event loop can also pass resource-intensive functions to a thread pool for processing. The internals of the event loop are quite complex, and we don't need to worry much about them right away. We just need to remember that the event loop is the mechanism through which we can schedule our async functions and get them executed.
Futures / Tasks

If you are into Javascript too, you probably know about Promise. In Python we have similar concepts: Future/Task. A Future is an object that is supposed to have a result in the future. A Task is a subclass of Future that wraps a coroutine; when the coroutine finishes, the result of the Task is realized.
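A minimal sketch (not from the original article) showing that a Task really is a Future subclass whose result is realized when the wrapped coroutine finishes; coro() is an illustrative name:

import asyncio

async def coro():
    return 1

loop = asyncio.get_event_loop()
task = loop.create_task(coro())           # wrap the coroutine in a Task
print(isinstance(task, asyncio.Future))   # True: Task subclasses Future
loop.run_until_complete(task)
print(task.result())                      # 1, realized once coro() finished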
Coroutines

We discussed coroutines in our last blog post. A coroutine is a way of pausing a function and returning a series of values periodically. A coroutine can pause the execution of the function by using the yield, yield from or await (Python 3.5+) keywords in an expression. The function is paused until the yield statement actually gets a value.

Fitting Event Loop and Future/Task Together
It's simple. We need an event loop, and we need to register our future/task objects with it. The loop will schedule and run them. We can add callbacks to our future/task objects so that we are notified when a future has its results.

Very often we choose to use coroutines for our work. We wrap a coroutine in a Future and get a Task object. When a coroutine yields, it is paused. When it has a value, it is resumed. When it returns, the Task has completed and gets a value. Any associated callback is run. If the coroutine raises an exception, the Task fails and is not resolved.
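Putting those pieces together, a small sketch mirroring section 3 above: wrap a coroutine in a Task, attach a callback, and let the loop drive it. fetch() and on_done() are illustrative names, not part of any API:

import asyncio

async def fetch():
    await asyncio.sleep(0.1)    # stands in for real IO
    return 'data'

def on_done(task):
    print('callback got:', task.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(fetch())   # register the coroutine with the loop as a Task
task.add_done_callback(on_done)         # notified once the future has its result
loop.run_until_complete(task)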
2. Combining Coroutines with Threads and Processes

A lot of existing libraries are not ready to be used with asyncio natively. They may block, or depend on concurrency features not available through the module. It is still possible to use those libraries in an application based on asyncio by using an executor from concurrent.futures to run the code either in a separate thread or a separate process.

Threads

The run_in_executor() method of the event loop takes an executor instance, a regular callable to invoke, and any arguments to be passed to the callable. It returns a Future that can be used to wait for the function to finish its work and return something. If no executor is passed in, a ThreadPoolExecutor is created. This example explicitly creates an executor to limit the number of worker threads it will have available.

A ThreadPoolExecutor starts its worker threads and then calls each of the provided functions once in a thread. This example shows how to combine run_in_executor() and wait() to have a coroutine yield control to the event loop while blocking functions run in separate threads, and then wake back up when those functions are finished.

asyncio_executor_thread.py:

import asyncio
import concurrent.futures
import logging
import sys
import time

def blocks(n):
    log = logging.getLogger('blocks({})'.format(n))
    log.info('running')
    time.sleep(0.1)
    log.info('done')
    return n ** 2

async def run_blocking_tasks(executor):
    log = logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('waiting for executor tasks')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('results: {!r}'.format(results))

    log.info('exiting')

if __name__ == '__main__':
    # Configure logging to show the name of the thread
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a limited thread pool.
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()
asyncio_executor_thread.py uses logging to conveniently indicate which thread and function are producing each log message. Because a separate logger is used in each call to blocks(), the output clearly shows the same threads being reused to call multiple copies of the function with different arguments.

$ python3 asyncio_executor_thread.py
MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0 blocks(0): running
ThreadPoolExecutor-0_1 blocks(1): running
ThreadPoolExecutor-0_2 blocks(2): running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_0 blocks(0): done
ThreadPoolExecutor-0_1 blocks(1): done
ThreadPoolExecutor-0_2 blocks(2): done
ThreadPoolExecutor-0_0 blocks(3): running
ThreadPoolExecutor-0_1 blocks(4): running
ThreadPoolExecutor-0_2 blocks(5): running
ThreadPoolExecutor-0_0 blocks(3): done
ThreadPoolExecutor-0_2 blocks(5): done
ThreadPoolExecutor-0_1 blocks(4): done
MainThread run_blocking_tasks: results: [0, 9, 16, 25, 1, 4]
MainThread run_blocking_tasks: exiting
Processes

A ProcessPoolExecutor works in much the same way, creating a set of worker processes instead of threads. Using separate processes requires more system resources, but for computationally-intensive operations it can make sense to run a separate task on each CPU core.

The only change needed to move from threads to processes is to create a different type of executor. This example also changes the logging format string to include the process id instead of the thread name, to demonstrate that the tasks are in fact running in separate processes.

asyncio_executor_process.py:

# changes from asyncio_executor_thread.py
if __name__ == '__main__':
    # Configure logging to show the id of the process
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='PID %(process)5s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a limited process pool.
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()
$ python3 asyncio_executor_process.py
PID 40498 run_blocking_tasks: starting
PID 40498 run_blocking_tasks: creating executor tasks
PID 40498 run_blocking_tasks: waiting for executor tasks
PID 40499 blocks(0): running
PID 40500 blocks(1): running
PID 40501 blocks(2): running
PID 40499 blocks(0): done
PID 40500 blocks(1): done
PID 40501 blocks(2): done
PID 40500 blocks(3): running
PID 40499 blocks(4): running
PID 40501 blocks(5): running
PID 40499 blocks(4): done
PID 40500 blocks(3): done
PID 40501 blocks(5): done
PID 40498 run_blocking_tasks: results: [1, 4, 9, 0, 16, 25]
PID 40498 run_blocking_tasks: exiting
3. How does asyncio work? (Bharel's answer)

Before answering this question we need to understand a few base terms; skip these if you already know any of them.
Generators

Generators are objects that allow us to suspend the execution of a Python function. User curated generators are implemented using the keyword yield. By creating a normal function containing the yield keyword, we turn that function into a generator:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
As you can see, calling next() on the generator causes the interpreter to load test's frame and return the yielded value. Calling next() again causes the frame to load into the interpreter stack once more, and continue yielding another value.

By the third time next() is called, our generator was finished, and StopIteration was thrown.

Communicating with a generator

A less-known feature of generators is the fact that you can communicate with them using two methods: send() and throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

Upon calling gen.send(), the value is passed as a return value from the yield keyword.

gen.throw() on the other hand allows throwing Exceptions inside generators, with the exception raised at the same spot yield was called.

Returning values from generators

Returning a value from a generator results in the value being put inside the StopIteration exception. We can later recover the value from the exception and use it to our need.

>>> def test():
... yield 1
... return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
... next(gen)
... except StopIteration as exc:
... print(exc.value)
...
abc
Behold, a new keyword: yield from

Python 3.3 came with the addition of a new keyword: yield from. What that keyword allows us to do is pass on any next(), send() and throw() into an inner-most nested generator. If the inner generator returns a value, it is also the return value of yield from:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen)  # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

I've written an article to further elaborate on this topic.
Putting it all together

Upon introducing the new keyword yield from in Python 3.3, we were now able to create generators inside generators that, just like a tunnel, pass the data back and forth from the inner-most to the outer-most generators. This has spawned a new meaning for generators: coroutines.

Coroutines are functions that can be stopped and resumed while being run. In Python, they are defined using the async def keyword. Much like generators, they too use their own form of yield from, which is await. Before async and await were introduced in Python 3.5, we created coroutines in the exact same way generators were created (with yield from instead of await).

async def inner():
    return 1

async def outer():
    await inner()
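For flavor, a sketch (mine, not the answer's) of that older, pre-3.5 style: a plain generator playing the role of a coroutine, with yield from doing the job await does today. old_inner and old_outer are illustrative names:

def old_inner():
    return 1
    yield   # unreachable; merely makes this function a generator

def old_outer():
    # yield from drives old_inner() and captures its return value,
    # the way await unwraps a coroutine's result today
    val = yield from old_inner()
    return val

gen = old_outer()
try:
    next(gen)
except StopIteration as exc:
    print(exc.value)   # 1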
Like every iterator or generator that implements the __iter__() method, coroutines implement __await__(), which allows them to continue on every time await coro is called. There's a nice sequence diagram inside the Python docs that you should check out.
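To make __await__() concrete, here is a tiny sketch (not from the answer) of a custom awaitable: any object whose __await__() returns an iterator can be awaited just like a coroutine. Ready is a hypothetical class:

import asyncio

class Ready:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        # an already-finished awaitable: yields nothing, immediately returns the value
        if False:
            yield   # never runs; only makes __await__ a generator function
        return self.value

async def main():
    print(await Ready(42))   # prints 42

asyncio.run(main())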
Futures

In asyncio, apart from coroutine functions, we have two important objects: tasks and futures. Futures are objects that have the __await__() method implemented, and their job is to hold a certain state and result. The state can be one of the following:

1. PENDING - the future does not have any result or exception set.
2. CANCELLED - the future was cancelled using fut.cancel().
3. FINISHED - the future was finished, either by a result set using fut.set_result() or by an exception set using fut.set_exception().
The result, just like you have guessed, can either be a Python object that will be returned, or an exception which may be raised.

Another important feature of future objects is that they contain a method called add_done_callback(). This method allows functions to be called as soon as the task is done - whether it raised an exception or finished.
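A small sketch (not part of the answer) of that lifecycle, using the public Future API; report() is an illustrative name:

import asyncio

def report(fut):
    print('done?', fut.done(), 'result:', fut.result())

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()        # state: PENDING
    fut.add_done_callback(report)     # fires as soon as the future is done
    fut.set_result(123)               # state: FINISHED
    await asyncio.sleep(0)            # let the loop run the scheduled callback

asyncio.run(main())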
Tasks

Task objects are special futures which wrap around coroutines and communicate with the inner-most and outer-most coroutines. Every time a coroutine awaits a future, the future is passed all the way back to the task (just like in yield from), and the task receives it.

Next, the task binds itself to the future. It does so by calling add_done_callback() on the future. From now on, if the future will ever be done, by either being cancelled, passed an exception or passed a Python object as a result, the task's callback will be called, and it will rise back up to existence.
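Here is a sketch (again mine, not the answer's) of that interplay: the task running waiter() suspends on a bare future and resumes the moment someone sets the future's result. waiter() is an illustrative name:

import asyncio

async def waiter(fut):
    print('suspended, waiting on the future...')
    print('resumed with:', await fut)   # the task parks itself on fut via add_done_callback()

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    task = asyncio.ensure_future(waiter(fut))   # wrap the coroutine in a Task
    await asyncio.sleep(1)                      # waiter() stays suspended meanwhile
    fut.set_result('hello')                     # future done -> task woken -> coroutine resumes
    await task

asyncio.run(main())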
Asyncio

The final burning question we must answer is - how is the IO implemented?

Deep inside asyncio, we have an event loop. An event loop of tasks. The event loop's job is to call tasks every time they are ready and coordinate all that effort into one single working machine.

The IO part of the event loop is built upon a single crucial function called select. Select is a blocking function, implemented by the operating system underneath, that allows waiting on sockets for incoming or outgoing data. Upon receiving data it wakes up, and returns the sockets which received data, or the sockets which are ready for writing.

When you try to receive or send data over a socket through asyncio, what actually happens below is that the socket is first checked if it has any data that can be immediately read or sent. If its .send() buffer is full, or the .recv() buffer is empty, the socket is registered to the select function (by simply adding it to one of the lists, rlist for recv and wlist for send) and the appropriate function awaits a newly created future object, tied to that socket.

When all available tasks are waiting for futures, the event loop calls select and waits. When one of the sockets has incoming data, or its send buffer drained up, asyncio checks for the future object tied to that socket, and sets it to done.

Now all the magic happens. The future is set to done, the task that added itself before with add_done_callback() rises up back to life, and calls .send() on the coroutine, which resumes the inner-most coroutine (because of the await chain), and you read the newly received data from a nearby buffer it was spilled unto.

Method chain again, in case of recv():

1. select.select waits.
2. A ready socket with data is returned, and future.set_result() is called.
3. The task that added itself with add_done_callback() is now woken up.
4. The task calls .send() on the coroutine, which goes all the way into the inner-most coroutine and wakes it up.

In summary, asyncio uses generator capabilities that allow pausing and resuming functions. It uses yield from capabilities that allow passing data back and forth from the inner-most generator to the outer-most. It uses all of those in order to halt function execution while it's waiting for IO to complete (by using the OS select function).

And the best of all? While one function is paused, another may run and interleave with the delicate fabric, which is asyncio.
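To ground the select part, a toy illustration (not asyncio internals) of how an event loop can block until a socket is readable; socketpair() stands in for real network connections:

import select
import socket

a, b = socket.socketpair()      # two connected sockets, standing in for real IO
b.send(b'ping')                 # make side `a` readable

# block (here up to 1s) until at least one watched socket is ready for reading
rlist, wlist, xlist = select.select([a], [], [], 1.0)
for sock in rlist:
    print(sock.recv(4))         # b'ping' -- the data the loop would hand back to a task

a.close()
b.close()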