2017-07-03-python-协程和生成器

Table of Contents

1 生成器

1.1 简单的generator

python中的for很好用,但是如果循环的次数较多的话,需要占用很多内存。 于是就有了 generator ,需要在时候才计算。看看下面的的例子:

     L = [x for x in range(10)]
     l = (x for x in range(10))
     print L
     print l

generator可以例用next来访问:

     l = (x for x in range(4))
     print l.next()
     print l.next()
     print l.next()
     print l.next()
     print l.next()

结果如下:

     0
     1
     2
     3
     Traceback (most recent call last):
       File "<stdin>", line 1, in <module>
       File "/tmp/main.py", line 6, in <module>
         print l.next()
     StopIteration

可以看到,最后一个next抛出了一个 StopIteration 。我理解generator应 该是靠这个机制来完成它正确地打开方式的:

     l = (x for x in range(3))
     for i in l:
         print i
     def fib(max):
         n, a, b = 0, 0, 1
         while (n < max):
             print '---->b = %s<----' % b
             yield b
             a, b = b, a + b
             n = n + 1

     g = fib(6)
     for i in g:
         print '====>b = %s<====\n' % i

1.2 定义函数做为generator

比较复杂的情况下,还可以定义一个函数做为generator,比如打印1到n的函 数:

     def foo(n):
         for i in range(n):
             print i

     foo(4)

上面的函数稍威修改一下就是一个generator啦:

     def foo(n):
         for i in range(n):
             yield i

     print foo(3)
     for n in foo(3):
         print n

如果一个函数中包含yield关键字,那么这个函数就不再是个普通函数,而 是一个generator

generator不像一般函数一样遇到return或者运行到最后返回。而generator 的函数,在调用next时开始执行,遇到yield返回,吐出yield的内容,再次 调用next又从上次yield后执行。

     def foo():
         print 'step 1'
         yield '[test] %s from yield' % 1
         print 'step 2'
         yield '[test] %s from yield' % 1
         print 'step 3'
         yield '[test] %s from yield' % 1

     f = foo()
     print f.next()
     print f.next()
     print f.next()

2 python中的协程

这里讲得很形象

协程和 生成器 类似,但是不同是的,生成器是数据的生产者,协程是数据的 消费者。

对应生成器的例子:

    def foo(n):
        for i in range(n):
            yield i

    print foo(3)
    for n in foo(3):
        print n

这样可以定义一个协程:

     def grep(pattern):
         print("Searching for", pattern)
         while True:
             line = (yield)
             if pattern in line:
                 print(line)

和生成器一样,定义一个协程后,调用 next() 函数才开始执行,到 yield 后停止,然后调用 send() 函数给协程发送数据。协程收到数据 后继续往下执行,直到下一次的 yield

     def grep(pattern):
         print("Searching for", pattern)
         while True:
             line = (yield)
             print 'recieve data ', line
             if pattern in line:
                 print(line)
             print 'after if'


     g = grep('test')
     g.next()
     g.send('hi this is')
     g.send('hi this is test')
    #!/usr/local/bin/python
    # coroutine.py
    #
    # A decorator function that takes care of starting a coroutine
    # automatically on call.


    def coroutine(func):
        def start(*args, **kwargs):
            cr = func(*args, **kwargs)
            cr.next()
            return cr
        return start

    # Example use
    if __name__ == '__main__':
        @coroutine
        def grep(pattern):
            print "Looking for %s" % pattern
            while True:
                line = (yield)
                if pattern in line:
                    print line,

        g = grep("python")
        # Notice how you don't need a next() call here
        g.send("Yeah, but no, but yeah, but no")
        g.send("A series of tubes")
        g.send("python generators rock!")

3 理解一下python中的yield

    import time

    def consumer():
        r = ''
        while True:
            n = yield r
            if not n:
                return
            print('[CONSUMER] Consuming %s...' % n)
            time.sleep(1)
            r = '200 OK'

    def produce(c):
        c.next()
        n = 0
        while n < 5:
            n = n + 1
            print('[PRODUCER] Producing %s...' % n)
            r = c.send(n)
            print('[PRODUCER] Consumer return: %s' % r)
        c.close()

    if __name__=='__main__':
        c = consumer()
        produce(c)

可以看到 !n = yield r! 这一句,yield出去的r返回给send的发起者的。而 yield 本身的值赋给了n,n就是send发送过来的。。

