使用kombu操作rabbitmq

Table of Contents

1 官方文档

kombu:用于操作RabbitMQ的一个Python库
RabbitMQ  

2 exchange和queue

消息是直接发给exchange的,然后由exchange路由到各个queue中。每个queue都需要绑定到一个exchange上。

路由常用的两种:

  1. direct:直接发送。根据routing key直接发到对应的queue中(每个queue在声明的时候需要有一个routing key)。
  2. fanout:类似广播。这样的exchange不管routing key,会把发向这个exchange的消息发给所有绑定到它的queue。

3 Example

首先需要在系统里安装rabbitmq。在ubuntu中:

     sudo apt-get install rabbitmq-server

3.1 simplest example

send.py:

     from __future__ import absolute_import, unicode_literals

     import datetime

     from kombu import Connection


     # Open a connection to the local broker; leaving the ``with`` block
     # closes the connection.
     with Connection('amqp://guest:guest@localhost:5672//') as conn:
         # SimpleQueue is used as a context manager so the queue is closed on
         # exit even if ``put`` raises; no explicit ``close()`` call is needed.
         with conn.SimpleQueue('simple_queue') as simple_queue:
             message = 'helloworld, sent at {0}'.format(datetime.datetime.today())
             simple_queue.put(message)
             print('Sent: {0}'.format(message))

receive.py:

     from __future__ import absolute_import, unicode_literals, print_function

     from kombu import Connection


     # Open a connection to the local broker; leaving the ``with`` block
     # closes the connection.
     with Connection('amqp://guest:guest@localhost:5672//') as conn:
         # The queue is closed automatically when this block exits, including
         # when an exception is raised inside it.
         with conn.SimpleQueue('simple_queue') as simple_queue:
             try:
                 # Wait at most one second for a message.
                 message = simple_queue.get(block=True, timeout=1)
             except simple_queue.Empty:
                 # Nothing arrived before the timeout: report it instead of
                 # ending the script with a traceback.
                 print('No message received within 1 second')
             else:
                 print('Received: {0}'.format(message.payload))
                 # Acknowledge only after the payload has been handled.
                 message.ack()

3.2 publish

publisher.py:

      from kombu import Connection, Exchange, Producer
      # Fanout exchange shared with the consumer scripts of this section.
      exchange = Exchange('simple_queue_publish', type='fanout')

      with Connection('amqp://guest:guest@localhost:5672//') as conn:
          # Bind the producer to the exchange, then publish a single message.
          producer = Producer(conn, exchange=exchange)
          producer.publish('nihao ')

consumer1.py:

      from kombu import Connection
      from kombu import Exchange, Queue
      from kombu.mixins import ConsumerMixin

      # Topology for this consumer: one queue bound to the fanout exchange.
      name = 'simple_queue_publish'
      queue_name = 'queue'
      task_exchange = Exchange(name, type='fanout')
      # NOTE: a fanout exchange does not look at the routing key, so the
      # value passed here does not affect delivery.
      task_queues = [Queue(queue_name,
                           task_exchange,
                           routing_key=queue_name)]


      class C(ConsumerMixin):
          """Consumer that prints each message received on ``task_queues``."""

          def __init__(self, connection):
              # ConsumerMixin reads the broker connection from this attribute.
              self.connection = connection

          def get_consumers(self, Consumer, channel):
              """Return the list of consumers to run (a single one here)."""
              # NOTE(review): ``accept=['json']`` limits accepted content
              # types while the publisher sends a plain string -- confirm the
              # message is still accepted on your kombu/Python version.
              return [
                  Consumer(task_queues,
                           callbacks=[self.on_message],
                           accept=['json']),
              ]

          def on_message(self, body, message):
              """Print the received body, then acknowledge the message."""
              print("RECEIVED MESSAGE: %r" % (body, ))
              message.ack()


      with Connection('amqp://guest:guest@localhost:5672//') as conn:
          try:
              # Bind the instance to its own name so that the class ``C`` is
              # not shadowed by one of its instances.
              worker = C(conn)
              worker.run()
          except KeyboardInterrupt:
              # Ctrl-C is the expected way to stop the consumer loop.
              print('bye bye')

      from kombu import Connection
      from kombu import Exchange, Queue
      from kombu.mixins import ConsumerMixin

      # Topology for the second consumer: its own queue ('queue2') bound to
      # the same fanout exchange name as the publisher uses.
      name = 'simple_queue_publish'
      queue_name = 'queue2'
      task_exchange = Exchange(name, type='fanout')
      # NOTE: a fanout exchange does not look at the routing key, so the
      # value passed here does not affect delivery.
      task_queues = [Queue(queue_name,
                           task_exchange,
                           routing_key=queue_name)]


      class C(ConsumerMixin):
          """Consumer that prints each message received on ``task_queues``."""

          def __init__(self, connection):
              # ConsumerMixin reads the broker connection from this attribute.
              self.connection = connection

          def get_consumers(self, Consumer, channel):
              """Return the list of consumers to run (a single one here)."""
              return [
                  Consumer(task_queues,
                           callbacks=[self.on_message],
                           accept=['json']),
              ]

          def on_message(self, body, message):
              """Print the received body, then acknowledge the message."""
              print("RECEIVED MESSAGE: %r" % (body, ))
              message.ack()


      with Connection('amqp://guest:guest@localhost:5672//') as conn:
          try:
              # Bind the instance to its own name so that the class ``C`` is
              # not shadowed by one of its instances.
              worker = C(conn)
              worker.run()
          except KeyboardInterrupt:
              # Ctrl-C is the expected way to stop the consumer loop.
              print('bye bye')

