
Understanding tornado in depth: the underlying ioloop implementation (part 2)

    bluedazzle · 2016-06-06 21:14:50 +08:00 · 3450 views
    def configurable_default(cls):
        """Returns the implementation class to be used if none is configured."""
        raise NotImplementedError()
    

    Clearly this, too, is just an interface stub, so let's go back and look at IOLoop's configurable_default():

    def configurable_default(cls):
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop
    

    So this is a factory method: it returns a different event-poll implementation depending on the operating system (epoll on Linux, kqueue on macOS/BSD, and plain select everywhere else; kqueue is essentially equivalent to epoll, just another operating system's implementation of the same idea).
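    To make the factory mechanics concrete, here is a minimal, simplified sketch of the Configurable pattern (hypothetical Fake* classes, not Tornado's actual code): instantiating the base class calls configurable_default() to decide which subclass actually gets built.

    # Minimal sketch of the Configurable pattern (hypothetical Fake* classes,
    # not Tornado's real implementation): the base class's __new__ asks
    # configurable_default() which subclass to construct.
    import select


    class FakeConfigurable(object):
        def __new__(cls, *args, **kwargs):
            impl_cls = cls.configurable_default()            # factory hook
            instance = super(FakeConfigurable, cls).__new__(impl_cls)
            instance.initialize(*args, **kwargs)
            return instance

        @classmethod
        def configurable_default(cls):
            raise NotImplementedError()

        def initialize(self):
            pass


    class FakeIOLoop(FakeConfigurable):
        @classmethod
        def configurable_default(cls):
            # Same idea as IOLoop.configurable_default(): pick by platform.
            return FakeEPollIOLoop if hasattr(select, "epoll") else FakeSelectIOLoop


    class FakeEPollIOLoop(FakeIOLoop):
        def initialize(self):
            self.impl_name = "epoll"


    class FakeSelectIOLoop(FakeIOLoop):
        def initialize(self):
            self.impl_name = "select"


    print(type(FakeIOLoop()).__name__)   # FakeEPollIOLoop on Linux, FakeSelectIOLoop elsewhere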

    The trail now leads to tornado.platform.epoll.EPollIOLoop, so let's take a look at EPollIOLoop:

    tornado.platform.epoll.EPollIOLoop

    import select
    
    from tornado.ioloop import PollIOLoop
    
    
    class EPollIOLoop(PollIOLoop):
        def initialize(self, **kwargs):
            super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
    

    EPollIOLoop inherits entirely from PollIOLoop (note: PollIOLoop, not IOLoop) and merely passes impl=select.epoll() at initialization. So although we construct an IOLoop, what ultimately gets initialized is this PollIOLoop, and everything we really need to read and understand lives there:
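    A quick way to see this dispatch in action (this assumes the Tornado 4.x era the article walks through; class names differ in newer, asyncio-based versions):

    # Quick check of the dispatch described above (assumes Tornado 4.x,
    # the version this article is based on).
    from tornado.ioloop import IOLoop

    loop = IOLoop()
    print(type(loop).__name__)   # EPollIOLoop on Linux, KQueueIOLoop on BSD/macOS,
                                 # SelectIOLoop elsewhere
    loop.close()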

    tornado.ioloop.PollIOLoop

    class PollIOLoop(IOLoop):
        """Base class for IOLoops built around a select-like function.
    
        For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
        (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
        `tornado.platform.select.SelectIOLoop` (all platforms).
        """
        def initialize(self, impl, time_func=None, **kwargs):
            super(PollIOLoop, self).initialize(**kwargs)
            self._impl = impl
            if hasattr(self._impl, 'fileno'):
                set_close_exec(self._impl.fileno())
            self.time_func = time_func or time.time
            self._handlers = {}
            self._events = {}
            self._callbacks = []
            self._callback_lock = threading.Lock()
            self._timeouts = []
            self._cancellations = 0
            self._running = False
            self._stopped = False
            self._closing = False
            self._thread_ident = None
            self._blocking_signal_threshold = None
            self._timeout_counter = itertools.count()
    
            # Create a pipe that we send bogus data to when we want to wake
            # the I/O loop when it is idle
            self._waker = Waker()
            self.add_handler(self._waker.fileno(),
                             lambda fd, events: self._waker.consume(),
                             self.READ)
    
        def close(self, all_fds=False):
            with self._callback_lock:
                self._closing = True
            self.remove_handler(self._waker.fileno())
            if all_fds:
                for fd, handler in self._handlers.values():
                    self.close_fd(fd)
            self._waker.close()
            self._impl.close()
            self._callbacks = None
            self._timeouts = None
    
        def add_handler(self, fd, handler, events):
            fd, obj = self.split_fd(fd)
            self._handlers[fd] = (obj, stack_context.wrap(handler))
            self._impl.register(fd, events | self.ERROR)
    
        def update_handler(self, fd, events):
            fd, obj = self.split_fd(fd)
            self._impl.modify(fd, events | self.ERROR)
    
        def remove_handler(self, fd):
            fd, obj = self.split_fd(fd)
            self._handlers.pop(fd, None)
            self._events.pop(fd, None)
            try:
                self._impl.unregister(fd)
            except Exception:
                gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
    
        def set_blocking_signal_threshold(self, seconds, action):
            if not hasattr(signal, "setitimer"):
                gen_log.error("set_blocking_signal_threshold requires a signal module "
                              "with the setitimer method")
                return
            self._blocking_signal_threshold = seconds
            if seconds is not None:
                signal.signal(signal.SIGALRM,
                              action if action is not None else signal.SIG_DFL)
    
        def start(self):
            ...
    
            try:
                while True:
                    # Prevent IO event starvation by delaying new callbacks
                    # to the next iteration of the event loop.
                    with self._callback_lock:
                        callbacks = self._callbacks
                        self._callbacks = []
    
                    # Add any timeouts that have come due to the callback list.
                    # Do not run anything until we have determined which ones
                    # are ready, so timeouts that call add_timeout cannot
                    # schedule anything in this iteration.
                    due_timeouts = []
                    if self._timeouts:
                        now = self.time()
                        while self._timeouts:
                            if self._timeouts[0].callback is None:
                                # The timeout was cancelled.  Note that the
                                # cancellation check is repeated below for timeouts
                                # that are cancelled by another timeout or callback.
                                heapq.heappop(self._timeouts)
                                self._cancellations -= 1
                            elif self._timeouts[0].deadline <= now:
                                due_timeouts.append(heapq.heappop(self._timeouts))
                            else:
                                break
                        if (self._cancellations > 512
                                and self._cancellations > (len(self._timeouts) >> 1)):
                            # Clean up the timeout queue when it gets large and it's
                            # more than half cancellations.
                            self._cancellations = 0
                            self._timeouts = [x for x in self._timeouts
                                              if x.callback is not None]
                            heapq.heapify(self._timeouts)
    
                    for callback in callbacks:
                        self._run_callback(callback)
                    for timeout in due_timeouts:
                        if timeout.callback is not None:
                            self._run_callback(timeout.callback)
                    # Closures may be holding on to a lot of memory, so allow
                    # them to be freed before we go into our poll wait.
                    callbacks = callback = due_timeouts = timeout = None
    
                    if self._callbacks:
                        # If any callbacks or timeouts called add_callback,
                        # we don't want to wait in poll() before we run them.
                        poll_timeout = 0.0
                    elif self._timeouts:
                        # If there are any timeouts, schedule the first one.
                        # Use self.time() instead of 'now' to account for time
                        # spent running callbacks.
                        poll_timeout = self._timeouts[0].deadline - self.time()
                        poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                    else:
                        # No timeouts and no callbacks, so use the default.
                        poll_timeout = _POLL_TIMEOUT
    
                    if not self._running:
                        break
    
                    if self._blocking_signal_threshold is not None:
                        # clear alarm so it doesn't fire while poll is waiting for
                        # events.
                        signal.setitimer(signal.ITIMER_REAL, 0, 0)
    
                    try:
                        event_pairs = self._impl.poll(poll_timeout)
                    except Exception as e:
                        # Depending on python version and IOLoop implementation,
                        # different exception types may be thrown and there are
                        # two ways EINTR might be signaled:
                        # * e.errno == errno.EINTR
                        # * e.args is like (errno.EINTR, 'Interrupted system call')
                        if errno_from_exception(e) == errno.EINTR:
                            continue
                        else:
                            raise
    
                    if self._blocking_signal_threshold is not None:
                        signal.setitimer(signal.ITIMER_REAL,
                                         self._blocking_signal_threshold, 0)
    
                    # Pop one fd at a time from the set of pending fds and run
                    # its handler. Since that handler may perform actions on
                    # other file descriptors, there may be reentrant calls to
                    # this IOLoop that update self._events
                    self._events.update(event_pairs)
                    while self._events:
                        fd, events = self._events.popitem()
                        try:
                            fd_obj, handler_func = self._handlers[fd]
                            handler_func(fd_obj, events)
                        except (OSError, IOError) as e:
                            if errno_from_exception(e) == errno.EPIPE:
                                # Happens when the client closes the connection
                                pass
                            else:
                                self.handle_callback_exception(self._handlers.get(fd))
                        except Exception:
                            self.handle_callback_exception(self._handlers.get(fd))
                    fd_obj = handler_func = None
    
            finally:
                # reset the stopped flag so another start/stop pair can be issued
                self._stopped = False
                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)
                IOLoop._current.instance = old_current
                if old_wakeup_fd is not None:
                    signal.set_wakeup_fd(old_wakeup_fd)
    
        def stop(self):
            self._running = False
            self._stopped = True
            self._waker.wake()
    
        def time(self):
            return self.time_func()
    
        def call_at(self, deadline, callback, *args, **kwargs):
            timeout = _Timeout(
                deadline,
                functools.partial(stack_context.wrap(callback), *args, **kwargs),
                self)
            heapq.heappush(self._timeouts, timeout)
            return timeout
    
        def remove_timeout(self, timeout):
            # Removing from a heap is complicated, so just leave the defunct
            # timeout object in the queue (see discussion in
            # http://docs.python.org/library/heapq.html).
            # If this turns out to be a problem, we could add a garbage
            # collection pass whenever there are too many dead timeouts.
            timeout.callback = None
            self._cancellations += 1
    
        def add_callback(self, callback, *args, **kwargs):
            with self._callback_lock:
                if self._closing:
                    raise RuntimeError("IOLoop is closing")
                list_empty = not self._callbacks
                self._callbacks.append(functools.partial(
                    stack_context.wrap(callback), *args, **kwargs))
                if list_empty and thread.get_ident() != self._thread_ident:
                    # If we're in the IOLoop's thread, we know it's not currently
                    # polling.  If we're not, and we added the first callback to an
                    # empty list, we may need to wake it up (it may wake up on its
                    # own, but an occasional extra wake is harmless).  Waking
                    # up a polling IOLoop is relatively expensive, so we try to
                    # avoid it when we can.
                    self._waker.wake()
    
        def add_callback_from_signal(self, callback, *args, **kwargs):
            with stack_context.NullContext():
                if thread.get_ident() != self._thread_ident:
                    # if the signal is handled on another thread, we can add
                    # it normally (modulo the NullContext)
                    self.add_callback(callback, *args, **kwargs)
                else:
                    # If we're on the IOLoop's thread, we cannot use
                    # the regular add_callback because it may deadlock on
                    # _callback_lock.  Blindly insert into self._callbacks.
                    # This is safe because the GIL makes list.append atomic.
                    # One subtlety is that if the signal interrupted the
                    # _callback_lock block in IOLoop.start, we may modify
                    # either the old or new version of self._callbacks,
                    # but either way will work.
                    self._callbacks.append(functools.partial(
                        stack_context.wrap(callback), *args, **kwargs))
    

    Sure enough, PollIOLoop inherits from IOLoop and implements all of its interface methods. Now we can finally get to the real subject :joy:

    ioloop analysis

    The first things to look at are the methods that wrap the epoll operations. Remember from the previous part that epoll can be driven completely with just four APIs? Let's see how PollIOLoop implements them:

    epoll operations

    def add_handler(self, fd, handler, events):
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        fd, obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
    

    epoll_ctl: these three methods correspond to epoll_ctl's add, modify, and del operations respectively, so together they implement epoll_ctl.

    epoll_create: the epoll object itself was created back in EPollIOLoop's initialization, shown earlier: super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs). The select.epoll() call is the equivalent of epoll_create.

    epoll_wait: the epoll_wait call happens inside start(): event_pairs = self._impl.poll(poll_timeout).

    epoll_close: and epoll is closed inside PollIOLoop's close method, via self._impl.close(). (A standalone sketch of all four operations follows below.)
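    Here is that sketch, using the standard library's select.epoll directly (Linux only; a toy illustration, not Tornado code):

    # The same four epoll operations, shown with select.epoll directly.
    import select
    import socket

    server = socket.socket()
    server.setblocking(False)
    server.bind(("127.0.0.1", 0))
    server.listen(128)

    epoll = select.epoll()                                    # epoll_create
    epoll.register(server.fileno(), select.EPOLLIN)           # epoll_ctl: add
    epoll.modify(server.fileno(),
                 select.EPOLLIN | select.EPOLLERR)            # epoll_ctl: modify

    events = epoll.poll(0.1)                                  # epoll_wait
    print(events)                                             # [] -- no client connected yet

    epoll.unregister(server.fileno())                         # epoll_ctl: del
    epoll.close()                                             # close the epoll fd
    server.close()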

    initialize

    Next, let's see what PollIOLoop's initialize method does:

    def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)
        self._impl = impl                          # the poller picked for this platform (epoll here)
        if hasattr(self._impl, 'fileno'):
            set_close_exec(self._impl.fileno())    # mark the fd close-on-exec so it is not leaked across fork + exec
        self.time_func = time_func or time.time    # function used to read the current time
        self._handlers = {}                        # handlers watched by epoll, keyed by their file descriptor (fd)
        self._events = {}                          # active fd/event pairs returned by epoll
        self._callbacks = []                       # list of pending callbacks
        self._callback_lock = threading.Lock()     # thread lock protecting the callback list
        self._timeouts = []                        # min-heap of timeout tasks ordered from earliest to latest deadline (each usually carries a callback)
        self._cancellations = 0                    # counter of cancelled timeouts
        self._running = False                      # is the ioloop running?
        self._stopped = False                      # has the ioloop been stopped?
        self._closing = False                      # is the ioloop closing?
        self._thread_ident = None                  # identity of the thread running the loop
        self._blocking_signal_threshold = None     # threshold for the SIGALRM check that detects handlers blocking the loop during epoll_wait
        self._timeout_counter = itertools.count()  # monotonically increasing counter (the author was unsure of its purpose; it breaks ties between timeouts with equal deadlines)
        self._waker = Waker()                      # a Waker wrapping a pipe; epoll watches this pipe so an idle loop can be woken up
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)                # register the pipe with epoll; at server start-up only READ events matter here
    

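    The _timeouts comment above mentions a min-heap ordered by deadline; here is a standalone sketch of that idea (a hypothetical call_at helper and string labels in place of callbacks, not Tornado's _Timeout class), including the counter-based tiebreak:

    # Standalone sketch of the min-heap behind self._timeouts: heapq keeps the
    # earliest deadline at index 0, and a counter breaks ties between equal
    # deadlines.
    import heapq
    import itertools
    import time

    counter = itertools.count()
    timeouts = []

    def call_at(deadline, callback):
        # Tuples compare element by element: deadline first, then the counter,
        # so timeouts with equal deadlines keep their insertion order.
        heapq.heappush(timeouts, (deadline, next(counter), callback))

    now = time.time()
    call_at(now + 5, "runs last")
    call_at(now + 1, "runs first")
    call_at(now + 1, "same deadline, runs second")

    while timeouts:
        deadline, _, callback = heapq.heappop(timeouts)
        print(callback)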
    Beyond the inline comments, a few additional notes:

    1. What close_exec does: when a process forks, the child gets a copy-on-write (COW) copy of the parent's data space, heap, and stack, and that includes its file descriptors. Right after a successful fork, identical descriptors in parent and child point to the same entry in the kernel's file table. Usually we then call exec to run another program, which replaces the child's text, data, heap, and stack with the new program's; the variables that held those file descriptors are gone, so the now-useless descriptors can no longer be closed. The conventional workaround is to close the unneeded descriptors in the child right after fork, before calling exec; the close-on-exec flag makes the kernel do exactly that automatically, i.e. "close, then exec". For details, see the linked article on Linux's close-on-exec mechanism.

    2. Waker(): Waker wraps the operations on a pipe:

      def set_close_exec(fd):
      	flags = fcntl.fcntl(fd, fcntl.F_GETFD)
      	fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
      
      def _set_nonblocking(fd):
      	flags = fcntl.fcntl(fd, fcntl.F_GETFL)
      	fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
      
      class Waker(interface.Waker):
      	def __init__(self):
      		r, w = os.pipe()
      		_set_nonblocking(r)
      		_set_nonblocking(w)
      		set_close_exec(r)
      		set_close_exec(w)
      		self.reader = os.fdopen(r, "rb", 0)
      		self.writer = os.fdopen(w, "wb", 0)
      
      	def fileno(self):
      		return self.reader.fileno()
      
      	def write_fileno(self):
      		return self.writer.fileno()
      
      	def wake(self):
      		try:
      			self.writer.write(b"x")
      		except IOError:
      			pass
      
      	def consume(self):
      		try:
      			while True:
      				result = self.reader.read()
      				if not result:
      					break
      		except IOError:
      			pass
      
      	def close(self):
      		self.reader.close()
      		self.writer.close()
      

    You can see that Waker sets up separate read and write ends of the pipe, both non-blocking and close-on-exec. Note the wake(self) method: self.writer.write(b"x") simply writes an arbitrary byte into the pipe, which makes the read end readable and wakes up the poll call.
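    A minimal demonstration of the self-pipe trick the Waker relies on (a toy example on Linux, not Tornado code): writing one byte makes the blocked poll call return immediately instead of sleeping for its full timeout.

    # Minimal demo of the self-pipe trick behind Waker: a byte written to the
    # pipe wakes up a blocked epoll.poll().
    import os
    import select
    import threading
    import time

    r, w = os.pipe()
    epoll = select.epoll()
    epoll.register(r, select.EPOLLIN)

    def wake_later():
        time.sleep(0.2)
        os.write(w, b"x")        # what Waker.wake() does

    threading.Thread(target=wake_later).start()

    start = time.time()
    epoll.poll(10)               # would block for up to 10 s if nothing woke it
    print("woke after %.2f s" % (time.time() - start))   # ~0.2 s, not 10 s
    os.read(r, 1)                # what Waker.consume() does: drain the pipe
    epoll.close()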

    Original post

    Author: rapospectre

    1 reply · 2016-06-06 22:18:31 +08:00

    rapospectre (OP) · #1 · 2016-06-06 22:18:31 +08:00
    Pfft... there is still one more part I couldn't post; new topics can't be published too frequently in a short period @_@