小门板儿

Menu

Python 异步编程—asyncio和await

一、为什么要异步编程?

要了解异步编程的动机,我们首先必须了解是什么限制了我们的代码运行速度。理想情况下,我们希望我们的代码以光速运行,立即跳过我们的代码,没有任何延迟。然而,由于两个因素,实际上代码运行速度要慢得多:

当我们的代码在等待 IO 时,CPU 基本上是空闲的,等待某个外部设备响应。通常,内核会检测到这一点并立即切换到执行系统中的其他线程。因此,如果我们想加快处理一组 IO 密集型任务,我们可以为每个任务创建一个线程。当其中一个线程停止,等待 IO 时,内核将切换到另一个线程继续处理。

这在实践中效果很好,但有两个缺点:

例如,如果我们想要执行 10,000 个任务,我们要么必须创建 10,000 个线程,这将占用大量 RAM,要么我们需要创建较少数量的工作线程并以较少的并发性执行任务。此外,最初生成这些线程会占用 CPU 时间。

由于内核可以随时选择在线程之间切换,因此我们代码中的任何时候都可能出现相互竞争。

因此在传统的基于同步线程的代码中,内核必须检测线程何时是IO绑定的,并选择在线程之间随意切换。使用 Python 异步,程序员使用关键字 await 确认声明 IO 绑定的代码行,并确认授予执行其他任务的权限,从而实现异步编程

二、协程

在了解异步编程之前,我们先来看看什么是协程?

协程(co-routine,又称微线程、纤程)是一种多方协同的工作方式。协程不是进程或线程,其执行过程类似于 Python 函数调用,Python 的 asyncio 模块实现的异步IO编程框架中,协程是对使用 async 关键字定义的异步函数的调用。当前执行者在某个时刻主动让出(yield)控制流,并记住自身当前的状态,以便在控制流返回时能从上次让出的位置恢复(resume)执行。

一个进程包含多个线程,类似于一个人体组织有多种细胞在工作,同样,一个程序可以包含多个协程。多个线程相对独立,线程的切换受系统控制。同样,多个协程也相对独立,但是其切换由程序自己控制。简而言之,协程的核心思想就在于执行者对控制流的 “主动让出” 和 “恢复”。相对于,线程此类的 “抢占式调度” 而言,协程是一种 “协作式调度” 方式,协程之间执行任务按照一定顺序交替执行。

例如:

def func1():
print(1)
  ...
print(2)

def func2():
print(3)
  ...
print(4)

func1()
func2()

上述代码是普通的函数定义和执行,按流程分别执行两个函数中的代码,并先后会输出:1、2、3、4。但如果介入协程技术那么就可以实现函数见代码切换执行,最终输入:1、3、2、4

1、协程的实现

在Python中有多种方式可以实现协程,例如:

1.1、greenlet

greentlet是一个第三方模块,需要提前安装 pip3 install greenlet才能使用。

from greenlet import greenlet


def func1():
   print(1)        # 第1步:输出 1
   gr2.switch()    # 第3步:切换到 func2 函数
   print(2)        # 第6步:输出 2
   gr2.switch()    # 第7步:切换到 func2 函数,从上一次执行的位置继续向后执行


def func2():
   print(3)        # 第4步:输出 3
   gr1.switch()    # 第5步:切换到 func1 函数,从上一次执行的位置继续向后执行
   print(4)        # 第8步:输出 4

gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第1步:去执行 func1 函数

1.2、yield

基于Python的生成器的yield和yield form关键字实现协程代码。

def func1():
   yield 1
   yield from func2()
   yield 2


def func2():
   yield 3
   yield 4


f1=func1()
for item in f1:
   print(item)

1.3 asyncio

在Python3.4之前官方未提供协程的类库,一般大家都是使用greenlet等其他来实现。在Python3.4发布后官方正式支持协程,即:asyncio模块。

import asyncio

@asyncio.coroutine
def func1():
   print(1)
   yield from asyncio.sleep(2)# 遇到IO耗时操作,自动化切换到tasks中的其他任务
   print(2)


@asyncio.coroutine
def func2():
   print(3)
   yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
   print(4)


