首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
beego
宝塔
V2EX  ›  Go

golang master-worker 模式,有不懂的地方,求解答

  •  
  •   Bruin · 99 天前 · 819 次点击
    这是一个创建于 99 天前的主题,其中的信息可能已经有所发展或是发生改变。
    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"strconv"
    	"time"
    )
    
    type Payload struct {
    	name string
    }
    
    //任务
    type Job struct {
    	Payload Payload
    }
    
    //任务队列
    var JobQueue chan Job
    
    //  执行者 消费者 工人
    type Worker struct {
    	WorkerPool chan chan Job //对象池
    	JobChannel chan Job      //通道里面拿
    	quit       chan bool     //
    	name       string        //工人的名字
    }
    
    // 调度器
    type Dispatcher struct {
    	name       string        //调度的名字
    	maxWorkers int           //获取 调试的大小
    	WorkerPool chan chan Job //注册和工人一样的通道
    }
    
    //打游戏
    func (p *Payload) Play() {
    	fmt.Printf("%s 打 LOL 游戏...当前任务完成\n", p.name)
    }
    
    // 新建一个工人
    func NewWorker(workerPool chan chan Job, name string) Worker {
    	fmt.Printf("创建了一个工人,它的名字是:%s \n", name)
    	//workerPool 确定 woker 的容量
    	return Worker{
    		name:       name,          
    		WorkerPool: workerPool,     
    		JobChannel: make(chan Job),
    		quit:       make(chan bool),
    	}
    }
    
    // 工人开始工作
    
    func (w *Worker) Start() {
    	//开一个新的协程
    	go func() {
    		for {
    			//注册到对象池中,
    			w.WorkerPool <- w.JobChannel
    			fmt.Printf("[%s]把自己注册到 对象池中 \n", w.name)
    			select {
    			//接收到了新的任务
    			case job := <-w.JobChannel:
    				fmt.Printf("[%s] 工人接收到了任务 当前任务的长度是[%d]\n", w.name, len(w.WorkerPool))
    				job.Payload.Play()
    				time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
    			//接收到了任务
    			case <-w.quit:
    				fmt.Println("结束任务", w.name)
    				return
    			}
    		}
    	}()
    }
    
    func (w Worker) Stop() {
    	go func() {
    		w.quit <- true
    	}()
    }
    
    func NewDispatcher(maxWorkers int) *Dispatcher {
    	//容量为{maxWorkers}的 channel
    	pool := make(chan chan Job, maxWorkers)
    	return &Dispatcher{
    		WorkerPool: pool,       // 将工人放到一个池中,可以理解成一个部门中
    		name:       "调度者",      //调度者的名字
    		maxWorkers: maxWorkers, //这个调度者有好多个工人
    	}
    }
    
    func (d *Dispatcher) Run() {
    	// 开始运行
    	for i := 0; i < d.maxWorkers; i++ {
    		worker := NewWorker(d.WorkerPool, fmt.Sprintf("work-%s", strconv.Itoa(i)))
    		//开始工作
    		worker.Start()
    	}
    	//监控
    	go d.dispatch()
    
    }
    
    func (d *Dispatcher) dispatch() {
    	for {
    		select {
    		case job := <-JobQueue:
    			fmt.Println("调度者,接收到一个工作任务")
    			time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
    			// 调度者接收到一个工作任务
    			go func(job Job) {
    				//从现有的对象池中拿出一个
    				jobChannel := <-d.WorkerPool
    				fmt.Println(jobChannel)
    				fmt.Println(job)
    				jobChannel <- job
    			}(job)
    		default:
    
    			//fmt.Println("ok!!")
    		}
    
    	}
    }
    
    func initialize() {
    	maxWorkers := 1
    	maxQueue := 20
    	//初始化一个调试者,并指定它可以操作的 工人个数
    	dispatch := NewDispatcher(maxWorkers)
    	JobQueue = make(chan Job, maxQueue) //指定任务的队列长度
    	//并让它一直接运行
    	dispatch.Run()
    }
    
    func main() {
    	//初始化对象池
    	initialize()
    	for i := 0; i < 1; i++ {
    		p := Payload{
    			fmt.Sprintf("玩家-[%s]", strconv.Itoa(i)),
    		}
    		JobQueue <- Job{
    			Payload: p,
    		}
    		time.Sleep(time.Second)
    	}
    	close(JobQueue)
    }
    
    

    不懂的地方如下:

    // w.WorkerPool <- w.JobChannel 这个流向不了解, 为什么下面的 select job := <-w.JobChannel: 还可以获得 channel 数据
    
    func (w *Worker) Start() {
    	go func() {
    		for {
    			w.WorkerPool <- w.JobChannel
    			fmt.Printf("[%s]把自己注册到 对象池中 \n", w.name)
    			select {
    			//接收到了新的任务
    			case job := <-w.JobChannel:
    				fmt.Printf("[%s] 工人接收到了任务 当前任务的长度是[%d]\n", w.name, len(w.WorkerPool))
    				job.Payload.Play()
    				time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
    			//接收到了任务
    			case <-w.quit:
    				fmt.Println("结束任务", w.name)
    				return
    			}
    		}
    	}()
    }
    
    // dispatch 
    go func(job Job) {
        //从现有的对象池中拿出一个
        jobChannel := <-d.WorkerPool
        fmt.Println(jobChannel)
        fmt.Println(job)
        jobChannel <- job    //job 留向 jobChannel ??? 不懂
      }(job)
    
    

    大佬们帮忙帮忙!

    4 回复  |  直到 2019-08-09 14:55:09 +08:00
        1
    Tomotoes   99 天前 via Android   ♥ 1
    你仔细看下 WorkerPool,JobChannel 的 类型 , 仔细看。
        2
    Tomotoes   99 天前 via Android
        3
    Bruin   99 天前
    @Tomotoes WorkerPool 通道的通道,JobChannel 通道,
    ```
    w.WorkerPool <- w.JobChannel 从 JobChannel 流向了 WorkerPool,

    初始化的时候 JobChannel 是 nil 怎么流向 WorkerPool, 注册到对象池的呢?

    ```
        4
    Bruin   99 天前
    @Tomotoes 我好像明白了
    jobChannel := <-d.WorkerPool
    jobChannel <- job //其实是 w.JobChannel <- job

    所以
    select {
    //会接受到新流入的 channel
    case job := <-w.JobChannel:

    感谢感谢!
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   2097 人在线   最高记录 5043   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.3 · 22ms · UTC 02:46 · PVG 10:46 · LAX 18:46 · JFK 21:46
    ♥ Do have faith in what you're doing.