执行流程是这样的:

  1. 主程序调用produce,c.next()执行就直接切换到consumer中开始执行。
  2. consumer中执行到yield,切换到produce从c.next()后继续执行,此时从 consumer中yield回来的值是空。也就是consumer中第一句给r的赋值。
  3. produce进入while,执行到c.send(n)。然后切换到consumer又从yield开 始执行。此时consumer收到的n是由produce发送过来的值。
  4. consumer睡一秒后给r赋值,然后重复while循环。到yield后吐出新赋值的 r给produce。
  5. produce打印返回值r,重复循环到c.send(n)。进入步骤3。

4 参考资料三写得太好了,我理解式的翻译了一些

就是它,建议英语可以的直接读它 Coroutines.pdf

4.1 更进一步理解生成器

4.1.1 一个python版本的 tail -f

     import time
       def follow(thefile):
           thefile.seek(0,2)      # Go to the end of the file
           while True:
                line = thefile.readline()
                if not line:
                    time.sleep(0.1)    # Sleep briefly
                    continue
                yield line


     logfile  = open("access-log")
     for line in follow(logfile):
         print line,

程序执行后,会一直在while中循环,如果文件结尾有插入数据,用yield吐出 来进行输出。输出完后再次进入follow中的while循环继续执行

4.1.2 生成器做为管道

结合 一个python版本的 tail -f ,来看看生成器做为管道的用法:

     import time
     def follow(thefile):
         thefile.seek(0,2)      # Go to the end of the file
         while True:
              line = thefile.readline()
              if not line:
                  time.sleep(0.1)    # Sleep briefly
                  continue
              yield line

     def grep(pattern, lines):
         for line in lines:
             if pattern in line:
                 yield line

     # Set up a processing pipe : tail -f | grep python
     logfile  = open("access-log")
     loglines = follow(logfile)
     pylines  = grep("python",loglines)
     # Pull results out of the processing pipeline
     for line in pylines:
         print line,

grep本身就是一个生成器,它接受一个安符串的pattern和一个生成器做为参 数。程序执行时,应该是一直在 follow 函数的while中循环等待。如果文 件结尾有数据进来,follow吐出数据,此时进入grep中的for,如果在这一行 中找到pattern,grep又吐出数据进行输出。完毕后又回到follow的while循环 中继续等待。

4.1.3 一个特殊一点的例子

注意看为啥中间的数字没有了。

      def countdown(n):
          print "Counting down from", n
          while n >= 0:
              newvalue = (yield n)
              # If a new value got sent in, reset n with it
              if newvalue is not None:
                  n = newvalue
              else:
                  n -= 1

      c = countdown(5)
      for n in c:
          print n
          if n == 5:
              a = c.send(3)
              print 'a = %s' % a

还需要注意的 :send后如果不接收它的结果,那么countdown中间有一次 yield出来的3将被丢弃。比如这样,结果里面是没有3的:

      def countdown(n):
          print "Counting down from", n
          while n >= 0:
              newvalue = (yield n)
              # If a new value got sent in, reset n with it
              if newvalue is not None:
                  n = newvalue
              else:
                  n -= 1

      c = countdown(5)
      for n in c:
          print n
          if n == 5:
              c.send(3)

4.1.4 简单总结

  1. 生成器为迭代器产生数据。
  2. 协程是数据的消费者。
  3. 不要把协程和迭代器混在一起。
  4. 协程与迭代器无关。

4.2 更进一步理解协程

4.2.1 使用协程写的 tail -f

     import time
     def coroutine(fun):
         def start(*arg, **args):
             cr = fun(*arg, ** args)
             cr.next()
             return cr
         return start


     def follow(thefile, target):
         thefile.seek(0,2)      # Go to the end of the file
         while True:
              line = thefile.readline()
              if not line:
                  time.sleep(0.1)    # Sleep briefly
                  continue
              target.send(line)


     @coroutine
     def printer():
         while True:
             line = (yield)
             print line,

     f = open("access-log")
     follow(f, printer())

程序执行后:

  1. 执行printer到yield返回follow。
  2. follow循环等待数据。
  3. 如果有数据follow发送给printer。
  4. printer输出数据后,再次从yield返回follow进入步骤2。

