V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX 提问指南
resolvewang
V2EX  ›  问与答

aiohttp 怎么复用连接池

  •  
  •   resolvewang · 2017-06-11 16:14:48 +08:00 · 6088 次点击
    这是一个创建于 2503 天前的主题,其中的信息可能已经有所发展或是发生改变。

    最近,在看 python 的异步编程( asyncio )部分,在使用 aiomysql 的时候遇到了困难,已经困惑我两三天了。可能是自己资质愚钝,看了 aiomysql 的官网例子( https://github.com/aio-libs/aiomysql/tree/master/examples ),我还是没能弄懂怎么才能多个数据库操作中复用 pool。下面是我的代码

    import aiomysql
    import asyncio
    
    
    async def select(loop, sql):
        pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                          user='root', password='123456',
                                          db='test', loop=loop)
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(sql)
                r = await cur.fetchone()
                print(r)
    
    
    async def insert(loop, sql):
        pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                          user='root', password='123456',
                                          db='test', loop=loop)
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(sql)
                await conn.commit()
    
    
    async def main(loop):
        c1 = select(loop=loop, sql='select * from minifw')
        c2 = insert(loop=loop, sql="insert into minifw (name) values ('hello')")
        tasks = [
            asyncio.ensure_future(c1),
            asyncio.ensure_future(c2)
        ]
        return await asyncio.gather(*tasks)
    
    if __name__ == '__main__':
        cur_loop = asyncio.get_event_loop()
        cur_loop.run_until_complete(main(cur_loop))
    
    
    
    

    上面这样做的话,每次做数据库操作的时候,应该都会执行一次 create_pool这个操作。现在我的问题是应该怎么改上面的代码,让连接池可以复用啊?

    以前没怎么接触异步编程,希望大家能解答一下我的疑惑,感谢

    12 条回复    2017-06-12 16:40:48 +08:00
    qs
        1
    qs  
       2017-06-11 18:46:22 +08:00   ❤️ 1
    pool 放到全局变量里 创建前先判断下 pool 是否可用 具体可以看看单例的写法
    resolvewang
        2
    resolvewang  
    OP
       2017-06-11 18:59:53 +08:00
    @qs 能否改一下给我看看,我用全局变量试过,应该是我能力不够吧,改过后调试不通。感谢
    messense
        3
    messense  
       2017-06-11 21:18:32 +08:00   ❤️ 1
    把 pool 放到 main 里面?

    import aiomysql
    import asyncio


    async def select(pool, sql):
    async with pool.acquire() as conn:
    async with conn.cursor() as cur:
    await cur.execute(sql)
    r = await cur.fetchone()
    print(r)


    async def insert(pool, sql):
    async with pool.acquire() as conn:
    async with conn.cursor() as cur:
    await cur.execute(sql)
    await conn.commit()


    async def main(loop):
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
    user='root', password='123456',
    db='test', loop=loop)
    c1 = select(pool, sql='select * from minifw')
    c2 = insert(pool, sql="insert into minifw (name) values ('hello')")
    tasks = [
    asyncio.ensure_future(c1),
    asyncio.ensure_future(c2)
    ]
    return await asyncio.gather(*tasks)

    if __name__ == '__main__':
    cur_loop = asyncio.get_event_loop()
    cur_loop.run_until_complete(main(cur_loop))
    resolvewang
        4
    resolvewang  
    OP
       2017-06-11 22:18:54 +08:00
    @messense 感谢,这样确实是可以的。我咋就没想到。
    resolvewang
        5
    resolvewang  
    OP
       2017-06-11 22:34:11 +08:00
    @messense 大神,是否还有别的方法,可以把 pool 从 main() 中抽出来,这样写我又遇到问题了,比如我要写单元测试用例,应该怎么写啊,我写的运行使用不行

    <pre>
    import asyncio
    import unittest
    import aiomysql
    from minifw.db import base_db


    class TestDB(unittest.TestCase):
    def setUp(self):
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)
    self.pool = aiomysql.create_pool(host='127.0.0.1', port=3306,
    user='root', password='123456',
    db='test', loop=self.loop)

    def test_select(self):
    sql = 'select * from minifw where id = (%s)'
    rs = self.loop.run_until_complete(base_db.select(self.pool, sql, args=(1,), size=1))
    self.assertEqual(len(rs), 1)

    def test_insert(self):
    sql = 'insert into `minifw` (`name`) values (?)'
    rs = self.loop.run_until_complete(base_db.insert(self.pool, sql, args=('test_val',)))
    self.assertEqual(rs, 1)

    def tearDown(self):
    self.loop.close()
    del self.loop

    if __name__ == '__main__':
    unittest.main()
    </pre>

    这里的问题就是这个 pool 我不知道咋处理,如果我要运行 unnittest.main(),就会报错 `AttributeError: '_PoolContextManager' object has no attribute 'acquire'`
    messense
        6
    messense  
       2017-06-11 23:40:15 +08:00   ❤️ 1
    aiomysql.create_pool 需要 await 吧?没有 await 它返回的是个 coroutine,试试:

    self.pool = self.loop.run_until_complete(aiomysql.create_pool(...))
    resolvewang
        7
    resolvewang  
    OP
       2017-06-12 09:55:40 +08:00
    @messense 感谢您的耐心回复!通过你的回复,我对 asyncio 的理解进了一步,前两天看了差不多一天的文档,都还是有点懵。现在还有两个问题,我还是不是很懂,希望您能再帮忙解一下惑。

    `
    async def select(pool, sql, args=(), size=None):
    async with pool.acquire() as conn:
    async with conn.cursor() as cur:
    await cur.execute(sql.replace('?', '%s'), args)
    if size:
    r = await cur.fetchmany(size)
    else:
    r = await cur.fetchall()
    return r
    `

    我在做关于这段代码的单元测试的时候,虽然执行正确,但是有一个警告,` ResourceWarning: Unclosed connection <aiomysql.connection.Connection object at 0x1030d1710>
    ResourceWarning)`

    大概意思就是数据库连接没有关闭吧。但是这里用了上下文管理器啊,它不应该会关闭连接吗?我如果在 return 语句之前,加上 conn.close(), 就不会报这个警告了。

    `
    def test_select(self):
    sql = 'select * from minifw where id = (%s)'
    rs = self.loop.run_until_complete(base_db.select(self.pool, sql, args=(1,), size=1))
    self.assertEqual(len(rs), 1)
    `
    这段代码是测试 select。


    另外还有一个问题,我想问问
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)


    self.loop = asyncio.get_event_loop()
    的区别,因为在我执行 unnittest.main()的时候,用前者就可以执行,用后者就会报错,`Event loop is closed`,如果是测试单个数据库操作,后者就不会报错。

    期望您的回复,感恩
    messense
        8
    messense  
       2017-06-12 12:08:54 +08:00   ❤️ 1
    按道理说会自动关闭连接,不清楚 aiomysql 的具体实现细节。

    ef tearDown(self):
    self.loop.close()
    del self.loop

    你 tearDown 里面把 loop close 了。。。
    messense
        9
    messense  
       2017-06-12 12:10:19 +08:00
    Python 3.6 以后已经不推荐传递 event loop 了,需要用到 event loop 的时候调用 asyncio.get_event_loop() 就好了。
    resolvewang
        10
    resolvewang  
    OP
       2017-06-12 13:42:41 +08:00
    @messense 我一直以为 teardown 和 setup 在 unnittest.main()中只会运行一遍。。。

    event loop 在程序的整个生命流程中,如果被 close 了,那么就不能使用 asyncio.get_event_loop()获取了吗?根据上面的代码,感觉是这样的。那这也就是说,我们只能在程序不用 event loop 的时候再关吗?

    感觉自己对 event loop 模型还不是特别清楚。
    messense
        11
    messense  
       2017-06-12 15:16:26 +08:00
    一般来说不需要手动 close event loop
    resolvewang
        12
    resolvewang  
    OP
       2017-06-12 16:40:48 +08:00
    @messense 好的,明白了,感谢
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   3640 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 04:19 · PVG 12:19 · LAX 21:19 · JFK 00:19
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.