为了更好理解python 里的协程机制,仿照asyncio的思路,写一个完全基于yield语法的的协程实现。对协程还有困惑的人,看了应该会有帮助。
下面代码基本实现了python3.5+ 协程的功能。
没有实现的功能包括:
协程的嵌套调用时,内部协程sleep信号如何传到外部协程,也就是await语法和协程标志。
python3.6+ 对协程都添加了标志,并通过await语法,让子协程的sleep操作可以传到外部,从而可以被事件循环捕获。因为这个处理比较复杂(虽然原理很简单),我就没有实现。
也因为我没有协程标志,所以我的Task类需要用isinstance来判断面对的是一个生成器还是一个普通函数。
import typesimport tracebackimport timeclass Sleep(): def __init__(self, n): assert isinstance(n, int) self.end = now()+nnow=lambda:time.time() def every(func, a_list): ret = True for i in a_list: ret = ret and func(i) return ret class Task(object): generator = type((i for i in range(1))) def __init__(self, corotine, *args): assert isinstance(corotine, self.generator) or isinstance(corotine, types.FunctionType) self.status = "pending" self.corotine = corotine self.args = args def start(self): try: self.status = "running" #print(self.status) try: while True: tmp_status = next(self.corotine) if isinstance(tmp_status, Sleep): self.status = "sleep" self.end_sleep = tmp_status.end break except TypeError: result = self.corotine(*self.args) raise StopIteration(result) except StopIteration as result: self.status = "completed" self._result = result except: traceback.print_exc() @staticmethod def wait(task_list): def all_task(): while True: for task in task_list: #print(task.status) if task.status == "sleep": if now()>task.end_sleep: task.status = "awake" del(task.end_sleep) for task in task_list: if task.status in ["pending", "awake"]: #print(task.status) task.start() if every(lambda i:i.status=="completed", task_list): return task_list return Task(all_task) @staticmethod def sleep(n): return Sleep(n) def result(self): assert self.status == "completed" return self._resultclass Loop(object): def __init__(self): pass def run_until_complete(self, task, *args): if not isinstance(task, Task): task = Task(task, *args) assert isinstance(task, Task) Task.wait([task]).start() def a(): print("a: ","hello") for _ in range(5): yield Task.sleep(1) print("a: ", "bye") return "goodbye"def b(): print("b: haha") for _ in range(5): yield Task.sleep(1) print("b: again") return "nihao"def c(): for i in range(10): print("c:",i) return "nice"loop = Loop()task1 = Task(a())task2 = Task(b())task3 = Task(c)task = Task.wait([task1, task2, task3])loop.run_until_complete(task)print(task.result())print(task1.result())print(task2.result())print(task3.result())