V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
echome
V2EX  ›  Python

[求助] Python 多线程通信 (好心人救救孩子,点开工资超级加倍)

  •  
  •   echome · 2020-05-01 00:29:00 +08:00 · 4454 次点击
    这是一个创建于 1703 天前的主题,其中的信息可能已经有所发展或是发生改变。

    如何从一个需要执行很久的线程当中获取返回数据

    有一个线程当中的函数做 funcA,funcA 需要花费很多时间,并且 funcA 会返回我们需要的数据 我该如何获得其中的数据? 网上一般是

    1. 使用 queue 作为全局变量,但是如果在 funcA 里面 put 数据的话,在主线程当中 get 数据会返回 None,因为 funcA 会花费很多的时间,所以主线程获得的都是 None 。
    q = queue.Queue(10)
    def funcA():
         
         # 花费很多时间做一些事得到一个数组叫做 res
        for i in res:
            q.put(i)
    
    res=[]
    while not empty(q.get()):
       res.append(q.get())
    
    1. 是重构 thread 模块写一个自己的 thread 类,其中新建一个 get_result 方法。如果重构的话会发现也是 None,因为在 funcA 会使用很多的时间,然后使用 get_result 方法的话会返回 None,因为在 run 方法当中并没有返回数据。
    class MyThread(Thread):
        def __init__(self, target, args):
            super(MyThread, self).__init__()
            self.func = target
            self.args = args
    
    
        def run(self):
            print("I have done")
            self.result = self.func(*self.args)
    
        def get_result(self):
            while self.lock:
                pass
            return self.result
    
    .............
    .............
    thread = MyThread(target=funca, args=(domain,))
    thread.start()
    res=thread.get_result()
    

    我也想过方法,比如在 get_result 当中等待 run 方法执行完毕,但是这样就会导致线程阻塞的问题

    所以问题是如何在需要花费很多时间的线程当中得到数据并且不会阻塞。 跪求大神!!!

    第 1 条附言  ·  2020-05-01 22:21:04 +08:00

    先说说项目的背景

    自己用pyqt做了一个界面,大家都知道qt的主线程不能阻塞,一旦阻塞的话在不同的界面就会出现卡顿的问题
    我在一个tab当中有一个需要消耗很长时间的funcA,我之前的办法是将funcA放到子线程。 但是后面需要funcA的返回数据,那么我想到了之前说的两种方法,一个是重写线程类,一个是queue让子线程和父线程进行通信。
    但是实际的使用会发现,都会出现父线程等待子线程结束之后拿到数据的问题。不然的话,你接受queue的数据的时候就会是None!

    这里再说一下提的问题
    本质是一个子线程当中的函数funcA需要花费很久时间才能有结果。如何在不堵塞父线程的情况下从子线程当中获得返回子线程的结果

    如下代码,我现在需要获得funcA和funcB两个函数的返回值

    def funcA():
        #spend much time 
        return resa
    def funcB():
        #spend much time 
       return res
    

    如果我不使用线程的话那么执行funcA或者funcB都是只能执行一个,另外一个只有等

    funcA()
    funcB() #只有等待funA执行完之后才能执行funB ,堵塞
    

    后面我使用了多线程进行处理

    qa = queue.Queue(10)
    def funcA():
         
         # 花费很多时间做一些事得到一个数组叫做 res
        for i in res:
            qa.put(i)
    def funcB():
         
         # 花费很多时间做一些事得到一个数组叫做 res
        for i in res:
            qb.put(i)
    
    resa=[]
    
    threadA = Thread(target=funcA, args=(xx))
    threadA.start()
    
    threadB = Thread(target=funcB, args=(xx))
    threadB.start()
    
    while not empty(qa.get()):   #实际执行的时候你会发现如果没有写join的话,这里的res会得到None,因为线程A并没有执行完成,但是如果写了join的话,那么实际上还是和之前没有使用多线程一样,做了funcA等待funcA执行完成之后才能做funcB
       res.append(qa.get())  
    
    qb = queue.Queue(10)
    resb=[]
    while not empty(qb.get()):
       res.append(qb.get())
    

    写出上面这段代码就是我提出问题的节点。我发现如果想要得到其中线程的返回数据的时候还是会出现堵塞的情况?有没有什么写法能够得让funcA和funcB都能得到结果并且不会相互之间不会被堵塞呢?


    各位大哥提出的几种方法

    1.使用线程池
    这个方法在等待后面子线程数据返回的时候主线程是堵塞的,因为要调用get_result,就会一直堵塞。 但是这个方法给了我灵感十分感谢!

    2.放弃多线程使用异步
    仔细想了想,也是可以使用的,我们堵塞的本质是因为一直要监听子进程的返回,那么如果使用异步是可以的! 是否表明异步和多线程底层实现具有相关性呢


    最后的解决方法
    新建一个funC,在funC当中创建线程池funcA和funcB的线程,并在其中等待funcA和funcB的返回结果并且进行处理
    然后在主线程当中创建funC的线程,这样不管funcA和funcB如何堵塞都不会让主线程堵塞了!

    最后十分感谢大家的帮助,V2EX是一个有爱的地方,祝大家工资年年超级加倍!!

    22 条回复    2020-05-08 16:08:18 +08:00
    ClericPy
        1
    ClericPy  
       2020-05-01 00:34:56 +08:00
    你该学的是 Future 的用法...
    echome
        2
    echome  
    OP
       2020-05-01 00:36:03 +08:00
    啥意思大哥
    marquina
        3
    marquina  
       2020-05-01 00:38:29 +08:00 via Android
    自从接触了 concurrent.futures 后,就再也没有用过 thread 库了
    lithbitren
        4
    lithbitren  
       2020-05-01 00:44:03 +08:00
    没懂题目啥意思,不过 io 密集用协程更好,cpu 密集的用多进程更好,阻塞不管设在哪里,是否有回调,两者的 API 都比多线程丰富,多线程讲实话还是比较鸡肋的。
    billwsy
        5
    billwsy  
       2020-05-01 00:44:51 +08:00 via iPhone
    所以“网上的方法”问题在哪里…主程序总是要等 funcA put 数据了之后才能 get 到数据

    不会有什么神奇的办法让 funcA 在超长执行结束之前就产生出你想要的数据吧
    wuwukai007
        6
    wuwukai007  
       2020-05-01 00:56:26 +08:00
    from concurrent.futures import ThreadPoolExecutor
    pool = ThreadPoolExecutor()
    job = pool.submit(func)
    try:
    ---- func_result = job.result()
    except Exception as e:
    ----logger.exception(e)
    ---- raise e
    StephenKwen
        7
    StephenKwen  
       2020-05-01 01:04:21 +08:00 via Android
    线程之间使用条件锁,这样一个线程执行获得数据之后,可以通过条件锁唤醒另一个在等待的线程
    echome
        8
    echome  
    OP
       2020-05-01 07:19:13 +08:00
    @billwsy 的确是,但是如果要等待子线程执行完成得到其中的数据的话那么主线程就要一直等待。这样就形成了阻塞!
    如果不等待的话就会得到 None 的返回值!
    Les1ie
        9
    Les1ie  
       2020-05-01 07:44:03 +08:00
    (大概算是实现了楼主的接收返回数据的需求?
    这里用 func 代表消耗时间的操作,main 函数的线程池里面调用 result()方法将主线程阻塞,等待所有子线程执行完毕再返回结果

    https://i.loli.net/2020/05/01/mWN5jt2HpFfM6VL.png
    Les1ie
        10
    Les1ie  
       2020-05-01 07:47:19 +08:00
    那么我的工资超级加倍去哪里领 :( 穷.jpg
    crella
        11
    crella  
       2020-05-01 07:51:28 +08:00 via Android
    要么像楼上那样使用多进程;要么拆分 funcA,先处理必须在 get_result()之前要处理的数据,不怎么重要的数据在 get_result()之后开一个线程来完成
    noparking188
        12
    noparking188  
       2020-05-01 09:17:25 +08:00
    也许可以讲一下你想实现什么,或者有比多线程更好的方式
    之前看 redis 实现学到一个概念 - Reactor 模型,虽然没咋看懂,但是感觉题主的需求可以借鉴下这种思想
    有点没明白你的表达,是必须拿到另一个线程产出的数据才能执行接下来的操作,这是串行?或者另一个线程产出数据之前,其它线程做自己的事,有数据了再工作
    可能我理解能力差了点
    Sanko
        13
    Sanko  
       2020-05-01 10:17:56 +08:00 via Android
    join?
    guyskk0x0
        14
    guyskk0x0  
       2020-05-01 11:00:27 +08:00 via Android
    @echome 如果一件事情要 10 秒做完,你想 1 秒就拿到结果,要穿越时空?
    sudoer
        15
    sudoer  
       2020-05-01 12:09:00 +08:00
    我点开了,工资加倍在哪领。。。(阿巴阿巴阿巴)
    oahebky
        16
    oahebky  
       2020-05-01 13:14:59 +08:00
    你想让主(一个)线程(thr-A)取得子(另一个)线程(thr-B)的数据,又不想 thr-A 阻塞。

    那么你的 thr-A 在除了“创建 thr-B,并让它工作得到结果” 之外还有其它事情做吗?

    如果 thr-A 本来就有事情做,比如它有 func1, func2, func3, func4 的事情做,
    另外还有一个得到 thr-B 的结果后的 func_I 事情做。
    那么你本来只是需要
    func1 -> func2 -> func3 -> func4 -> get thr-B result --True? --Y--> func_I
    这么重复循环 LOOP 就可以了。

    如果你想 thr-A 做其它事情,然后 thr-B 产生结果的时候,thr-A 马上就去处理 thr-B 的结果,
    那么这就有中断的意思了。
    主要要解决:
    func1, func2, ...; 即 thr-A 中的事情是否依赖 thr-B 的 result (或者说 func_I 的 result )?

    是:
    那么这本来就是会阻塞等待结果的程序,不论你怎么设计它都是会阻塞的。
    比如你有 10 个 thr-B1,thr-B2, ... thr-B10; 那么即使 10s 之后每一秒 queue 中都有 result 存在; func1, func2, func3 ... 等依赖结果的任务,即使 10s 之后马不停蹄地执行(因为会不断地取得结果),但是它即使在忙碌,可是它本质上还是阻塞的。

    否:
    那你就大胆创建一个 thr-B: do jobs; 的同时;再创建一个 thr-C: handle result from thr-B 。
    thr-C 在等待 queue 时阻塞是(可以视为完全)不影响程序性能的(非 `while (1) ;;`),即 CPU (中的单核)不会 100% 占用。

    所以搞清楚了这些,还有没有必要纠结阻塞问题?
    superrichman
        17
    superrichman  
       2020-05-01 14:18:07 +08:00 via iPhone
    多线程换成异步 io
    lewinlan
        18
    lewinlan  
       2020-05-02 00:29:17 +08:00 via Android
    我记得 queue 的 get 应该会阻塞吧?不应该返回 None 啊。
    如果真的不阻塞的话,那就无限循环 sleep 去等呗……这种情况用协程才是对的。
    建议先学操作系统,理解一下线程,内存,上下文之类的东西。否则很难搞懂。
    echome
        19
    echome  
    OP
       2020-05-02 00:35:39 +08:00
    @lewinlan 在主线程当中对 queue 进行 get 之后保存到数组当中会得到 None,并不会堵塞。如果无限循环等待的话多线程的意义就丢失了。
    Marinej
        20
    Marinej  
       2020-05-07 15:33:44 +08:00
    用 deque 线程安全的双端队列字 节码级别 线程安全
    Marinej
        21
    Marinej  
       2020-05-07 15:36:54 +08:00
    @Marinej 妈个鸡没看完, 用 asyncio, 里面也可以用线程池,但是可以用异步的写法
    ps1aniuge
        22
    ps1aniuge  
       2020-05-08 16:08:18 +08:00
    看得我脑仁疼,一个头 2 个大。

    楼主是 cpu 密集型,还是 io 密集型啊?
    cpu 密集就用多进程,任你开 2 万个线程(池),也解决不了你的问题。因为多线程,只能用单核,对吧?
    io 密集就用 asyncio 啊。为啥么要折腾线程池?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2546 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 05:35 · PVG 13:35 · LAX 21:35 · JFK 00:35
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.