V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
dataman
V2EX  ›  程序员

一言不合秀代码 | 怎样写好 Mesos Framework

  •  
  •   dataman · 2017-03-27 20:21:48 +08:00 · 2688 次点击
    这是一个创建于 2845 天前的主题,其中的信息可能已经有所发展或是发生改变。

    “调度”这个词近两年被提到的比较多,资源调度管理应用生命周期等,带来了极大的便利,数人云开源的 Mesos 调度器 Swan ,基于 Mesos 实现应用调度框架,帮助用户轻松发布应用,实现应用的滚动更新,并根据用户指定的策略做应用的健康检测和故障转移。

    授之以鱼不如授之以渔,小数带来工程师的代码级文章,透彻分析如何写 Mesos Framework :

    运行任务

    当我们接受到 Mesos 发来的 offer 以后,可以选择接受或拒绝 offer 。如果要运行任务,那么就 Accept offer, 否则的话应该 decline offer 。代码示例如下:

       
        for _, offer := range offers {
                    cpus, mem, disk := OfferedResources(offer)
                    var tasks []*mesos.TaskInfo
                    for taskLaunched < tasksCount &&
                            cpus >= NeededCpus &&
                            mem >= NeededMem &&`
                            disk >= NeededDisk {
                            task, err := BuildTask(offer) 
                            if err != nil {
                                    return fmt.Errorf("Build task failed: %s", err.Error())`
                            }   
     
             
                            taskInfo := BuildTaskInfo(offer, task)
                            tasks = append(tasks, taskInfo)
     
                             taskLaunched++
                            cpus -= version.Cpus
                            mem -= version.Mem
                            disk -= version.Disk
                     }
                     LaunchTasks(offer, tasks)
          }  
    

    OfferedResources(offer) 用来计算 offer 提供的资源,包括 cpu, memory 和 disk 的大小. BuildTaskInfo 用来构造一个 mesos 所识别的 TaskInfo 结构体, LaunchTasks 用来给 Mesos 发命令以创建 task 。 LauchTasks 的过程实际上是一个接受 Mesos offer 的过程,示例代码如下:

            logrus.Infof("Launch %d tasks with offer %s", len(tasks), *offer.GetId().Value)
            call := &sched.Call{
                    FrameworkId: s.framework.GetId(),
                    Type:        sched.Call_ACCEPT.Enum(),
                    Accept: &sched.Call_Accept{
                            OfferIds: []*mesos.OfferID{
                                    offer.GetId(),
                            },
                            Operations: []*mesos.Offer_Operation{
                                    &mesos.Offer_Operation{
                                            Type: mesos.Offer_Operation_LAUNCH.Enum(),
                                            Launch: &mesos.Offer_Operation_Launch{
                                                    TaskInfos: tasks,
                                            },
                                    },
                            },
                            Filters: &mesos.Filters{RefuseSeconds: proto.Float64(1)},
                    },
            }
     
            return s.send(call)
    

    之后,如果创建 tasks 成功的话,应该就可以在 Mesos ui 上看到刚才创建的 task 了。

    Task 状态更新

    Task 状态是通过 Mesos statusUpdate 事件来更新的。 Mesos task 状态大概分以下几种:

    TaskState_TASK_STARTING TaskState = 0
    TaskState_TASK_RUNNING  TaskState = 1
    TaskState_TASK_KILLING  TaskState = 8
    TaskState_TASK_FINISHED TaskState = 2
    TaskState_TASK_FAILED   TaskState = 3
    TaskState_TASK_KILLED   TaskState = 4
    TaskState_TASK_ERROR    TaskState = 7
    TaskState_TASK_LOST TaskState = 5
    TaskState_TASK_DROPPED TaskState = 9
    TaskState_TASK_UNREACHABLE TaskState = 10 
    TaskState_TASK_GONE TaskState = 11 
    TaskState_TASK_GONE_BY_OPERATOR TaskState = 12 
    TaskState_TASK_UNKNOWN TaskState = 13}}}
    
     
    更新状态的示例代码如下:
     
    
    func (s *Scheduler) status(status *mesos.TaskStatus) {
        state := status.GetState()
        taskId := status.TaskId.GetValue()
        switch state {
            case mesos.TaskState_TASK_STAGING:
                 doSometing()                  
            case mesos.TaskState_TASK_STARTING:
                 doSometing()                 
            case mesos.TaskState_TASK_RUNNING:
                 doSometing()
            case mesos.TaskState_TASK_FINISHED:
                 doSometing()                
            case mesos.TaskState_TASK_FAILED:
                 doSometing()                
            case mesos.TaskState_TASK_KILLED:
                 doSometing()                
            case mesos.TaskState_TASK_LOST:
                 doSometing()
        }
    

    上面只是示例代码,具体的处理细节可以看 https://github.com/Dataman-Cloud/swan

    删除任务

    删除任务是通过给 Mesos 发送 Call_KILL 类型的消息来实现的,消息中指定了需要杀死的 task 的 ID ,具体示例代码如下:

            call := &sched.Call{
                    FrameworkId: s.framework.GetId(),
                    Type:        sched.Call_KILL.Enum(),
                    Kill: &sched.Call_Kill{
                            TaskId: &mesos.TaskID{
                                    Value: proto.String(task.ID),
                            },  
                            AgentId: &mesos.AgentID{
                                    Value: task.AgentId,
                            },  
                    },  
            }   
     
            duration := proto.Int64(task.KillPolicy.Duration * 1000 * 1000)
            if task.KillPolicy != nil {
                    if task.KillPolicy.Duration != 0 {
                            call.Kill.KillPolicy = &mesos.KillPolicy{
                                    GracePeriod: &mesos.DurationInfo{
                                            Nanoseconds: duration,
                                    },  
                            }   
                    }   
            }   
     
            return s.send(call)
    

    其中, Type 类型指定了消息的类型, FrameworkId 指定了当前 Framework 在 Mesos 注册的 ID , task.ID 指定了需要 kill 的 task 的 ID , task.AgentId 指定了需要 kill 的 task 所在的 agentId.

    killPolicy 是一个自定义的优雅终止相关的策略,其中指定了优雅终止的超时时间 duration,也就是 Mesos 先給 task 发一个 SIGTERM 的信号,让 task 有时间去做一些清理工作,如果 task 没有正常终止,在经过一定时间后发送 SIGKILL 去强制杀死 task 。这个时间由 duration 指定, Mesos 默认是 3 秒。

    Mesos 断线重连

    Framework 和 Mesos 之间通过一个长连接进行通信,在某些情况下,连接可能出错,这时候就需要 Framework 重新去连接 Mesos,示例代码如下:

        logrus.Infof("Subscribe with mesos master %s", s.Master)
        call := &sched.Call{
            Type: sched.Call_SUBSCRIBE.Enum(),
            Subscribe: &sched.Call_Subscribe{
                FrameworkInfo: s.Framework,
            },
        }
     
        if s.Framework.Id != nil {
            call.FrameworkId = &mesos.FrameworkID{
                Value: proto.String(s.Framework.Id.GetValue()),
            }
        }
     
        resp, err := s.Send(call)
        if err != nil {
            return err
        }
     
        // http might now be the default transport in future release
        if resp.StatusCode != http.StatusOK {
            return fmt.Errorf("Subscribe with unexpected response status: %d", resp.StatusCode)
        }
     
        go s.handleEvents(resp)
     
        return nil
    }
     
    func (s *Scheduler) resubscribe() {
        for {
            logrus.Warn("Connection to mesos got error, reconnect")
            if err := s.subscribe(); err == nil {
                return
            } else {
                logrus.Errorf("%s", err.Error())
            }
            <-time.After(2 * time.Second)
        }
    }
     
    func (s *Scheuler) handleEvents(resp *http.Response) {
        defer func() {
            resp.Body.Close()
        }()
     
        r := NewReader(resp.Body)
        dec := json.NewDecoder(r)
     
        for {
            event := new(sched.Event)
            if err := dec.Decode(event); err != nil {    //got some error, reconnect.
            go func() {
                s.resubscribe()
            }()
     
            return
            }
     
            switch event.GetType() {
                case sched.Event_SUBSCRIBED:
                    doSomework()
                case sched.Event_OFFERS:
                    doSomework()
                case sched.Event_RESCIND:
                    doSomework()
                case sched.Event_UPDATE:
                    doSomework()
                case sched.Event_MESSAGE:
                    doSomework()
                case sched.Event_FAILURE:
                    doSomework()
                case sched.Event_ERROR:
                    doSomework()
                case sched.Event_HEARTBEAT:
                    doSomework()
                }
            }
        }
    

    函数 resubscribe 用来向 Mesos 重新注册,如果注册失败,隔 2 秒之后会重试,直到连接成功为止。注册成功后会在一个新的 goroutine 里继续原来逻辑的处理。具体可以查看 Mesos 文档(http://mesos.apache.org/documentation/latest/scheduler-http-api)的 Disconnections 小节关于重连的内容。

    五步走的 Mesos Framework 教程就分享到这里了,更多代码请跳转https://github.com/Dataman-Cloud/swan,欢迎 Star&Fork 。

    3 条回复    2017-03-28 13:58:16 +08:00
    ydxred
        1
    ydxred  
       2017-03-28 11:05:17 +08:00
    一言不合就打广告....
    v2dead
        2
    v2dead  
       2017-03-28 12:49:31 +08:00
    缩进有 4 格有 8 格,对不起,你是好人。
    dataman
        3
    dataman  
    OP
       2017-03-28 13:58:16 +08:00
    @v2dead 小编不是很懂代码缩进,可能复制过来的时候有问题,~~~~(>_<)~~~~
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2913 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 14:24 · PVG 22:24 · LAX 06:24 · JFK 09:24
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.