4.2.2 协程做为管道

     import time
     def coroutine(fun):
         def start(*arg, **args):
             cr = fun(*arg, ** args)
             cr.next()
             return cr
         return start

     def follow(thefile, target):
         print 'begin follow'
         thefile.seek(0,2)      # Go to the end of the file
         while True:
              line = thefile.readline()
              if not line:
                  time.sleep(0.1)    # Sleep briefly
                  continue
              target.send(line)

     @coroutine
     def printer():
         print 'begind printer'
         while True:
             line = (yield)
             print line,

     @coroutine
     def grep(pattern, target):
         print 'begin grep'
         while True:
             line = (yield)
             if pattern in line:
                 target.send(line)

     f = open("kk.txt")
     follow(f, grep('python', printer()))

从打印可以看到,程序开始这样执行:

  1. 从printer开始执行,到yield后停下等待数据。
  2. 然后执行grep到yield。
  3. 然后执行follow进入while一直等待数据。
  4. 有数据后follow发送给grep。
  5. grep处理后,如果有合适的数据再发送给printer处理,完成后从yield返 回follow。如果grep没有发现合适的数据就直接从yield返回follow。
  6. printer收到数据后从再次停在yield后返回grep。然后grep返回follow进 入步骤3。

结合 生成器做为管道 ,生成器是使用迭代器从管道中拉取数据。而协程是使 用 send() 向管道中push数据。

4.2.3 协程做广播

结合 协程做为管道 ,其实协程还有类似于广播的功能:

coroutine_broadcasting.png

     import time
     def coroutine(fun):
         def start(*arg, **args):
             cr = fun(*arg, ** args)
             cr.next()
             return cr
         return start

     def follow(thefile, target):
         print 'begin follow'
         thefile.seek(0,2)      # Go to the end of the file
         while True:
              line = thefile.readline()
              if not line:
                  time.sleep(0.1)    # Sleep briefly
                  continue
              target.send(line)

     @coroutine
     def printer():
         print 'begind printer'
         while True:
             line = (yield)
             print line,

     @coroutine
     def grep(pattern, target):
         print 'begin grep'
         while True:
             line = (yield)
             if pattern in line:
                 target.send(line)


     @coroutine
     def broadcast(targets):
         while True:
             item = (yield)
             for target in targets:
                 target.send(item)


     f = open("kk.txt")
     follow(f, broadcast([grep('python', printer()),
                          grep('ply', printer()),
                          grep('simple', printer())]))

这个和 协程做为管道 的区别就只是到了broadcast,它send给了多个 coroutine。这种情况的模型如下: coroutine_model_1.png

另一种模型是这样的:

      f = open("access-log")
      p = printer()

      follow(f, broadcast([grep('python', p),
                           grep('ply', p),
                           grep('simple', p)]))

coroutine_model_2.png

4.2.4 错误地使用协程

不能对一个正在执行的协程再调用 send() 。比如这样就会导致程序挂掉: coroutine_creash.png

4.3 使用协程和生成器做成一个类似于OS的多任务系统

     class Task(object):
         taskid = 0
         def __init__(self,target):
             Task.taskid += 1
             self.tid     = Task.taskid
             self.target  = target
             self.sendval = None


         # Task ID
         # Target coroutine
         # Value to send
         def run(self):
             return self.target.send(self.sendval)


     if __name__ == '__main__':    
         # A very simple generator
         def foo():
             print "Part 1"
             yield
             print "Part 2"
             yield
    
         t1 = Task(foo())
         t1.run()
         t1.run()

调度器:

     class Scheduler(object):
         def __init__(self):
             self.ready   = Queue()
             self.taskmap = {}
         def new(self,target):
             newtask = Task(target)
             self.taskmap[newtask.tid] = newtask
             self.schedule(newtask)
             return newtask.tid
         def schedule(self,task):
             self.ready.put(task)
         def mainloop(self):
             while self.taskmap:
                 task = self.ready.get()
                 result = task.run()
                 self.schedule(task)

第一个多任务的例子:

     from kombu import Queue

     class Task(object):
         taskid = 0
         def __init__(self,target):
             Task.taskid += 1
             self.tid     = Task.taskid
             self.target  = target
             self.sendval = None


         # Task ID
         # Target coroutine
         # Value to send
         def run(self):
             return self.target.send(self.sendval)


     class Scheduler(object):
         def __init__(self):
             self.ready   = Queue()
             self.taskmap = {}
         def new(self,target):
             newtask = Task(target)
             self.taskmap[newtask.tid] = newtask
             self.schedule(newtask)
             return newtask.tid
         def schedule(self,task):
             self.ready.put(task)
         def mainloop(self):
             while self.taskmap:
                 task = self.ready.get()
                 result = task.run()
                 self.schedule(task)

     def foo():
         while True:
             print "I'm foo"
             yield

     def bar():
         while True:
             print "I'm bar"
             yield

     sched = Scheduler()
     sched.new(foo())
     sched.new(bar())
     sched.mainloop()