3.3 direct

publisher.py:

      from kombu import Connection
      import datetime

      # Open a connection to the local broker; leaving the ``with`` block
      # closes the connection.
      with Connection('amqp://guest:guest@localhost:5672//') as conn:
          # SimpleQueue is used as a context manager so the queue is closed
          # on exit even if ``put`` raises.
          with conn.SimpleQueue('simple_queue') as simple_queue:
              message = 'helloword, sent at %s' % datetime.datetime.today()
              simple_queue.put(message)
              print('Sent: %s' % message)

consumer.py:

      from kombu import Connection
      from kombu import Exchange, Queue
      from kombu.mixins import ConsumerMixin

      # Direct exchange and queue, both named 'simple_queue' -- the same name
      # the publisher passes to ``conn.SimpleQueue``. The queue is bound with
      # routing key 'simple_queue'.
      task_exchange = Exchange('simple_queue', type='direct')
      task_queues = [Queue('simple_queue',
                           task_exchange,
                           routing_key='simple_queue')]


      class C(ConsumerMixin):
          """Consumer that prints each message received on ``task_queues``."""

          def __init__(self, connection):
              # ConsumerMixin reads the broker connection from this attribute.
              self.connection = connection

          def get_consumers(self, Consumer, channel):
              """Return the list of consumers to run (a single one here)."""
              return [
                  Consumer(task_queues,
                           callbacks=[self.on_message],
                           accept=['json']),
              ]

          def on_message(self, body, message):
              """Print the received body, then acknowledge the message."""
              print("RECEIVED MESSAGE: %r" % (body, ))
              message.ack()


      with Connection('amqp://guest:guest@localhost:5672//') as conn:
          try:
              # Bind the instance to its own name so that the class ``C`` is
              # not shadowed by one of its instances.
              worker = C(conn)
              worker.run()
          except KeyboardInterrupt:
              # Ctrl-C is the expected way to stop the consumer loop.
              print('bye bye')

3.4 publish

consumer1:

      from kombu import Connection
      from kombu import Exchange, Queue
      from kombu.mixins import ConsumerMixin

      # Topology for consumer1: queue 'queue' bound to the fanout exchange.
      name = 'simple_queue_publish'
      queue_name = 'queue'
      task_exchange = Exchange(name, type='fanout')
      # NOTE: a fanout exchange does not look at the routing key, so the
      # value passed here does not affect delivery.
      task_queues = [Queue(queue_name,
                           task_exchange,
                           routing_key=queue_name)]


      class C(ConsumerMixin):
          """Consumer that prints each message received on ``task_queues``."""

          def __init__(self, connection):
              # ConsumerMixin reads the broker connection from this attribute.
              self.connection = connection

          def get_consumers(self, Consumer, channel):
              """Return the list of consumers to run (a single one here)."""
              return [
                  Consumer(task_queues,
                           callbacks=[self.on_message],
                           accept=['json']),
              ]

          def on_message(self, body, message):
              """Print the received body, then acknowledge the message."""
              print("RECEIVED MESSAGE: %r" % (body, ))
              message.ack()


      with Connection('amqp://guest:guest@localhost:5672//') as conn:
          try:
              # Bind the instance to its own name so that the class ``C`` is
              # not shadowed by one of its instances.
              worker = C(conn)
              worker.run()
          except KeyboardInterrupt:
              # Ctrl-C is the expected way to stop the consumer loop.
              print('bye bye')

consumer2:

      from kombu import Connection
      from kombu import Exchange, Queue
      from kombu.mixins import ConsumerMixin

      # Topology for consumer2: queue 'queue2' bound to the fanout exchange.
      name = 'simple_queue_publish'
      queue_name = 'queue2'
      task_exchange = Exchange(name, type='fanout')
      # NOTE: a fanout exchange does not look at the routing key, so the
      # value passed here does not affect delivery.
      task_queues = [Queue(queue_name,
                           task_exchange,
                           routing_key=queue_name)]


      class C(ConsumerMixin):
          """Consumer that prints each message received on ``task_queues``."""

          def __init__(self, connection):
              # ConsumerMixin reads the broker connection from this attribute.
              self.connection = connection

          def get_consumers(self, Consumer, channel):
              """Return the list of consumers to run (a single one here)."""
              return [
                  Consumer(task_queues,
                           callbacks=[self.on_message],
                           accept=['json']),
              ]

          def on_message(self, body, message):
              """Print the received body, then acknowledge the message."""
              print("RECEIVED MESSAGE: %r" % (body, ))
              message.ack()


      with Connection('amqp://guest:guest@localhost:5672//') as conn:
          try:
              # Bind the instance to its own name so that the class ``C`` is
              # not shadowed by one of its instances.
              worker = C(conn)
              worker.run()
          except KeyboardInterrupt:
              # Ctrl-C is the expected way to stop the consumer loop.
              print('bye bye')

publisher:

      from kombu import Connection, Exchange, Producer


      # Fanout exchange that consumer1 and consumer2 bind their queues to.
      exchange = Exchange('simple_queue_publish', type='fanout')

      with Connection('amqp://guest:guest@localhost:5672//') as conn:
          # Create a producer for the exchange and send one message through it.
          Producer(conn, exchange).publish('nihao ')

4 TODO Question

1.[ ] 一个queue能不能绑定到多个exchange中?

Author: Peng Xie

Created: 2018-10-01 Mon 21:36