DEV Community

菜皮日记
菜皮日记

Posted on

1

tornado 的协程调度原理

本文讨论 tornado 的协程实现原理,简单做了一份笔记。

首先看一段最常见的 tornado web 代码:

import tornado
import tornado.web
import tornado.gen
from tornado.gen import coroutine
from tornado.httpclient import AsyncHTTPClient

class GenHandler(tornado.web.RequestHandler):
    @coroutine
    def get(self):
        url = 'http://www.baidu.com'
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch(url)
        yield tornado.gen.sleep(5)
        self.write(response.body)

class MainHanler(tornado.web.RequestHandler):
    def get(self):
        self.write('root')

if __name__ == "__main__":
    application = tornado.web.Application([
        (r"/", MainHanler),
        (r"/gen_async/", GenHandler),
    ], autoreload=True)
    application.listen(8888)
    tornado.ioloop.IOLoop.current().start()
Enter fullscreen mode Exit fullscreen mode

其中最后一行代码 tornado.ioloop.IOLoop.current().start() 启动服务。带着几个问题往下看:

  • 知道 yield 可以暂存执行状态,等「合适的时机」重新恢复执行,那么保存的状态到哪去了?
  • 上一个问题中「合适的时机」是到底是什么时候?
  • 继续接上一个问题,具体是怎么恢复执行的?

IOLoop 类相当于是对多路复用的封装,起到事件循环的作用,调度整个协程执行过程。

查看 IOLoop 的源码,可以看到 IOLoop 继承自 Configurable,PollIOLoop 又继承自 IOLoop。当 IOLoop 启动时,会确定使用哪一种多路复用方式,epoll、kqueue 还是 select?

# IOLoop 类
# IOLoop 中的 configurable_default 方法是重写 Configurable 的
# 这里会确定使用哪种多路复用方式
@classmethod
def configurable_default(cls):
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
      return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
  return SelectIOLoop
Enter fullscreen mode Exit fullscreen mode
# PollIOLoop类
def initialize(self, impl, time_func=None, **kwargs):
    super(PollIOLoop, self).initialize(**kwargs)
    self._impl = impl
    if hasattr(self._impl, 'fileno'):
        set_close_exec(self._impl.fileno())
    self.time_func = time_func or time.time
    self._handlers = {}
    self._events = {}
    self._callbacks = []
    self._callback_lock = threading.Lock()
    self._timeouts = []
    self._cancellations = 0
    self._running = False
    self._stopped = False
    self._closing = False
    self._thread_ident = None
    self._blocking_signal_threshold = None
    self._timeout_counter = itertools.count()

    # Create a pipe that we send bogus data to when we want to wake
    # the I/O loop when it is idle
    self._waker = Waker()
    self.add_handler(self._waker.fileno(),
                     lambda fd, events: self._waker.consume(),
                     self.READ)

def add_handler(self, fd, handler, events):
    fd, obj = self.split_fd(fd)
    self._handlers[fd] = (obj, stack_context.wrap(handler))
    self._impl.register(fd, events | self.ERROR)

def update_handler(self, fd, events):
    fd, obj = self.split_fd(fd)
    self._impl.modify(fd, events | self.ERROR)

def remove_handler(self, fd):
    fd, obj = self.split_fd(fd)
    self._handlers.pop(fd, None)
    self._events.pop(fd, None)
    try:
        self._impl.unregister(fd)
    except Exception:
        gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
Enter fullscreen mode Exit fullscreen mode

PollIOLoop 中 initalize 方法中调用 add_handler 方法,注册对应事件的处理函数,如 socket 可读时,回调哪个函数去处理。

IOLoop 和协程之间的信使:Future

class Future(object):
    def __init__(self):
        self._result = None
        self._exc_info = None
        self._callbacks = []
        self.running = True

    def set_result(self, result):
        ...

    def set_exc_info(self, exce_info):
        ...

    def result(self):
        ...

    def exc_info(self):
        ...

    def add_done_callback(self, callback):
        self._callbacks.append(callback)
Enter fullscreen mode Exit fullscreen mode

Future 对象起到“占位符”的作用,协程的执行结果会通过 set_result 方式写入其中,并调用通过 add_done_callback 设置的回调。

