V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
deavorwei
V2EX  ›  Go 编程语言

各位大佬,请教一个 golang 多线程的阻塞问题

  •  
  •   deavorwei · 122 天前 · 1742 次点击
    这是一个创建于 122 天前的主题,其中的信息可能已经有所发展或是发生改变。

    编程小白,在写一个多线程目录文件遍历的时候,出现了阻塞问题,求教各位大佬~

    通过增大 var taskChan = make(chan string, 1000),chan 缓冲区为 100 万的时候程序不会阻塞

    但是我通过打印日志发现 taskChan 占用很小,只有十几,而且存在通道写入失败的情况

    taskChan 的缓冲区为 1000 时,阻塞的日志如下:

    [DEBUG]:增加目录,增加 wg, [1802], taskChan = [0]
    [ERROR]:写入通道失败...
    [DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
    [ERROR]:写入通道失败...
    [DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
    [ERROR]:写入通道失败...
    [DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [1801], taskChan = [5]
    [DEBUG]:任务完成,减小 wg, [1798], taskChan = [1]
    [DEBUG]:任务完成,减小 wg, [1797], taskChan = [17]
    [DEBUG]:任务完成,减小 wg, [1816], taskChan = [4]
    [DEBUG]:增加目录,增加 wg, [1803], taskChan = [1]
    [DEBUG]:任务完成,减小 wg, [1843], taskChan = [1]
    [DEBUG]:任务完成,减小 wg, [1797], taskChan = [21]
    [DEBUG]:任务完成,减小 wg, [1841], taskChan = [24]
    [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [1840], taskChan = [6]
    [DEBUG]:任务完成,减小 wg, [1798], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [1840], taskChan = [13]
    [ERROR]:写入通道失败...
    [DEBUG]:增加目录,增加 wg, [1840], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [1842], taskChan = [2]
    [DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
    [ERROR]:写入通道失败...
    [DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
    [ERROR]:写入通道失败...
    [DEBUG]:增加目录,增加 wg, [1845], taskChan = [0]
    [ERROR]:写入通道失败...
    [DEBUG]:增加目录,增加 wg, [1846], taskChan = [0]
    [ERROR]:写入通道失败...
    [DEBUG]:增加目录,增加 wg, [1847], taskChan = [0]
    [ERROR]:写入通道失败...
    [DEBUG]:增加目录,增加 wg, [1848], taskChan = [0]
    [ERROR]:写入通道失败...
    [DEBUG]:增加目录,增加 wg, [1849], taskChan = [0]
    

    如果把 taskChan 的缓冲区为 100 万的时候,程序可以正常退出,日志如下:

    [DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [3], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
    [DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
    [DEBUG]:任务完成,减小 wg, [0], taskChan = [0]
    [INFO]:目录扫描完毕
    [DEBUG]:func GetAllFilePath end
    [DEBUG]:func StartScan end
    [DEBUG]:func btnStartScanOnclick end
    

    代码如下:

    package core
    
    import (
    	"DopliGo/logs"
    	"github.com/panjf2000/ants/v2"
    	"os"
    	"path/filepath"
    	"sync"
    	"sync/atomic"
    )
    
    func GetAllFilePath(rootPath string) {
    	//logs.IsLogDebug = false
    	logs.Debug("func GetAllFilePath start")
    	// 创建任务通道和结果通道
    	var taskChan = make(chan string, 1000000)
    	var resultChan = make(chan string, 1000000)
    	var wg sync.WaitGroup
    	var counter int64 = 0
    
    	// 创建生产者 goroutine 池
    	producerPool, _ := ants.NewPoolWithFunc(16, func(i interface{}) {
    		produceTasks(i.(string), taskChan, resultChan, &counter, &wg)
    	})
    
    	logs.Debug("cap:%d", producerPool.Cap())
    	defer producerPool.Release()
    
    	taskChan <- rootPath
    	wg.Add(1) // 这里增加计数器
    	atomic.AddInt64(&counter, 1)
    	logs.Debug("任务开始,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(&counter), len(resultChan))
    
    	// 启动生产者
    	go func() {
    		//defer logs.Debug("生产者退出")
    		for task := range taskChan {
    			err := producerPool.Invoke(task)
    			if err != nil {
    				logs.Error("failed to producerPool Invoke, err: %s", err)
    				return
    			}
    		}
    	}()
    
    	// 启动结果处理 goroutine
    	go func() {
    		//defer logs.Debug("消费者退出")
    		for result := range resultChan {
    			_ = result
    		}
    	}()
    
    	// 等待所有任务完成
    	wg.Wait()
    	close(resultChan)
    	close(taskChan)
    	logs.Info("目录扫描完毕")
    	logs.Debug("func GetAllFilePath end")
    }
    
    func produceTasks(rootPath string, taskChan chan string, resultChan chan string, counter *int64, wg *sync.WaitGroup) {
    	defer wg.Done() // 确保每次 produceTasks 完成时,调用 Done
    	//	logs.Debug("func produceTasks start")
    
    	entries, err := os.ReadDir(rootPath)
    	if err != nil {
    		logs.Error("failed to read dir: %s , err: %s", rootPath, err)
    		return
    	}
    
    	for _, entry := range entries {
    		path := filepath.Join(rootPath, entry.Name())
    		if entry.IsDir() {
    			wg.Add(1)
    			atomic.AddInt64(counter, 1)
    			select {
    			case taskChan <- path:
    				// 发送成功
    			default:
    				// 发送失败,通道已满
    				logs.Error("写入通道失败...")
    			}
    			logs.Debug("增加目录,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
    		} else {
    			resultChan <- path
    		}
    	}
    
    	atomic.AddInt64(counter, -1)
    	logs.Debug("任务完成,减小 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
    	//logs.Debug("func produceTasks end")
    }
    
    
    18 条回复    2024-08-23 15:40:21 +08:00
    zpfhbyx
        1
    zpfhbyx  
       122 天前
    select 的时候 写入到 chan 不阻塞 chan 满的时候会直接执行 default
    josexy
        2
    josexy  
       122 天前
    ```go
    if entry.IsDir() {
    atomic.AddInt64(counter, 1)
    select {
    case taskChan <- path:
    // 发送成功
    wg.Add(1)
    default:
    // 发送失败,通道已满
    logs.Error("写入通道失败...")
    }

    ```
    你把 wg.Add(1) 放到里面,然后 channel 容量设置大点就可以了,这样只有发送成功才处理
    matytan
        3
    matytan  
       122 天前
    produceTasks 中 for 循环 wg.add(1)多次,但是只 done 了一次(函数结束)为什么?而且你这个 wg 用的好奇怪
    DefoliationM
        4
    DefoliationM  
       122 天前 via Android
    default 删了,如果你想控制退出,把 default 换成 context 。
    matytan
        5
    matytan  
       122 天前   ❤️ 1
    分析了一下你这个代码打印一定是阻塞在了读取 taskChan ,为什么堵塞,大概率是协程池 invoke 的时候堵塞了,我换成 go 携程跑没问题。具体为什么可能需要探索下 ants
    go func() {
    for task := range taskChan {
    fmt.Printf("task: %s\n", task)
    // err := producerPool.Invoke(task)
    // if err != nil {
    // fmt.Printf("failed to producerPool Invoke, err: %s\n", err)
    // return
    // }
    go produceTasks(task, taskChan, resultChan, &counter, &wg)
    }
    }()
    matytan
        6
    matytan  
       122 天前
    @matytan #5 produceTasks 中另外 defalut 要删掉,通道满了应该也要等吧,不然可能会漏?
    deavorwei
        7
    deavorwei  
    OP
       122 天前
    @matytan 大佬牛逼,taskChan 设置为 1000 我用协程跑也没问题了,ants 我得再去看看怎么用比较合适; default 是我为了诊断是不是 taskChan 写不进去才加的,正常应该是没有。
    另外请教下,为什么 taskChan <- path 写不进去,我实时打印 len ,占用都只有几十,我长度设置的 1000🤣。
    deavorwei
        8
    deavorwei  
    OP
       122 天前
    @zpfhbyx 是的,但是我打印的日志 taskChan 占用只有几十,容量设置是 1000 ,应该不会满才对
    yianing
        9
    yianing  
       122 天前   ❤️ 1
    @deavorwei #7 #7 taskChan = [%d]", len(resultChan) 打印错变量了
    deavorwei
        10
    deavorwei  
    OP
       121 天前
    @yianing #9 感谢大佬,我这低级错误过分了,但是我修改正确之后,结果还是一样的😂
    pxllong
        11
    pxllong  
       121 天前
    用 runtime/pprof 。
    yann123
        12
    yann123  
       121 天前
    default:
    wg.Done()
    logs.Error("Failed to write to channel...")


    写入通过失败了你没有减少锁,所以一直卡住了。
    yann123
        13
    yann123  
       121 天前
    wg 计数的时候注意,一定要确保可以执行 wg.Done()操作,否则就卡住了。
    deavorwei
        14
    deavorwei  
    OP
       121 天前
    @yann123 #13 大佬,还是不行 ,我现在是使用的协程直接跑,taskChan 是 1000 ,然后 wg.done 也加了。跑起来就挂 ,如果把 taskChan 改成 100 万就很正常
    matytan
        15
    matytan  
       121 天前
    @deavorwei #7 并不是占用的问题,是用 ants 的时候你的 channel 大小小于了文件数量,导致死锁的,用 go 携程可以正常等待结束了继续往通道里面放。这种问题直接用极端的办法,把 channel 大小设置为 1 ,看会不会死锁。你用 go 原生协程跑,channel 为 1 都 ok 的,只是慢一点
    matytan
        16
    matytan  
       121 天前
    @deavorwei #7 跑不满通道的原因可能是处理比较快,一直都没有满过,只能说明设置为 1000 没必要哈哈哈
    deavorwei
        17
    deavorwei  
    OP
       121 天前
    @matytan #16
    明白了,大佬,感谢感谢!
    deavorwei
        18
    deavorwei  
    OP
       121 天前
    @yann123 #13 大佬,不好意思,可以了,添加了 wg.done 是 ok 的,我运行错程序了 ,感谢指点
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2509 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 15:32 · PVG 23:32 · LAX 07:32 · JFK 10:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.