V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Goodapp
V2EX  ›  推广

Docker 源码分析之容器日志处理与 log-driver 实现

  •  
  •   Goodapp · 2017-01-19 12:07:10 +08:00 · 2262 次点击
    这是一个创建于 2871 天前的主题,其中的信息可能已经有所发展或是发生改变。

    概要

    本文将从 docker(1.12.6)源码的角度分析 docker daemon 怎么将容器的日志收集出来并通过配置的 log-driver 发送出去,并结合示例介绍了好雨云帮中实现的一个 zmq-loger 。阅读本文,你也可以实现适合自己业务场景的 log-driver 。

    阅读准备

    本文适合能够阅读和编写 golang 代码的同学。

    ( 1 )首先你需要认知以下几个关键词:

    • stdout:
      标准输出,进程写数据的流。
    • stderr:
      错误输出,进程写错误数据的流。
    • 子进程:
      由一个进程(父进程)创建的进程,集成父进程大部分属性,同时可以被父进程守护和管理。

    ( 2 )你需要知道关于进程产生日志的形式
    进程产生日志有两类输出方式,一类是写入到文件中。另一类是直接写到 stdout 或者 stderr ,例如 php 的echo python 的print golang 的fmt.Println("")等等。
    ( 3 )是否知道 docker-daemon 与运行中 container 的关系? 一个 container 就是一个特殊的进程,它是由 docker daemon 创建并启动,因此 container 是 docker daemon 的子进程。由 docker daemon 守护和管理。因此 container 的 stdout 能够被 docker daemon 获取到。基于此理论,我们来分析 docker daemon 相关代码。

    docker-daemon 关于日志源码分析

    container 实例源码

    # /container/container.go:62
    type CommonContainer struct{
        StreamConfig *stream.Config
        ...
    }
    # /container/stream/streams.go:26
    type Config struct {
    	sync.WaitGroup
    	stdout    *broadcaster.Unbuffered
    	stderr    *broadcaster.Unbuffered
    	stdin     io.ReadCloser
    	stdinPipe io.WriteCloser
    }
    

    找到如上所示对应的代码,显示了每一个 container 实例都有几个属性 stdout,stderr,stdin,以及管道 stdinPipe 。这里说下 stdinPipe,当容器使用-i 参数启动时标准输入将被运行, daemon 将能够使用此管道向容器内写入标准输入。

    ![2017011930658image2017-1-18 17-18-38.png]( http://7xqmjb.com1.z0.glb.clouddn.com/2017011930658image2017-1-18 17-18-38.png)

    我们试想以上图例,如果是你,你怎么实现日志收集转发?

    # /container/container.go:312
    func (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) {
    	c, err := logger.GetLogDriver(cfg.Type)
    	if err != nil {
    		return nil, fmt.Errorf("Failed to get logging factory: %v", err)
    	}
    	ctx := logger.Context{
    		Config:              cfg.Config,
    		ContainerID:         container.ID,
    		ContainerName:       container.Name,
    		ContainerEntrypoint: container.Path,
    		ContainerArgs:       container.Args,
    		ContainerImageID:    container.ImageID.String(),
    		ContainerImageName:  container.Config.Image,
    		ContainerCreated:    container.Created,
    		ContainerEnv:        container.Config.Env,
    		ContainerLabels:     container.Config.Labels,
    		DaemonName:          "docker",
    	}
    
    	// Set logging file for "json-logger"
    	if cfg.Type == jsonfilelog.Name {
    		ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID))
    		if err != nil {
    			return nil, err
    		}
    	}
    	return c(ctx)
    }
    #/container/container.go:978
    func (container *Container) startLogging() error {
    	if container.HostConfig.LogConfig.Type == "none" {
    		return nil // do not start logging routines
    	}
    
    	l, err := container.StartLogger(container.HostConfig.LogConfig)
    	if err != nil {
    		return fmt.Errorf("Failed to initialize logging driver: %v", err)
    	}
    
    	copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
    	container.LogCopier = copier
    	copier.Run()
    	container.LogDriver = l
    
    	// set LogPath field only for json-file logdriver
    	if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
    		container.LogPath = jl.LogPath()
    	}
    
    	return nil
    }
    

    第一个方法是为 container 查找 log-driver 。首先根据容器配置的 log-driver 类别调用:logger.GetLogDriver(cfg.Type)返回一个方法类型:

    /daemon/logger/factory.go:9
    type Creator func(Context) (Logger, error)
    

    实质就是从工厂类注册的 logdriver 插件去查找,具体源码下文分析。获取到 c 方法后构建调用参数具体就是容器的一些信息。然后使用调用 c 方法返回 driver 。 driver 是个接口类型,我们看看有哪些方法:

    # /daemon/logger/logger.go:61
    type Logger interface {
    	Log(*Message) error
    	Name() string
    	Close() error
    }
    

    很简单的三个方法,也很容易理解,Log()发送日志消息到 driver,Close()进行关闭操作(根据不同实现)。 也就是说我们自己实现一个 logdriver ,只需要实现如上三个方法,然后注册到 logger 工厂类中即可。下面我们来看/daemon/logger/factory.go

    第二个方法就是处理日志了,获取到日志 driver,在创建一个Copier,顾名思义就是复制日志,分别从 stdout 和 stderr 复制到 logger driver 。下面看看具体关键实现:

    #/daemon/logger/copir.go:41
    func (c *Copier) copySrc(name string, src io.Reader) {
    	defer c.copyJobs.Done()
    	reader := bufio.NewReader(src)
    
    	for {
    		select {
    		case <-c.closed:
    			return
    		default:
    			line, err := reader.ReadBytes('\n')
    			line = bytes.TrimSuffix(line, []byte{'\n'})
    
    			// ReadBytes can return full or partial output even when it failed.
    			// e.g. it can return a full entry and EOF.
    			if err == nil || len(line) > 0 {
    				if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
    					logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
    				}
    			}
    
    			if err != nil {
    				if err != io.EOF {
    					logrus.Errorf("Error scanning log stream: %s", err)
    				}
    				return
    			}
    		}
    	}
    }
    

    每读取一行数据,构建一个消息,调用 logdriver 的 log 方法发送到 driver 处理。

    日志 driver 注册器

    位于/daemon/logger/factory.go的源码实现即时日志 driver 的注册器,其中几个重要的方法(上文已经提到一个):

    # /daemon/logger/factory.go:21
    func (lf *logdriverFactory) register(name string, c Creator) error {
    	if lf.driverRegistered(name) {
    		return fmt.Errorf("logger: log driver named '%s' is already registered", name)
    	}
    
    	lf.m.Lock()
    	lf.registry[name] = c
    	lf.m.Unlock()
    	return nil
    }
    # /daemon/logger/factory.go:39
    func (lf *logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error {
    	lf.m.Lock()
    	defer lf.m.Unlock()
    
    	if _, ok := lf.optValidator[name]; ok {
    		return fmt.Errorf("logger: log validator named '%s' is already registered", name)
    	}
    	lf.optValidator[name] = l
    	return nil
    }
    

    看起来很简单,就是将一个Creator方法类型添加到一个 map 结构中,将LogOptValidator添加到另一个 map 这里注意加锁的操作。

    #/daemon/logger/factory.go:13
    type LogOptValidator func(cfg map[string]string) error
    

    这个主要是验证 driver 的参数 ,dockerd 和 docker 启动参数中有:--log-opt

    好雨云帮自己实现一个基于 zmq 的 log-driver

    上文已经完整分析了 docker daemon 管理 logdriver 和处理日志的整个流程。相信你已经比较明白了。下面我们以 zmq-driver 为例讲讲我们怎么实现自己的 driver 。直接接收容器的日志。
    上文我们已经谈了一个 log-driver 需要实现的几个方法。 我们可以看看位于/daemon/logger目录下的已有的 driver 的实现,例如fluentd,awslogs等。 下面我们来分析 zmq-driver 具体的代码:

    //定义一个 struct,这里包含一个 zmq 套接字
    type ZmqLogger struct {
    	writer      *zmq.Socket
    	containerId string
    	tenantId    string
    	serviceId   string
    	felock      sync.Mutex
    }
    //定义 init 方法调用 logger 注册器的方法注册当前 driver
    //和参数验证方法。
    func init() {
    	if err := logger.RegisterLogDriver(name, New); err != nil {
    		logrus.Fatal(err)
    	}
    	if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
    		logrus.Fatal(err)
    	}
    }
    //实现一个上文提到的 Creator 方法注册 logdriver.
    //这里新建一个 zmq 套接字构建一个实例
    func New(ctx logger.Context) (logger.Logger, error) {
    	zmqaddress := ctx.Config[zmqAddress]
    
    	puber, err := zmq.NewSocket(zmq.PUB)
    	if err != nil {
    		return nil, err
    	}
    	var (
    		env       = make(map[string]string)
    		tenantId  string
    		serviceId string
    	)
    	for _, pair := range ctx.ContainerEnv {
    		p := strings.SplitN(pair, "=", 2)
    		//logrus.Errorf("ContainerEnv pair: %s", pair)
    		if len(p) == 2 {
    			key := p[0]
    			value := p[1]
    			env[key] = value
    		}
    	}
    	tenantId = env["TENANT_ID"]
    	serviceId = env["SERVICE_ID"]
    
    	if tenantId == "" {
    		tenantId = "default"
    	}
    
    	if serviceId == "" {
    		serviceId = "default"
    	}
    
    	puber.Connect(zmqaddress)
    
    	return &ZmqLogger{
    		writer:      puber,
    		containerId: ctx.ID(),
    		tenantId:    tenantId,
    		serviceId:   serviceId,
    		felock:      sync.Mutex{},
    	}, nil
    }
    //实现 Log 方法,这里使用 zmq socket 发送日志消息
    //这里必须注意, zmq socket 是线程不安全的,我们知道
    //本方法可能被两个线程(复制 stdout 和肤质 stderr)调用 //必须使用锁保证线程安全。否则会发生错误。
    func (s *ZmqLogger) Log(msg *logger.Message) error {
    	s.felock.Lock()
    	defer s.felock.Unlock()
    	s.writer.Send(s.tenantId, zmq.SNDMORE)
    	s.writer.Send(s.serviceId, zmq.SNDMORE)
    	if msg.Source == "stderr" {
    		s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
    	} else {
    		s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
    	}
    	return nil
    }
    //实现 Close 方法,这里用来关闭 zmq socket 。
    //同样注意线程安全,调用此方法的是容器关闭协程。
    func (s *ZmqLogger) Close() error {
    	s.felock.Lock()
    	defer s.felock.Unlock()
    	if s.writer != nil {
    		return s.writer.Close()
    	}
    	return nil
    }
    
    func (s *ZmqLogger) Name() string {
    	return name
    }
    //验证参数的方法,我们使用参数传入 zmq pub 的地址。
    func ValidateLogOpt(cfg map[string]string) error {
    	for key := range cfg {
    		switch key {
    		case zmqAddress:
    		default:
    			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
    		}
    	}
    	if cfg[zmqAddress] == "" {
    		return fmt.Errorf("must specify a value for log opt '%s'", zmqAddress)
    	}
    	return nil
    }
    

    总结

    多研究源码可以方便我们理解 docker 的工作原理。今天我们分析了日志部分。希望读者对这部分功能能够理解得更清晰。

    云盟认证成员: barnett

    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2787 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 20ms · UTC 02:29 · PVG 10:29 · LAX 18:29 · JFK 21:29
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.