上面这个例子中,如果一个任务退出,会导致crash。所以需要处理一下任务 退出的情况:

     from kombu import Queue

     class Task(object):
         taskid = 0
         def __init__(self,target):
             Task.taskid += 1
             self.tid     = Task.taskid
             self.target  = target
             self.sendval = None


         # Task ID
         # Target coroutine
         # Value to send
         def run(self):
             return self.target.send(self.sendval)


     class Scheduler(object):
         def __init__(self):
             self.ready   = Queue()
             self.taskmap = {}

         def exit(self, task):
             print "Task %d terminated" % task.tid
             del self.taskmap[task.tid]

         def new(self,target):
             newtask = Task(target)
             self.taskmap[newtask.tid] = newtask
             self.schedule(newtask)
             return newtask.tid

         def schedule(self,task):
             self.ready.put(task)

         def mainloop(self):
             while self.taskmap:
                 task = self.ready.get()
                 try:
                     result = task.run()
                 except StopIteration:
                     self.exit(task)
                     continue
                 self.schedule(task)

     def foo():
         for i in xrange(5):
             print "I'm foo"
             yield

     def bar():
         for i in xrange(10):
             print "I'm bar"
             yield

     sched = Scheduler()
     sched.new(foo())
     sched.new(bar())
     sched.mainloop()

这个例子中,scheduler可以看成是操作系统。任务中的yield就是操作系统中 的“中断”(或者是trap)。如果任务需要请求操作系统(scheduler)提供 的服务,即系统调用,需要带参数使用yield:

     class SystemCall(object):
         def handle(self):
     pass
     class Scheduler(object):
         ...
         def mainloop(self):
              while self.taskmap:
                 task = self.ready.get()
                 try:
                     result = task.run()
                     if isinstance(result,SystemCall):
                         result.task  = task
                         result.sched = self
                         result.handle()
                         continue
                 except StopIteration:
                     self.exit(task)
                     continue
                 self.schedule(task)

这里的 SystemCall 就是系统调用的基类,所有系统调用都继承自它。在 mainloop中执行时,如果任务yield出来的是SystemCall,则先保存上下文 (当前任务和调度器)。下面定义一个返回任务id的系统调用:

     class GetTid(SystemCall):
         def handle(self):
             self.task.sendval = self.task.tid
             self.sched.schedule(self.task)

那么此时任务应该这样定义:

     def foo():
         mytid = yield GetTid()
         for i in xrange(5):
             print "I'm foo", mytid
             yield
     def bar():
         mytid = yield GetTid()
         for i in xrange(10):
             print "I'm bar", mytid
             yield

总的流程:,

  1. 调度器先调度到一个任务,任务yield出来一个系统调用。
  2. 调度器判断是系统调用没错后,保存上下文环境,然后进入系统调用执行。
  3. 本例系统调用中修改了任务自己的属性 self.sendval 。然后重新把该 任务加入加绪队列。
  4. 调度器继续从就绪队列中取出任务进行执行。
  5. 再次调度到该任务时,它继续从yield处开始执行,而这次mytid就是系统 调用设置的 self.sendval 。它是在执行时通过send发送过来的。其实 可以理解为系统调用的返回值就是它设置的 self.sendval

4.4 总结

yield总共有三个用途:

  1. 生成迭代器,数据的生产者。
  2. 接收消息,数据的消费者。
  3. 类似于操作系统的trap,实现多任务。

忠告就是: 不要尝试写一个带yield的函数来实现两个以上的上述功能。

5 参考资料

  1. 廖雪峰讲协程

           但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。
    
           看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线
           程比,协程有何优势?
    
           最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程
           序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的
           性能优势就越明显。
    
           第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变
           量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率
           比多线程高很多。
    
           因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,
           既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
    
           Python对协程的支持还非常有限,用在generator中的yield可以一定程度上实现
           协程。虽然支持不完全,但已经可以发挥相当大的威力了。
    
  2. 廖雪峰讲gevent
  3. 这里还有一个不错的英文介绍

Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent 为Python提供了比较完善的协程支持。

Author: Peng Xie

Created: 2018-10-01 Mon 21:36