V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
killva4624
V2EX  ›  Python

这个协程例子中, consumers 是怎么被执行的?

  •  
  •   killva4624 · 2020-11-23 16:45:52 +08:00 · 1655 次点击
    这是一个创建于 1467 天前的主题,其中的信息可能已经有所发展或是发生改变。
    import asyncio
    
    
    # 消费者
    async def consumer(n, q):
        print("consumer {}: 进入商店".format(n))
        while True:
            item = await q.get()
            print("consumer {}: 购买产品 {}".format(n, item))
            await asyncio.sleep(0.15)
            q.task_done()
        print("consumer {}: ending".format(n))
    
    
    # 生产者 A
    async def producer_a(q, num_workers):
        print("生产者 A: 开始生产")
        for i in range(num_workers * 2):
            await q.put("A " + str(i))
            print("生产者 A: 上架产品 A {}".format(i))
            await asyncio.sleep(0.01)
    
    
    # 生产者 B
    async def producer_b(q, num_workers):
        print("生产者 B: 开始生产")
        for i in range(num_workers * 2):
            await q.put("B " + str(i))
            print("生产者 B: 上架产品 B {}".format(i))
            await asyncio.sleep(0.02)
    
    
    # 任务调度
    async def main(num_consumers, num_workers):
        q = asyncio.Queue(maxsize=num_consumers)
        print("初始化生产者")
        prod_a = [asyncio.create_task(producer_a(q, num_workers))]
        prod_b = [asyncio.create_task(producer_b(q, num_workers))]
        print("初始化消费者")
        consumers = [asyncio.create_task(consumer(i, q)) for i in range(num_consumers)]
        print("asyncio 调度开始")
        await asyncio.gather(*prod_a, *prod_b)
        print("生产者 All: ending")
    
        await q.join()
        print("消费者 All: ending")
    
        for c in consumers:
            c.cancel()
    
    
    # main(消费者数量, 生产者数量)
    asyncio.run(main(3, 2))
    
    

    输出结果如下:

    初始化生产者
    初始化消费者
    asyncio 调度开始
    生产者 A: 开始生产
    生产者 A: 上架产品 A 0
    生产者 B: 开始生产
    生产者 B: 上架产品 B 0
    consumer 0: 进入商店
    consumer 0: 购买产品 A 0
    consumer 1: 进入商店
    consumer 1: 购买产品 B 0
    consumer 2: 进入商店
    生产者 A: 上架产品 A 1
    consumer 2: 购买产品 A 1
    生产者 B: 上架产品 B 1
    生产者 A: 上架产品 A 2
    生产者 A: 上架产品 A 3
    consumer 0: 购买产品 B 1
    consumer 1: 购买产品 A 2
    生产者 B: 上架产品 B 2
    consumer 2: 购买产品 A 3
    生产者 B: 上架产品 B 3
    生产者 All: ending
    consumer 0: 购买产品 B 2
    consumer 1: 购买产品 B 3
    消费者 All: ending
    

    我理解 await asyncio.gather(*prod_a, *prod_b) 执行了 prod_aprod_b ,但 consumers 是怎么被触发的呢?

    3 条回复    2020-11-24 08:28:44 +08:00
    mercurylanded
        1
    mercurylanded  
       2020-11-23 16:55:48 +08:00
    create_task 就触发了 gather 是等待结束
    killva4624
        2
    killva4624  
    OP
       2020-11-23 17:24:10 +08:00
    @mercurylanded 谢谢,重新去看了官方文档和源码。
    fasionchan
        3
    fasionchan  
       2020-11-24 08:28:44 +08:00   ❤️ 1
    create_task 的时候,协程被创建,但还没被调度执行;
    await gather 的时候,程序执行权被转移到 asyncio 内部的事件循环;
    这时,先前创建的协程如果达到执行条件,将被事件循环调度执行;
    直到 gather 等待的协程,也就是 producer 执行完毕,程序执行权回到当前代码;
    await q.join 的时候,程序执行权又被转移到事件循环,comsumer 继续执行;
    直到队列 q 为空,这时 consumer 肯定已经完成所有数据消费;

    可以自己动手写一个极简的协程库加深对协程运行原理的理解,只需 100 来行代码,即可一举拿下协程、事件循环、IO 多路复用等核心概念:

    https://www.imooc.com/read/76/article/1935

    我只看 asyncio 文档时,总有一种似懂非懂的感觉;自己折腾过一遍后就清晰了,各种概念对号入座。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2816 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 07:36 · PVG 15:36 · LAX 23:36 · JFK 02:36
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.