V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
waibunleung
V2EX  ›  Python

不死心,再来问一遍关于 Python 的 asyncio 问题

  •  
  •   waibunleung · 2019-07-22 17:42:42 +08:00 · 6453 次点击
    这是一个创建于 1946 天前的主题,其中的信息可能已经有所发展或是发生改变。

    有两段代码,是关于 asyncio 的。
    代码段一:

    import asyncio
    
    async def worker_1():
        print('worker_1 start')
        await asyncio.sleep(1)
        print('worker_1 done')
    
    async def worker_2():
        print('worker_2 start')
        await asyncio.sleep(2)
        print('worker_2 done')
    
    async def main():
        print('before await')
        await worker_1()
        print('awaited worker_1')
        await worker_2()
        print('awaited worker_2')
    
    %time asyncio.run(main())
    
    ########## 输出 ##########
    
    before await
    worker_1 start
    worker_1 done
    awaited worker_1
    worker_2 start
    worker_2 done
    awaited worker_2
    Wall time: 3 s
    

    代码段二:

    import asyncio
    
    async def worker_1():
        print('worker_1 start')
        await asyncio.sleep(1)
        print('worker_1 done')
    
    async def worker_2():
        print('worker_2 start')
        await asyncio.sleep(2)
        print('worker_2 done')
    
    async def main():
        task1 = asyncio.create_task(worker_1())
        task2 = asyncio.create_task(worker_2())
        print('before await')
        await task1
        print('awaited worker_1')
        await task2
        print('awaited worker_2')
    
    %time asyncio.run(main())
    
    ########## 输出 ##########
    
    before await
    worker_1 start
    worker_2 start
    worker_1 done
    awaited worker_1
    worker_2 done
    awaited worker_2
    Wall time: 2.01 s
    

    问题:代码段一里面的协程(coroutine)换成代码段二的任务(task)后,为什么执行顺序就变了?这个过程中发生了什么事情?

    说说我的猜想:
    发现调用 asyncio.run(main()) 或者 [asyncio.gather()->asyncio.get_event_loop()->loop.run_until_complete()]都会将一个 coroutine 转化成 task/future 再放 event loop 里面去, 交由 event loop 去管理这些 task/future。代码段一只将 main()这个 coroutine 封装成了 task 加入到 event loop 中,所以整个 event loop 中只有一个 task 在走,在这个 task 中代码是顺序执行的,所以最后呈现出同步执行的结果;
    但是代码段二调用了两次 asyncio.create_task(),这个方法会将一个 coroutine 转换成一个 task 并且放到 event loop 中,所以整个 event loop 其实有三个 task ( main,task1,task2 ),之后程序就交给 event loop 来调度,执行顺序就变不同了。 这个假设目前来看好像能解释得通

    最后,希望各位能指点一下~

    第 1 条附言  ·  2019-07-24 01:26:09 +08:00
    将两次提问的内容总结了一下,写成了一篇文章,大家可以看一下。有什么不对的地方欢迎指正~~
    https://blog.csdn.net/qq_29785317/article/details/97054589
    90 条回复    2019-07-24 01:29:06 +08:00
    waibunleung
        1
    waibunleung  
    OP
       2019-07-22 17:46:56 +08:00
    顺便贴一下之前的问题 https://www.v2ex.com/t/583601#reply46
    cwjokaka
        2
    cwjokaka  
       2019-07-22 17:55:30 +08:00
    好奇,先收藏
    BBCCBB
        3
    BBCCBB  
       2019-07-22 18:01:34 +08:00
    你代码 1 是顺序执行的,
    代码 2 加入 create_task 实际上已经把 worker1 和 worker2 这两个函数创建成 task 加入到了 eventLoop 里,你 worker1 函数里有 await sleep(1) , 让出 cpu 后 eventLoop 会继续调度下一个,这里基本就是 worker2. 所以第二个看起来像并行的, 这里的并行是指你 worker1 里主动让出了 cpu, 所以 worker2 在你 worker1 sleep 的过程中可以运行了。

    大概应该就是这个道理
    BBCCBB
        4
    BBCCBB  
       2019-07-22 18:02:20 +08:00
    create_task 创建后就加入到了调度器,就会被执行, 不是说等你 await 他的时候他才执行,await 只是等待他执行完成。
    zdnyp
        5
    zdnyp  
       2019-07-22 18:05:31 +08:00
    zdnyp
        6
    zdnyp  
       2019-07-22 18:06:33 +08:00
    @BBCCBB 这是单线程吧...应该没有让出 CPU 的操作吧...
    ipwx
        7
    ipwx  
       2019-07-22 18:09:47 +08:00 via Android   ❤️ 1
    @zdnyp 你以为协程是为什么被发明出来的?就是为了单线程里面,应用程序层面上,可以最大效率地执行多个可能阻塞(比如 io 和 sleep )的任务,避免 cpu 空置呀。所以让出 cpu 这个说法很准确
    waibunleung
        8
    waibunleung  
    OP
       2019-07-22 18:14:34 +08:00
    @BBCCBB 你说的话不就是我的猜想么?另外你也是跟我一样的猜想,实际上我稍微去找过源码,我只看到了函数 create_task 里面的注释说将协程加入 event loop 调度并返回一个 task 对象,但是我并没有找到将协程加入 event loop 的代码在哪里。
    另外你可以解释一下为什么代码 1 是顺序执行的吗?跟代码 2 对比起来不同的原因是什么?
    BBCCBB
        9
    BBCCBB  
       2019-07-22 18:16:33 +08:00
    我都没看你的猜想 =_=
    BBCCBB
        10
    BBCCBB  
       2019-07-22 18:18:45 +08:00
    你代码 1 里明显是一个一个创建的 task。你又没有显示的加入到 eventloop, 那就只能是在 await 的时候才加进去执行


    @zdnyp 我是说这个 task 让出 cpu,让其他的 task 执行。。
    so1n
        11
    so1n  
       2019-07-22 18:20:23 +08:00 via Android
    @zdnyp 协程就是非阻塞,处理 io 时会通知协程,协程休眠,不占用 cpu 等到 io 结束协程在占用 cpu,这一步可以说让出 cpu
    waibunleung
        12
    waibunleung  
    OP
       2019-07-22 18:21:24 +08:00
    @ipwx 所以你可以解释一下吗~为什么将协程换成 task 之后执行顺序就不一样了?
    guokeke
        13
    guokeke  
       2019-07-22 18:24:47 +08:00
    代码 2 执行顺序不一样是因为 create_task 的两个 task 运行不相互阻塞。
    https://github.com/python/asyncio/blob/master/asyncio/base_events.py
    create_task:227
    call_soon: 562
    waibunleung
        14
    waibunleung  
    OP
       2019-07-22 18:24:47 +08:00
    @BBCCBB 首先代码一里面只有 main()这个主异步函数会被转换成 task,其他的还是 coroutine,另外 create_task 调用之后,转换出来的 task 也没有明显加入 event loop,不过你能找到加进去 event loop 的代码的话,更能支持你的说法...
    so1n
        15
    so1n  
       2019-07-22 18:26:28 +08:00 via Android
    create_task 相当于之前的 ensure_future,实际上就是把任务放入 event_loop,并安排他执行。asynico 3.4 之前是 python 编写的,3.7 应该都是 c 了
    waibunleung
        17
    waibunleung  
    OP
       2019-07-22 18:28:21 +08:00
    @guokeke 我翻源码的时候也看到了这里,但是 create_task 这个函数看上去只是返回了创建的 task 对象,这个函数的注释是"""Schedule a coroutine object.
    Return a task object.
    """
    请问 Schedule a coroutine object.怎么体现出来?在哪里 schedule 的?
    BBCCBB
        18
    BBCCBB  
       2019-07-22 18:30:07 +08:00
    create_task 返回的是已经被加入到调度器的 task, 你不用 await, 也可以直接用 task.done(), task.result()等方法来获取该 task 执行的结果。

    而且你对代码 1 里只有 main()这个函数会被转换成 task 这个理解也不正确,async 函数 也会被创建成 task 加入到 eventloop 中。 eventloop 是不会直接执行 async 函数的
    so1n
        19
    so1n  
       2019-07-22 18:31:13 +08:00
    同时 3.7 支持 asyncio.all_tasks 来返回事件循环所运行的未完成的 Task 对象的集合 和 asyncio.current_task 返回当前运行的 Task 实例你可以自己调试下
    guokeke
        20
    guokeke  
       2019-07-22 18:31:54 +08:00
    @waibunleung 你要再深入 tasks.py 那个文件去看,然后又会回到这个 baseEvent 下的 call_soon 函数,中点不是返回了什么,而是 Task 在初始化的时候做了什么。
    waibunleung
        21
    waibunleung  
    OP
       2019-07-22 18:32:05 +08:00
    @so1n 我还是同样的问题,我能看到注释说 Schedule the execution of a coroutine object in a spawn task.
    Return a Task object.
    问题是 Schedule the execution of a coroutine object in a spawn task.这个 Schedule 在哪里体现的?
    dbow
        22
    dbow  
       2019-07-22 18:32:13 +08:00
    await 协程时, 协程才会开始执行, 执行返成结果返回给 await
    create_task 时会先直接 await 协程, 相当于 task = await 协程; await task, 由于你两个 create_task 是连在一起的, 它们就没有顺序关系, 直接开始交错执行了。

    看官方文档, https://docs.python.org/zh-cn/3/library/asyncio-task.html
    asyncio.create_task(coro)
    将 coro 协程 打包为一个 Task 排入日程准备执行。返回 Task 对象。

    该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。
    waibunleung
        23
    waibunleung  
    OP
       2019-07-22 18:33:00 +08:00
    @guokeke 所以再 task 初始化的时候,那个__init__函数你看了吗?就几行代码...
    guokeke
        24
    guokeke  
       2019-07-22 18:35:16 +08:00
    @waibunleung
    ```
    self._loop.call_soon(self._step)
    self.__class__._all_tasks.add(self)
    ```
    waibunleung
        25
    waibunleung  
    OP
       2019-07-22 18:35:36 +08:00
    @BBCCBB 按照你的说法,async 函数也会被转换成 task 而放到 eventloop 中,那其实代码一和代码二的 async 函数最后都会变成 task 才被加进 event loop,那为什么代码一是顺序的,代码二不是?
    waibunleung
        27
    waibunleung  
    OP
       2019-07-22 18:37:01 +08:00
    @guokeke 可以把你的查看路径发一下给我吗?从哪个函数跳到哪个函数这样的路径
    BBCCBB
        28
    BBCCBB  
       2019-07-22 18:38:25 +08:00
    因为你代码 1 里调用 await 一个协程函数的时候他才加进去啊,又不是你写了 await, 在开始运行的时候就加进去的。。。你 await worker2 的时候, 你 await 的 worker1 早就执行完了。毕竟你 await worker2 的顺序在 await worker1 之后。
    dbow
        29
    dbow  
       2019-07-22 18:39:13 +08:00
    协程的启动
    async def print_msg(msg):
    print(msg)

    coro = print_msg()
    coro.send(None) 执行至等待处(await)
    coro.send(xxx) 恢复执行, xx 作为函数里面 await 的结果
    一直 send, 一直执行, 直到 StopIteration,协程执行完毕。

    create_task 时直接 send(None) 启动了协程执行, 导致两个 task 没有顺序关系。
    BBCCBB
        30
    BBCCBB  
       2019-07-22 18:39:28 +08:00
    建议你先仔细看看官方文档。不然你理解起来很费劲。我们解释起来也费劲。
    waibunleung
        31
    waibunleung  
    OP
       2019-07-22 18:41:35 +08:00
    @BBCCBB 我已经看了不少资料了...你看看我上个问题,你肯定又没仔细看,当然会费劲...
    janxin
        32
    janxin  
       2019-07-22 18:42:18 +08:00
    第一个 await 过程因为是阻塞等待结果的,其实等同于不包含 async/await 的同步写法
    so1n
        33
    so1n  
       2019-07-22 18:43:17 +08:00
    waibunleung
        34
    waibunleung  
    OP
       2019-07-22 18:44:00 +08:00
    @so1n 我用的 pycharm 查看的源码,但是不知道它为什么跳不到你们说的 call_soon 的函数那里去,我点 create_task 之后,跳去了 task.pyi 而不是 task.py ,task.pyi 的__init__方法只有一行...是我看得有问题吗。。。
    dbow
        35
    dbow  
       2019-07-22 18:44:22 +08:00
    create_task 创建了一个 task , task 构造函数里, 使用了 call_soon, call_soon 的意思 event loop 在下一批调度时立刻执行这个 task, 不再等待.
    dbow
        36
    dbow  
       2019-07-22 18:45:31 +08:00
    waibunleung
        37
    waibunleung  
    OP
       2019-07-22 18:46:27 +08:00
    @dbow 看不到你的图片...
    so1n
        38
    so1n  
       2019-07-22 18:48:10 +08:00
    @waibunleung pycharm 跳进去的不一定是正确的, 特别是当这个函数是用实现时,pycharm 只会留下一个文档和一句 pass
    dbow
        39
    dbow  
       2019-07-22 18:48:19 +08:00
    meik2333
        40
    meik2333  
       2019-07-22 18:49:00 +08:00
    换个思路来看,你 await worker_1() 的时候,Python 解释器根本不知道你待会儿还要 await worker_2(),就更无从谈起两个并行执行了。只有等你 await worker_1() 结束后,才能发现还要 await worker_2()。
    guokeke
        41
    guokeke  
       2019-07-22 18:49:22 +08:00
    我发不了外链了。。。
    waibunleung
        42
    waibunleung  
    OP
       2019-07-22 18:49:23 +08:00
    @dbow 还是看不到的
    dbow
        43
    dbow  
       2019-07-22 18:49:29 +08:00
    看上图,call_soon 把 task 加入 loop._ready, loop.run_once 把_ready 的东西 popleft 出来, 开始执行。
    waibunleung
        44
    waibunleung  
    OP
       2019-07-22 18:50:09 +08:00
    @guokeke 直接贴应该可以吧? github 的都可以发?
    guokeke
        45
    guokeke  
       2019-07-22 18:50:20 +08:00
    base_events create_task
    tasks __init__
    base_events call_soon
    base_events _call_soon
    _call_soon 的时候创建好 handle events
    dbow
        46
    dbow  
       2019-07-22 18:50:32 +08:00
    图床是 imgur 可能被墙掉了
    waibunleung
        47
    waibunleung  
    OP
       2019-07-22 18:50:36 +08:00
    @dbow 老哥我真的看不到图...
    guokeke
        48
    guokeke  
       2019-07-22 18:51:17 +08:00
    v2 这个判断逻辑有问题,只是文件名字,都没有 http 就说看起来像垃圾链接。。。
    dbow
        49
    dbow  
       2019-07-22 18:52:39 +08:00
    def call_soon(self, callback, *args, context=None):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered. Each callback will be
    called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    self._check_closed()
    if self._debug:
    self._check_thread()
    self._check_callback(callback, 'call_soon')
    handle = self._call_soon(callback, args, context)
    if handle._source_traceback:
    del handle._source_traceback[-1]
    return handle
    在事件循环里
    while self._scheduled:
    handle = self._scheduled[0]
    if handle._when >= end_time:
    break
    handle = heapq.heappop(self._scheduled)
    handle._scheduled = False
    self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
    handle = self._ready.popleft()
    if handle._cancelled:
    continue
    if self._debug:
    try:
    self._current_handle = handle
    t0 = self.time()
    handle._run()
    dt = self.time() - t0
    if dt >= self.slow_callback_duration:
    logger.warning('Executing %s took %.3f seconds',
    _format_handle(handle), dt)
    finally:
    self._current_handle = None
    else:
    handle._run()
    handle = None # Needed to break cycles when an exception occurs.
    waibunleung
        50
    waibunleung  
    OP
       2019-07-22 18:52:45 +08:00
    @guokeke 创建好 handle events 之后是要等到下一次循环的时候才开始执行吧?什么时候会进入到事件循环的下一轮?
    fzzff
        52
    fzzff  
       2019-07-22 18:54:17 +08:00
    好好理解理解 create_task...看的我有点尴尬
    waibunleung
        53
    waibunleung  
    OP
       2019-07-22 18:55:59 +08:00
    @dbow 按照你的意思,结合代码二,什么时候会进入下一批调度?
    waibunleung
        54
    waibunleung  
    OP
       2019-07-22 18:58:40 +08:00
    @fzzff 嗯嗯,我会努力的。同时也祝愿您下次评论的时候除了这种没有营养的话也多能说说有建设性的东西呢~
    可以接受批评,但拒绝无实际帮助的评论呢,亲
    waibunleung
        55
    waibunleung  
    OP
       2019-07-22 18:59:00 +08:00
    @meik2333 谢谢你啦~
    cxyfreedom
        56
    cxyfreedom  
       2019-07-22 19:14:35 +08:00
    task 部分上面说了,如果你还要再深入的话,事件循环部分细节需要去看 _asynciomodule.c 里面的内容了
    fzzff
        57
    fzzff  
       2019-07-22 19:34:45 +08:00
    首先对刚刚评论的不礼貌向你道歉,这个 creat_task 跟 tornado 中的 add_callback 有些相似,这个 creat_task 再被调用时会直接执行传入的协程对象,并且默认不等待协程结果的执行完毕,你在调用 await worker 的时候在会开始等待 Task 对象执行完成,下面这段代码应该会对理解有些帮助,可以单步走下:
    import asyncio


    async def worker_1():
    print('worker_1 start')
    print('worker_1 done')
    return '123'


    async def worker_2():
    print('worker_2 start')
    # await asyncio.sleep(0)
    print('worker_2 done')
    return '234'


    async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    print('awaited worker_1')
    print('awaited worker_2')
    print(task1)
    print(task2)

    asyncio.run(main())
    ilucio
        58
    ilucio  
       2019-07-22 19:44:38 +08:00
    说说我的理解,这两段代码都创建了 3 个 task,但是对于代码 1,这三个 task 的执行顺序是 main-> work1->work2,对于代码 2,这三个 task 的执行顺序是 main-> work1,main->work2,并且可以认为 work1 和 work2 是同时执行的。
    waibunleung
        59
    waibunleung  
    OP
       2019-07-22 19:45:41 +08:00
    @cxyfreedom 相关注释我也看到了,但是由于没有 C 基础,就先不勉强自己,谢谢你啦
    waibunleung
        60
    waibunleung  
    OP
       2019-07-22 19:48:05 +08:00
    @fzzff 好的谢谢你,我试一下
    waibunleung
        61
    waibunleung  
    OP
       2019-07-22 19:54:02 +08:00
    @ilucio 不知道你是看了源码还是看了程序输出得出来的结论,我也不能确定你说的就是对的
    wwqgtxx
        62
    wwqgtxx  
       2019-07-22 20:52:15 +08:00 via iPhone   ❤️ 1
    讲道理,我在你上一个问题的#14 和#16 已经分析过 task 在 init 是怎么把自己放进 eventloop 了,核心点还是在于 call_soon

    另外你提到的 pycharm 跳转到 pyi 文件的问题,那个 pyi 只是用来做类型标记的,并没有具体的实现,如果你想看到实现可以临时去 pycharm 的用户目录把包含 pyi 的目录暂时移走就能进入真正的.py 文件了。
    上面提到了有一部分 eventloop 会涉及到 c 代码,其实那大部分是涉及到怎么 pull 一个 fd 的,并不影响你阅读了解 asyncio 模块的工作原理。对于一个 SelectorEventLoop 是可以纯 python 代码实现的( python3.5 的就是纯 py 代码实现),c 代码只是为了优化性能,大部分情况下源码中还有有纯 python 的替代实现,你可以仔细看看
    waibunleung
        63
    waibunleung  
    OP
       2019-07-22 21:17:42 +08:00
    @wwqgtxx 说实话,发现了更核心点的代码应该是在 call_soon 中传进去的 self.__step 这个函数中,有一段:
    try:
    if exc is None:
    # We use the `send` method directly, because coroutines
    # don't have `__iter__` and `__next__` methods.
    result = coro.send(None)
    else:
    result = coro.throw(exc)
    except StopIteration as exc:
    if self._must_cancel:
    # Task is cancelled right before coro stops.
    self._must_cancel = False
    super().set_exception(futures.CancelledError())
    else:
    super().set_result(exc.value)

    里面的 coro.send(None)是启动协程的关键,#29 有提到了
    另外你之前的答案我也有看,但是一下子没理解下来,现在打算重新梳理一下,无论如何,十分感谢你对我的帮助~
    wwqgtxx
        64
    wwqgtxx  
       2019-07-22 22:51:04 +08:00
    @waibunleung 关于__step 这个地方可能看一下 Python 的生成器那块会有更深的了解,最初的 asyncio 就是用生成器完成的,只是在后续版本中才逐步把 coroutines 和 generators 分离了,后续版本还有了异步生成器的概念,这些内容你可以看一下相关 PEP 文档,那里有解释为什么会采用这样的设计。
    https://www.python.org/dev/peps/pep-0492/
    https://www.python.org/dev/peps/pep-0525/
    renmu
        65
    renmu  
       2019-07-23 07:47:47 +08:00 via Android
    遇到 await 就阻塞了,只有等 await 那个变量执行完才会继续往下运行
    congeec
        66
    congeec  
       2019-07-23 07:59:15 +08:00 via iPhone
    上次没跟你讲清楚我觉得很失败

    你还是拿起编辑器抄一个协程的实现吧,也就几百行,本质状态机
    waibunleung
        67
    waibunleung  
    OP
       2019-07-23 09:44:42 +08:00 via iPhone
    @congeec 不要有这样的想法呀,可能只是我反应比较慢一点。其实大家对我的帮助很多了,真的谢谢你们
    lieh222
        68
    lieh222  
       2019-07-23 09:48:41 +08:00
    我个人的理解
    代码一
    event_loop 里面只有一个任务 main,CPU 执行代码顺序就是
    main()
    main: print('before await')
    main: await worker_1()
    worker_1()
    worker_1: await asyncio.sleep(1)到这里的时候有 IO 事件,让出 CPU 给 event_loop,但是 event_loop 没有其他的任务,所说 CPU 会空置等待 asyncio.sleep(1)完成再切换到 worker_1 中继续执行,接下来都是这样,所以这整个过程是同步执行的

    代码二
    一开始有一个 main 任务
    main()
    main: task1 = asyncio.create_task(worker_1())
    main: task2 = asyncio.create_task(worker_2())
    main: print('before await') 这里 main 并没有让出 CPU,所以先打印 before await
    main: await task1 这时 CPU 直接切换到了 task1,接下来
    worker_1: print('worker_1 start')
    worker_1: await asyncio.sleep(1)直到这里出现 IO 事件才会把 CPU 让出给 event_loop,event_loop 中 main 和 task1 都是 await 状态,CPU 切换到 task2 任务
    worker_2: print('worker_2 start')
    worker_2: await asyncio.sleep(2)出现 IO 时间切回 event_loop,三个任务都在 await,所以 CPU 会空置 1S 等待 asyncio.sleep(1),然后
    worker_1: print('worker_1 done')
    main: print('awaited worker_1')
    main: await task2 等待 asyncio.sleep(2)结束
    worker_2: print('worker_2 done')
    main: print('awaited worker_2')
    rocketman13
        69
    rocketman13  
       2019-07-23 10:50:19 +08:00
    @lieh222 代码二你把 await task1 注释了再跑一次就会发现自己理解错了,await 不是你所理解的 cpu 切换或者启动任务,是主进程等待任务完成
    waibunleung
        70
    waibunleung  
    OP
       2019-07-23 11:23:49 +08:00
    @wwqgtxx
    @dbow
    @guokeke

    讲真的,我试着整理了一下过程:

    create_task->tasks.Task()的__init__->self._loop.call_soon[self.__step 函数作为参数传进去,里面的 result = coro.send(None)是启动协程的关键]
    ->_call_soon[self._ready.append(handle),这表示准备执行的任务队列]
    在事件循环里:
    base_events.py 里 run_forever->_run_once[handle = self._ready.popleft()取出一个任务(task/future),然后 handle._run()执行任务]

    像之前说的 create_task 在将一个 coroutine 转化成 task 之后,将自己放进去了 event loop 中准备在下一轮循环中执行,那问题是是什么时候会进入下一轮循环的呢?

    是不是这样的过程:
    asyncio.run(main())之后,首先将 main()这个协程 coroutine 转化成 task 并加入 event loop,事件循环的第一轮执行 main 这个 task,执行期间将在 main 中创建的
    两个 task(worker_1,worker_2)加入到 event loop 中,运行到 await task1 后让出控制权并检查事件循环里有没有其他任务,发现有刚刚新添加的两个任务,就转而去执行其他任务,
    在其他任务中遇到了 await asyncio.sleep()再跳出来去执行另外的任务....直到所有任务执行完毕。

    但如果是这样的话,好像只是在一轮循环里面就执行完了呀...总感觉哪里不对,是我理解错了吗?
    wwqgtxx
        71
    wwqgtxx  
       2019-07-23 11:37:38 +08:00 via iPhone
    @waibunleung 作为事件循环,当你 await task1 把控制权交还的时候,这一轮循环已经结束,随后立即开始下一轮的事件循环,检查有没有其他任务这已经是下一轮循环干的事了
    wwqgtxx
        72
    wwqgtxx  
       2019-07-23 11:45:01 +08:00 via iPhone
    @rocketman13 根据 python 的协程实现原理,只有在一个 coroutine 主动调用 await 或者 return 的时候才会发生 task 切换,虽然这里的 await task1 的目的是为了等待 task1 结束,但如果你在主 task 中永远不 await 任何一个 future 或者 return 的话(比如跑个死循环),其他 task 是永远不会被执行的。这点和线程可以剥夺式调度完全不同。( ps: python3.6 开始是可以检测一个 coroutine 执行了太长时间而发出警告的,但仍然不可主动剥夺)
    wwqgtxx
        73
    wwqgtxx  
       2019-07-23 11:52:42 +08:00 via iPhone
    @waibunleung 有个建议,你可以尝试自己实现一下如何在单线程下完成“多任务”,基本上就是楼上所说有限状态机,很多单片机程序就是一个大的 while 循环套几组 switch,一组 switch 就是一个 task,每个 switch 的 flag 是一个标志量,内部执行到需要等待的时候就把 flag 值改成下一个代码块,随后主动退出,去让下一组 task 执行,这样实现“伪并发”。当然由于单片机的程序单一,硬件资源极为有限(很多单片机 ram 只有 1kb 甚至更小),这里的 task 是写死的。而 python 中相当于每次 while 循环的时候从队列头取出一组 switch(task)来执行,执行完后再进行下一轮 while (还有些 io 相关的操作暂时省略)
    waibunleung
        74
    waibunleung  
    OP
       2019-07-23 13:42:12 +08:00
    @wwqgtxx 十分感谢!!
    lieh222
        75
    lieh222  
       2019-07-23 13:45:53 +08:00
    @rocketman13 测试了一下,确实没有切换或启动任务的意思
    j0hnj
        76
    j0hnj  
       2019-07-23 14:38:06 +08:00
    赞楼主刨根问底的精神,有点像我当初为了搞清楚 asyncio 工作原理莽着源码看的样子。当看到 Task.__step 之后就明白整个链条是怎么串起来的了,有种醍醐灌顶的感觉,后来用 asyncio 写异步就得心应手了。
    wwqgtxx
        77
    wwqgtxx  
       2019-07-23 15:24:31 +08:00 via iPhone
    @j0hnj 我自己学习的时候是用 yield 和 yield from 手写了一个协程的实现(在 py3.4 的时候,那是 asyncio 还未加入标准库),不过理解的根源还是写“多任务”的单片机程序😂
    waibunleung
        78
    waibunleung  
    OP
       2019-07-23 17:03:16 +08:00
    @j0hnj 我看到 Task.__step 的时候也是跟你一样的感觉啊!!
    waibunleung
        79
    waibunleung  
    OP
       2019-07-23 17:08:07 +08:00
    @wwqgtxx
    @j0hnj
    其实最近提问的主题都被不少人收藏了,可能大家都会对这个感兴趣但是没人问出来并且刨进去,所以真的很感谢大家的帮忙,在这里的解答不仅是帮助到了我,还能照顾到其他收藏了或者点击进来的人,所以才希望有多点有建设性的评论,现在真的是莽着来看源码的,希望看着看着就没那么莽了吧。

    借楼感谢上面这么多的评论的帮助!
    zzth370
        80
    zzth370  
       2019-07-23 22:45:57 +08:00
    async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    await asyncio.sleep(2)
    print('awaited worker_1')
    await asyncio.sleep(1)
    print('awaited worker_2')
    结果:
    before await
    worker_1 start
    worker_2 start
    worker_1 done
    awaited worker_1
    worker_2 done
    awaited worker_2


    我把 main 里面的稍微改了下,得到跟你一样的结果,我感觉是 create_task 时就已经把两个任务添加到协程的任务列表里面去了,然后后面遇到阻塞就切换,然后就会得到跟 1 不一样的结果
    zzth370
        81
    zzth370  
       2019-07-23 22:52:56 +08:00
    而你代码二中 await task 造成了一定的误解,造成主动 await 才会执行 task 的假象
    个人见解
    waibunleung
        82
    waibunleung  
    OP
       2019-07-23 22:58:32 +08:00
    @zzth370 你只改了 main 没有改 worker 里面的?
    zzth370
        83
    zzth370  
       2019-07-23 23:04:29 +08:00
    import asyncio


    async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

    async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

    async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    await asyncio.sleep(2)
    print('awaited worker_1')
    await asyncio.sleep(1)
    print('awaited worker_2')


    if __name__ == '__main__':
    asyncio.run(main())

    源码是这样的,就修改了下 main()里面的
    wwqgtxx
        84
    wwqgtxx  
       2019-07-23 23:54:25 +08:00
    @zzth370 其实可以用一个比较简单的方式解释,await 导致了 task 切换,但是 create_task 之后这个 task 已经放进等待列表,所以在 main task 中 await 任何 future 都会导致 worker 被执行,至于 worker_1 和 worker_2 的执行循序,这个就要看 EventLoop 的具体实现了
    waibunleung
        85
    waibunleung  
    OP
       2019-07-23 23:55:35 +08:00
    @zzth370 明白你的意思了,其实只要一个协程里面 await 了,就会进行切换,至于切换到哪一个就看 event loop 的调度了
    waibunleung
        86
    waibunleung  
    OP
       2019-07-23 23:57:09 +08:00
    @wwqgtxx 啊相差了一分钟,其实想表达的是同样的意思,我等下整理一篇博客出来你可以帮忙看看吗
    wwqgtxx
        87
    wwqgtxx  
       2019-07-24 00:35:52 +08:00 via iPhone
    @waibunleung 如果你仔细看过 EventLoop 的实现代码,你会发现其实 asyncio 内部很喜欢用在一个函数结尾调用 call_soon 的方式实现循环,比如
    def _run(a,loop):
    ____if a == XXX:
    ________干一些事
    ____else:
    ________干另一些事
    ____loop.call_soon(a,loop)
    ____return
    这样实现的代码虽然看起来复杂,可能能让你更明白事件循环的执行过程
    wwqgtxx
        88
    wwqgtxx  
       2019-07-24 00:39:50 +08:00 via iPhone
    @wwqgtxx 修正一下,是
    ____loop.call_soon(_run,a,loop)
    这里会发现 call_soon 传入的参数是个普通函数而不是一个 coroutine,为什么这么设计就很有意思了
    waibunleung
        89
    waibunleung  
    OP
       2019-07-24 01:25:12 +08:00
    @wwqgtxx
    文章写好啦 https://blog.csdn.net/qq_29785317/article/details/97054589
    大家看看有什么需要修改的地方~~多多指正噢
    waibunleung
        90
    waibunleung  
    OP
       2019-07-24 01:29:06 +08:00
    @wwqgtxx 我看到了 _run_once 这一部分,不知道是不是你说的 EventLoop 的实现,我觉得是其中一部分
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3657 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 32ms · UTC 10:44 · PVG 18:44 · LAX 02:44 · JFK 05:44
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.