tasks=[
   asyncio.ensure_future( func1() ),
   asyncio.ensure_future( func2() )
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

注意:基于asyncio模块实现的协程比之前的要更厉害,因为他的内部还集成了遇到IO耗时操作自动切花的功能。

1.4 async & await

async & await 关键字在Python3.5版本中正式引入,基于他编写的协程代码其实就是 上一示例 的加强版,让代码可以更加简便。

Python3.8之后 @asyncio.coroutine 装饰器就会被移除,推荐使用async & awit 关键字实现协程代码。

import asyncio


async def func1():
   print(1)
   await asyncio.sleep(2)
   print(2)


async def func2():
   print(3)
   await asyncio.sleep(2)
   print(4)


tasks = [
   asyncio.ensure_future(func1()),
   asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

2、协程的意义

协程可以通过一个线程在多个上下文中进行来回切换执行。

但是,协程来回切换执行的意义何在呢?

计算型的操作,利用协程来回切换执行,没有任何意义,来回切换并保存状态 反倒会降低性能。 IO型的操作,利用协程在IO等待时间就去切换执行其他任务,当IO操作结束后再自动回调,那么就会大大节省资源并提供性能,从而实现异步编程(不等待任务结束就可以去执行其他代码)。

2.1、下载多个图片示例

方式一:同步编程实现

import requests
import time


def download_image(url):
   print("开始下载:", url)

   response = requests.get(url)
   print("下载完成")
   file_name = url.rsplit('_')[-1]
   print(file_name)
   with open(file_name, mode='wb') as file_object:
       file_object.write(response.content)


if __name__ == '__main__':
   url_list = [
       'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
       'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
       'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
      ]

   start_time=time.time()
   for item in url_list:
       download_image(item)
   end_time=time.time()
   print(end_time-start_time)
执行结果:
一个图片下载完成之后,再开启另外一个请求
开始下载: https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg
下载完成
AAFocMs8nzU621.jpg
开始下载: https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg
下载完成
ChcCSV2BBICAUntfAADjJFd6800429.jpg
开始下载: https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg
下载完成
ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg

方式二:基于协程的异步编程实现

import aiohttp
import asyncio
import time

async def fetch(session, url):
   print("发送请求:", url)
   async with session.get(url, verify_ssl=False) as response:
       content = await response.content.read()
       file_name = url.rsplit('_')[-1]
       with open(file_name, mode='wb') as file_object:
           file_object.write(content)
       print("下载完成:", url)


async def main():
   async with aiohttp.ClientSession() as session:
       url_list = [
           'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
           'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
           'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
      ]
       tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]

       await asyncio.wait(tasks)


if __name__ == '__main__':
   start_time = time.time()
   asyncio.run(main())
   end_time = time.time()
   print(end_time-start_time)
执行结果:一个请求发送后,还没返回下,执行下一个请求
发送请求: https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg
发送请求: https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg
发送请求: https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg
下载完成: https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg
下载完成: https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg
下载完成: https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg
0.12200689315795898

上述两种的执行对比之后会发现,基于协程的异步编程 要比 同步编程的效率高了很多。因为:

3、异步编程

3.1事件循环

事件循环,可以把他当做是一个while循环,这个while循环在周期性的运行并执行一些任务,在特定条件下终止循环。

# 伪代码

任务列表 = [ 任务1, 任务2, 任务3,... ]

while True:
  可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
for 就绪任务 in 已准备就绪的任务列表:
  执行已就绪的任务
   
for 已完成的任务 in 已完成的任务列表:
  在任务列表中移除 已完成的任务

如果 任务列表 中的任务都已完成,则终止循环

在编写程序时候可以通过如下代码来获取和创建事件循环。

import asyncio

loop = asyncio.get_event_loop()

3.2 协程和异步编程

协程函数,定义形式为 async def 的函数。

协程对象,调用 协程函数 所返回的对象。

# 定义一个协程函数
async def func():
   pass

# 调用协程函数,返回一个协程对象
result = func()

注意:调用协程函数时,函数内部代码不会执行,只是会返回一个协程对象。

3.2.1 基本应用

程序中,如果想要执行协程函数的内部代码,需要 事件循环协程对象 配合才能实现,如:

import asyncio


async def func():
   print("协程内部代码")

# 调用协程函数,返回一个协程对象。
result = func()

# 方式一
# loop = asyncio.get_event_loop() # 创建一个事件循环
# loop.run_until_complete(result) # 将协程当做任务提交到事件循环的任务列表中,协程执行完成之后终止。

# 方式二
# 本质上方式一是一样的,内部先 创建事件循环 然后执行 run_until_complete,一个简便的写法。
# asyncio.run 函数在 Python 3.7 中加入 asyncio 模块,
asyncio.run(result)

这个过程可以简单理解为:将协程当做任务添加到 事件循环 的任务列表,然后事件循环检测列表中的协程是否 已准备就绪(默认可理解为就绪状态),如果准备就绪则执行其内部代码。

3.2.2 await

await是一个只能在协程函数中使用的关键字,用于遇到IO操作时挂起 当前协程(任务),当前协程(任务)挂起过程中 事件循环可以去执行其他的协程(任务),当前协程IO处理完成时,可以再次切换回来执行await之后的代码。代码如下:

示例1:

import asyncio


async def others():
  print("start")
  await asyncio.sleep(2)
  print('end')
  return '返回值'


async def func():
  print("执行协程函数内部代码")

  # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
  response1 = await others()
  print("IO请求结束,结果为:", response1)
   
  response2 = await others()
  print("IO请求结束,结果为:", response2)

 
   
asyncio.run( func() )

执行结果:

执行协程函数内部代码
start
end
IO请求结束,结果为: 返回值
start
end
IO请求结束,结果为: 返回值

上述示例只是创建了一个任务,即:事件循环的任务列表中只有一个任务,所以在IO等待时无法演示切换到其他任务效果。

在程序想要创建多个任务对象,需要使用Task对象来实现。

3.2.3 Task对象

Tasks用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task()ensure_future() 函数。不建议手动实例化 Task 对象。

本质上是将协程对象封装成task对象,并将协程立即加入事件循环,同时追踪协程的状态。

注意:asyncio.create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。

示例1:

import asyncio

async  def func():
   print(1)
   await asyncio.sleep(2)
   print(2)
   return '返回值'

async def main():
   print('main函数开始')
   # 创建协程,将协程封装到Task对象中并添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。

   task_list = [
       asyncio.create_task(func()),
       asyncio.create_task(func())
  ]

   print('main结束')
   done, pending = await asyncio.wait(task_list, timeout=None)
   print('done')
   print(done)
   print('done')
   print(pending)

asyncio.run(main())

执行结果:

main函数开始
main结束
1
1
2
2
done
{<Task finished coro=<func() done, defined at D:/auto/asyncio/11.py:3> result='返回值'>, <Task finished coro=<func() done, defined at D:/auto/asyncio/11.py:3> result='返回值'>}
done
set()

注意:asyncio.wait 源码内部会对列表中的每个协程执行ensure_future从而封装为Task对象,所以在和wait配合使用时task_list的值为[func(),func()] 也是可以的。

示例2:

错误的使用方法:

import asyncio


async def func():
   print("执行协程函数内部代码")

   # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
   response = await asyncio.sleep(2)

   print("IO请求结束,结果为:", response)


coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ]
print(coroutine_list)
# 错误:coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ]
# 此处不能直接 asyncio.create_task,因为将Task立即加入到事件循环的任务列表,
# 但此时事件循环还未创建,所以会报错。


