第一种想法如上图: main goroutine 遍历这个 chan, 将所有数据读出. 但是问题是. 左边那三个 goroutine 如果结束了. main goroutine 如何结束? 岂不是阻塞了? 也没法关闭, 如果左边的其中一个关闭了. 那另外两个的数据就读不出来了.
第二种思路是创建三个 chan
这个直接 close 就行了.
我的问题是: 第一种想法中的问题如何解决?
是不是实践中第二种想法比较符合套路?
确实是新手. 大佬勿喷.
1
bruce0 2022-08-17 18:27:43 +08:00
第一个用 sync.waitGroup 应该能解决吧
|
2
ch2 2022-08-17 18:28:52 +08:00
waitgroup
|
3
chaleaochexist OP |
4
qq1009479218 2022-08-17 18:31:48 +08:00 1
也可以再起一个 chan, 在想结束的时候通知 main 该结束了
|
5
chaleaochexist OP @qq1009479218 明白了. 用 chan 或者 context
在主线程中用 select 检测那三个 goroutine. 好像可行. 不知道算不算最佳实践. |
6
ch2 2022-08-17 18:39:11 +08:00
@chaleaochexist #3 我记得有第三方写的 infinite chan ,无限大缓冲区的 chan
|
7
lxdlam 2022-08-17 18:41:43 +08:00 4
Reading Material: https://go.dev/blog/pipelines
拆成两个问题: 1. 多个 goroutine 如何读取消息 - 使用 fan-in 和 fan-out pattern ,将其结果汇总到一个 channel 里,此时原始 goroutine 关闭 channel 不影响; - 直接 select 多个 channel 。 2. 当某个 gorutine 退出时如何通知其他的 goroutine 退出: a. (可选)如果需要等待其他 goroutine 退出的话,使用 sync.WaitGroup 等待; b. 使用一个 exitChannel ( chan struct{} 就行),接收到退出信号的时候直接由 main close ,其他 goroutine 使用 `for { select { case <- exitChannel: return default: logic} }` 的形式来正确接受退出信号 |
9
qq1009479218 2022-08-17 18:46:51 +08:00 1
@chaleaochexist 三个 goroutinue 在想结束的时候发一个消息到用来结束的 chan 里,在 main 里面 select 监听,在监听到三次之后,说明三个 groutinue 全部执行完了,return main 就好了
这种方法,是其他协程通知主协程自己结束了,主协程收到这个通知,再决定下一步怎样做 而 context 其实是 main 协程管理其他协程的,就是 main 想让其他协程结束时调用 cancel ,其他协程通过监听 ctx.Done(),就可以 return 了 复杂并发应用中 goutinue 之间的关系,其实是树状的,你想在一个树的节点,结束这个数下面的所有的子 goroutinue 时,就用 context ,在子 goutinue 中传递值也可以 |
10
ilylx2008 2022-08-17 18:52:36 +08:00
你们真强,我都没看明白楼主在说啥。
|
11
jitongxi 2022-08-17 18:54:29 +08:00
一个 tcp 连接一个 goroutine , 结束, 不管客户端还是服务端都是。
加个 channel 就是脱裤子放屁 |
12
rrfeng 2022-08-17 20:10:59 +08:00
考虑下 main goroutine 为啥要结束??
|
13
chaleaochexist OP @rrfeng 也不是一定要结束, 而是继续往下走.
|
14
nmap 2022-08-17 20:45:58 +08:00
第二种吧,逻辑清晰,实现简单
|
15
fds 2022-08-17 21:09:39 +08:00
|
16
nuk 2022-08-17 21:24:49 +08:00
往 channel 里写个结束的标记就行了吧,要不然就加个生存期管理
|
17
haoliang 2022-08-17 21:33:14 +08:00
第一种消耗比较小啊,可以考虑增加规定个独特的终止信息在生产端退出时发出,消费端识别处理下(比较类似于 waitgroup ,消费端处理终止信息时也可以用 atomic 计数)
|
18
joesonw 2022-08-17 22:52:55 +08:00 via iPhone
用 waitgroup 为什么会死锁?只是 routine 里因为 channel 满了,阻塞住,要等 channel 用空位了,才会塞入,然后继续运作。
|
19
chaleaochexist OP @joesonw 三个生产者堵死. 因为 chan 满了.
消费者堵死, 因为一直在 wait. |
20
joesonw 2022-08-18 09:19:02 +08:00 via iPhone
@chaleaochexist 都满了消费者怎么堵死,直接消费啊
|
21
wisej 2022-08-18 09:27:19 +08:00
@chaleaochexist 你对 waitgroup 的使用有误解,add 和 wait 都是生产者侧调用的。跟 consumer 没关系,consumer 只需要循环读取 chan 消费。
另外,如果 producer 并发很高,建议多个 chan ;毕竟 chan 底层依赖 mutex ,main 里再 for select 消费 |
22
index90 2022-08-18 09:35:32 +08:00 1
最简单就是加一个 waitgroup ,再起一个 goroutine 去 wait ,wait 到了就 close channel ,main routine 用 for range 去读。channel 。
|
23
index90 2022-08-18 09:37:25 +08:00
套路都是 main routine 去 range channel ,剩下的问题就是如何 close channel ,这个建议你搜索“如何优雅关闭 channel”,学习 channel 使用的几个套路。
|
24
ns09005264 2022-08-18 09:37:43 +08:00 1
```
ch := make(chan any) var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(done func()) { defer done() time.Sleep(time.Second * 3) ch <- rand.Int() }(wg.Done) } go func() { wg.Wait() close(ch) }() for v := range ch { log.Printf("v:%v", v) } log.Printf("这里等待执行") ``` |
25
ns09005264 2022-08-18 09:42:32 +08:00 1
|
26
joesonw 2022-08-18 09:58:54 +08:00 via iPhone
@ns09005264 Done 应该要放在消费者里,不然假如消费者处理时间比较长的,会漏掉最后一条。
|
27
joesonw 2022-08-18 10:00:26 +08:00 via iPhone
@ns09005264 忽略我上面说的,没看到是 unbuffered
|
28
BingoXuan 2022-08-18 10:51:02 +08:00
这个方案我研究过,头疼得很。最后选择手写 CAS 得 ringbuffer 的方案。
|
29
loveuer 2022-08-18 10:55:53 +08:00
第一种写法按照官方的说法就是不要过分关注 chan 的关闭了, 毕竟没有数据了, chan 的占用很小, 程序如果结束自然也就交还操作系统了
|
30
seth19960929 2022-08-18 11:18:26 +08:00 2
close chan 两个原则
1. 不要在接收端关闭, 也就是你代码里的 main goroutine 2. 有多个同时写, 不要在写的地方关闭, 也就是你的代码中 goroutine1,2,3 所以最好的做法, 就是楼上给的那个代码, 先 waitGroup 够三个之后, 直接在 main 关闭就行了 |
31
chaleaochexist OP |
32
seth19960929 2022-08-18 11:24:42 +08:00
package main
import ( "fmt" "math/rand" "sync" "time" ) func main() { // init var ch := make(chan int) wg := &sync.WaitGroup{} // goroutine1, 2, 3 for i := 0; i < 3; i++ { wg.Add(1) go task(wg, ch) } // read chan data go func() { for val := range ch { fmt.Println(val) } }() // wait group wg.Wait() close(ch) // close fast, can`t read all chan data time.Sleep(time.Second) } func task(wg *sync.WaitGroup, ch chan int) { defer wg.Done() ts := rand.Intn(3) + 1 time.Sleep(time.Second * time.Duration(ts)) ch <- ts } |
33
lessMonologue 2022-08-18 11:35:44 +08:00
channel 为什么要关闭?
|
34
index90 2022-08-18 20:41:43 +08:00
这里可以延伸一个问题,你上述例子用了 4 个 goroutine ,如果用 waitgroup ,就要用 5 个 goroutine ,问能否用 3 个 goroutine 解决问题?
|
35
paceewang1 2022-08-31 19:24:19 +08:00
1 、chan 关闭了里面的数据可以继续读,只是不可以写
2 、多个 goroutine 关闭用 context |