V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
smdbh
V2EX  ›  Python

请教一个 Python 中线程共享数据的问题

  •  
  •   smdbh · 95 天前 · 1314 次点击
    这是一个创建于 95 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我想在 main 和它建立的线程间共享数据,在线程中执行逻辑,更新数据,主线程中读取判断。

    1. 由于数据较多,使用 dataclass 当 struct 用
    2. 线程中写,main 中只读,所有没有加锁 实际使用发现,这个数据共享不是完全引用,变量地址(使用 id 查看两边地址)会有改变,导致 main 和 thread 中的变量不是一个东西了,监测失败。
    3. tricky 的是,第一次创建的线程没有问题。跑完一次,第二次再来一次就大概率出问题,后续再尝试就一直会出问题了,偶尔会成功。 请问如果要实现多线程共享数据的读写,有什么最佳实现和模板吗
    7 条回复    2024-11-13 18:32:53 +08:00
    smdbh
        1
    smdbh  
    OP
       95 天前
    补充下,我当前是将 dataclass 的结构当参数传入 threading.Thead 的参数中,这个操作是否有问题
    djangovcps
        2
    djangovcps  
       95 天前
    threading.lock ?
    qianchengv
        3
    qianchengv  
       95 天前
    ```python
    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor
    import unittest
    from dataclasses import dataclass, field
    from threading import Lock
    import multiprocessing

    @dataclass
    class SharedData:
    value: int = 0
    # Using a Lock to ensure thread-safety when accessing shared data
    lock: Lock = field(default_factory=Lock, init=False, repr=False)

    def increment(self):
    with self.lock:
    self.value += 1

    def get_value(self):
    with self.lock:
    return self.value

    def worker(data: SharedData, num_iterations: int):
    local_sum = 0
    for _ in range(num_iterations):
    local_sum += 1
    # Use a lock to safely update the shared data
    with data.lock:
    data.value += local_sum

    class TestSharedDataThreadSafety(unittest.TestCase):
    def test_concurrent_increments(self):
    shared_data = SharedData()
    # Use 2x CPU count for threads to test both CPU-bound and I/O-bound scenarios
    num_threads = multiprocessing.cpu_count() * 2
    num_iterations = 1000000 // num_threads

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
    futures = [executor.submit(worker, shared_data, num_iterations) for _ in range(num_threads)]
    for future in futures:
    future.result()

    expected_value = num_threads * num_iterations
    self.assertEqual(shared_data.get_value(), expected_value,
    f"Expected {expected_value}, but got {shared_data.get_value()}")

    def test_race_condition(self):
    shared_data = SharedData()
    race_detected = threading.Event()

    def racer():
    with shared_data.lock:
    initial_value = shared_data.value
    time.sleep(0.001) # Simulate some work
    # Check if the value has changed, which would indicate a race condition
    if initial_value == shared_data.value:
    shared_data.value += 1
    else:
    race_detected.set()

    threads = [threading.Thread(target=racer) for _ in range(100)]
    for t in threads:
    t.start()
    for t in threads:
    t.join()

    self.assertFalse(race_detected.is_set(), "Race condition detected")

    def test_stress_test(self):
    shared_data = SharedData()
    stop_flag = threading.Event()

    def stress_worker():
    local_sum = 0
    while not stop_flag.is_set():
    local_sum += 1
    # Use a lock to safely update the shared data after intensive local computation
    with shared_data.lock:
    shared_data.value += local_sum

    # Use CPU count for threads to maximize resource utilization
    threads = [threading.Thread(target=stress_worker) for _ in range(multiprocessing.cpu_count())]
    for t in threads:
    t.start()

    time.sleep(5) # Run for 5 seconds to simulate prolonged stress
    stop_flag.set()

    for t in threads:
    t.join()

    print(f"Stress test final value: {shared_data.get_value()}")

    if __name__ == '__main__':
    unittest.main()
    ```
    ClericPy
        4
    ClericPy  
       94 天前
    show me your code?
    milkpuff
        5
    milkpuff  
       94 天前
    修改 data.a, data 的 id 不会变。
    UN2758
        6
    UN2758  
       59 天前
    @qianchengv 佬,你这大段的代码还不如贴一个 gist 链接呢,比这个方便阅读多了
    UN2758
        7
    UN2758  
       59 天前
    多线程,要么加锁,要么用 queue 之类的库,至于你的问题二,地址会移动可能是因为对象大小改变导致的重新分配地址
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2641 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 11:20 · PVG 19:20 · LAX 03:20 · JFK 06:20
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.