恢复唤醒协程的 Runner

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.yield_point = None
        self.pending_callbacks = None
        self.results = None
        self.running = False
        self.finished = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        self.stack_context_deactivate = None
        # 上面一堆不需要看的初始化
        if self.handle_yield(first_yielded):
            gen = result_future = first_yielded = None
            self.run()


    def handle_yield(self, yielded):

        self.future = convert_yielded(yielded)

        if self.future is moment:
            self.io_loop.add_callback(self.run)
            return False
        elif not self.future.done():
            def inner(f):
                # Break a reference cycle to speed GC.
                f = None
                self.run()
            self.io_loop.add_future(
                self.future, inner)
            return False
        return True

    def run(self):
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None

                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()
                    future = None

                    yielded = self.gen.send(value)

                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    if self.pending_callbacks and not self.had_exception:
                        raise LeakedCallbackError(
                            "finished without waiting for callbacks %r" %
                            self.pending_callbacks)
                    future_set_result_unless_cancelled(self.result_future,
_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                except Exception:
                    # 一些结束操作
                    return
                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False
Enter fullscreen mode Exit fullscreen mode

协程每生成一个 Future,都会生成对应的一个 Runner,并将 Future 初始化注入都其中。Runner 的 run 方法中,通过 self.gen.send(Future) 来启动 Future,当 Future 完成时,将其设置成 done,并回调其预设的 callback。

回答第一个问题:协程的状态保存到哪去了:

IOLoop 中通过 add_future 调用实现类 PollIOLoop 中的 add_callback 方法,其中通过 functools 生成偏函数,放入 _callbacks 列表,等待被回调执行。

# IOLoop 的add_future
def add_future(self, future, callback):
    """Schedules a callback on the ``IOLoop`` when the given
    `.Future` is finished.

    The callback is invoked with one argument, the
    `.Future`.
    """
    assert is_future(future)
    callback = stack_context.wrap(callback)
    future.add_done_callback(
        lambda future: self.add_callback(callback, future))

# PollIOLoop 的add_callback
def add_callback(self, callback, *args, **kwargs):
        if thread.get_ident() != self._thread_ident:
            with self._callback_lock:
                if self._closing:
                    return
                list_empty = not self._callbacks
                self._callbacks.append(functools.partial(
                    stack_context.wrap(callback), *args, **kwargs))
                if list_empty:
                    self._waker.wake()
        else:
            if self._closing:
                return
            self._callbacks.append(functools.partial(
                stack_context.wrap(callback), *args, **kwargs))
Enter fullscreen mode Exit fullscreen mode

第二个问题:「合适的时机」是什么?

IOLoop 实际上就是对多路复用的封装,当底层 epoll_wait 事件发生时,即会通知 IOLoop 主线程。

这一段是 IOLoop 中等待多路复用的事件,以及处理事件。

try:
    # 等待事件
      event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
      print("wait fail")

      if errno_from_exception(e) == errno.EINTR:
          continue
      else:
          raise
if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)
# 处理事件
self._events.update(event_pairs)
while self._events:
    fd, events = self._events.popitem()
    try:
        fd_obj, handler_func = self._handlers[fd]
        handler_func(fd_obj, events)
    except (OSError, IOError) as e:
        if errno_from_exception(e) == errno.EPIPE:
            pass
        else:
            self.handle_callback_exception(self._handlers.get(fd))
    except Exception:
        self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None
Enter fullscreen mode Exit fullscreen mode

第三个问题:具体是怎么恢复的。

Runner 通过不断 check Future 的状态,最后调用 callback 来返回结果。

总结

首先 tornado 对多路复用系统调用做了封装,来实现非阻塞 web 服务。

其次 tornado 通过 yield+Future+Runner 实现了生成 Future,Runner 监控结果,回调 callback 来实现协程的执行。

参考:

http://www.nodekey.com/tornado-yi-bu-yuan-ma-jie-xi/

https://blog.csdn.net/wyx819/article/details/45420017

https://yangyaq.github.io/2019/03/06/tornado的事件循环机制/

Heroku

This site is built on Heroku

Join the ranks of developers at Salesforce, Airbase, DEV, and more who deploy their mission critical applications on Heroku. Sign up today and launch your first app!

Get Started

Top comments (0)

Image of Docusign

🛠️ Bring your solution into Docusign. Reach over 1.6M customers.

Docusign is now extensible. Overcome challenges with disconnected products and inaccessible data by bringing your solutions into Docusign and publishing to 1.6M customers in the App Center.

Learn more