# 使用asyncio.wait将列表封装为一个协程,并调用asyncio.run实现执行两个协程
# asyncio.wait内部会对列表中的每个协程执行ensure_future,封装为Task对象。
done,pending = asyncio.run( asyncio.wait(coroutine_list) )

执行结果

Traceback (most recent call last):
File "D:/auto/asyncio/12.py", line 14, in <module>
  coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ]
File "D:\Programs\Python\Python37\lib\asyncio\tasks.py", line 324, in create_task
  loop = events.get_running_loop()
RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'func' was never awaited

正确的使用方法:

coroutine_list = [func(), func()]

3.2.3 asyncio.Future对象

Task继承Future,Task对象内部await结果的处理基于Future对象来的。

示例1:

async def main():
   # 获取当前事件循环
   loop = asyncio.get_running_loop()

   # 创建一个任务(Future对象),这个任务什么都不干。
   fut = loop.create_future()

   # 等待任务最终结果(Future对象),没有结果则会一直等下去。
   await fut

asyncio.run( main() )

示例2:

import  asyncio

async def set_after(fut):
   await asyncio.sleep(2)
   fut.set_result("666")

async def main():
   # 获取当前事件循环
   loop=asyncio.get_running_loop()

   # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
   fut = loop.create_future()

   # 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
   # 即手动设置future任务的最终结果,那么fut就可以结束了。
   await loop.create_task(set_after(fut))

   # 等待 Future对象获取 最终结果,否则一直等下去
   data = await fut
   print(data)

asyncio.run( main() )

Future对象本身函数进行绑定,所以想要让事件循环获取Future的结果,则需要手动设置。而Task对象继承了Future对象,其实就对Future进行扩展,他可以实现在对应绑定的函数执行完成之后,自动执行set_result,从而实现自动结束。

虽然,平时使用的是Task对象,但对于结果的处理本质是基于Future对象来实现的。

扩展:支持 await 对象语 法的对象可成为可等待对象,所以 协程对象Task对象Future对象 都可以被成为可等待对象。

3.2.4 futures.Future对象

在Python的concurrent.futures模块中也有一个Future对象,这个对象是基于线程池和进程池实现异步操作时使用的对象。

import time
from concurrent.futures.thread import ThreadPoolExecutor

def func(value):
   time.sleep(2)
   print('~~~~~~~~~~')
   print(value)
   print('!!!!!!!!!!')
   return value

pool=ThreadPoolExecutor(max_workers=4)

for i in range(10):
   fut=pool.submit(func,i)
   print(fut)

两个Future对象是不同的,他们是为不同的应用场景而设计,例如:concurrent.futures.Future不支持await语法 等。

3.3、总结

在程序中只要看到asyncawait关键字,其内部就是基于协程实现的异步编程,这种异步编程是通过一个线程在IO等待时间去执行其他任务,从而实现并发

官方文档:https://docs.python.org/zh-cn/3.8/library/asyncio.html

原文链接:https://blog.csdn.net/c_lanxiaofang/article/details/126394229

原文链接:https://blog.csdn.net/m0_72557783/article/details/126847728

— 于 共写了11049个字
— 标签:

评论已关闭。