博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python 协程 深入浅出(二)
阅读量:7054 次
发布时间:2019-06-28

本文共 3162 字,大约阅读时间需要 10 分钟。

hot3.png

为了更好理解python 里的协程机制,仿照asyncio的思路,写一个完全基于yield语法的的协程实现。对协程还有困惑的人,看了应该会有帮助。

下面代码基本实现了python3.5+ 协程的功能。

没有实现的功能包括:

协程的嵌套调用时,内部协程sleep信号如何传到外部协程,也就是await语法和协程标志。

python3.6+ 对协程都添加了标志,并通过await语法,让子协程的sleep操作可以传到外部,从而可以被事件循环捕获。因为这个处理比较复杂(虽然原理很简单),我就没有实现。

也因为我没有协程标志,所以我的Task类需要用isinstance来判断面对的是一个生成器还是一个普通函数。

import typesimport tracebackimport timeclass Sleep():    def __init__(self, n):        assert isinstance(n, int)        self.end = now()+nnow=lambda:time.time()    def every(func, a_list):    ret = True    for i in a_list:        ret = ret and func(i)    return ret                class Task(object):    generator = type((i for i in range(1)))    def __init__(self, corotine, *args):        assert isinstance(corotine, self.generator) or isinstance(corotine, types.FunctionType)        self.status = "pending"        self.corotine = corotine        self.args = args    def start(self):        try:            self.status = "running"            #print(self.status)            try:                while True:                    tmp_status = next(self.corotine)                    if isinstance(tmp_status, Sleep):                        self.status = "sleep"                        self.end_sleep = tmp_status.end                        break            except TypeError:                result = self.corotine(*self.args)                raise StopIteration(result)        except StopIteration as result:            self.status = "completed"            self._result = result        except:            traceback.print_exc()    @staticmethod    def wait(task_list):        def all_task():            while True:                for task in task_list:                    #print(task.status)                    if task.status == "sleep":                        if now()>task.end_sleep:                            task.status = "awake"                            del(task.end_sleep)                for task in task_list:                    if task.status in ["pending", "awake"]:                        #print(task.status)                        task.start()                if every(lambda i:i.status=="completed", task_list):                    return task_list        return Task(all_task)    @staticmethod    def sleep(n):        return Sleep(n)    def result(self):        assert self.status == "completed"        return self._resultclass Loop(object):    def __init__(self):        pass    def run_until_complete(self, task, *args):        if not isinstance(task, Task):            task = Task(task, *args)        assert isinstance(task, Task)        Task.wait([task]).start()        def a():    print("a: ","hello")    for _ in range(5):        yield Task.sleep(1)        print("a: ", "bye")    return "goodbye"def b():    print("b: haha")    for _ in range(5):        yield Task.sleep(1)        print("b: again")    return "nihao"def c():    for i in range(10):        print("c:",i)    return "nice"loop = Loop()task1 = Task(a())task2 = Task(b())task3 = Task(c)task = Task.wait([task1, task2, task3])loop.run_until_complete(task)print(task.result())print(task1.result())print(task2.result())print(task3.result())

 

转载于:https://my.oschina.net/backbye/blog/1825926

你可能感兴趣的文章
Linux基础(3)
查看>>
扫盲贴:T-SQL语句执行顺序
查看>>
shiro 过滤属性的意义
查看>>
linux中正则表达式的使用方法
查看>>
grub2配置文件
查看>>
文件压缩
查看>>
SQL中EXISTS的用法
查看>>
基于虚拟用户的邮件系统配置
查看>>
阿修罗监控与grafana结合使用
查看>>
我的友情链接
查看>>
JDK 和JRE的区别
查看>>
lanhelper
查看>>
PLSQL中的内存表--Index By Table
查看>>
Shell练习(十二)
查看>>
解决springmvc+mybatis+mysql中文乱码问题【转】
查看>>
十三,十四单元总结
查看>>
表单布局
查看>>
Centos 6.6 下 nginx +php mysql + phpMyadmin 安装部署
查看>>
device-mapper 块级重删(dm dedup) <3>代码结构(2)
查看>>
cas_client之AuthenticationFilter源码分析
查看>>