使用kombu操作rabbitmq
Table of Contents
2 exchange和queue
消息是直接发给它的。然后由exchange来路由到各个queue中。每个queue是需 要绑定到一个exchange上的。
路由常用的两种:
- direct:直接发送。根据route key直接发到对应queue中(每个queue在声 明的时候需 要有一个route key)。
- fanout:类似广播。这样的exchange就不管route key了,会把发向这个 exchange的消息发给所有绑定到它的queue中。
3 Example
首先需要在系统里安装rabbitmq。在ubuntu中:
sudo apt-get install rabbitmq-server
3.1 simplest example
send.py:
from __future__ import absolute_import, unicode_literals import datetime from kombu import Connection with Connection('amqp://guest:guest@localhost:5672//') as conn: simple_queue = conn.SimpleQueue('simple_queue') message = 'helloworld, sent at {0}'.format(datetime.datetime.today()) simple_queue.put(message) print('Sent: {0}'.format(message)) simple_queue.close()
recieve.py:
from __future__ import absolute_import, unicode_literals, print_function from kombu import Connection with Connection('amqp://guest:guest@localhost:5672//') as conn: simple_queue = conn.SimpleQueue('simple_queue') message = simple_queue.get(block=True, timeout=1) print('Received: {0}'.format(message.payload)) message.ack() simple_queue.close()
3.2 publish
publisher.py:
from kombu import Connection, Exchange, Producer exchange = Exchange('simple_queue_publish', type='fanout') with Connection('amqp://guest:guest@localhost:5672//') as conn: p = Producer(conn, exchange) p.publish('nihao ')
consumer1.py:
from kombu import Connection from kombu import Exchange, Queue from kombu.mixins import ConsumerMixin name = 'simple_queue_publish' queue_name = 'queue' task_exchange = Exchange(name, type='fanout') task_queues = [Queue(queue_name, task_exchange, routing_key=queue_name)] class C(ConsumerMixin): def __init__(self, connection): self.connection = connection def get_consumers(self, Consumer, channel): return [ Consumer(task_queues, callbacks=[self.on_message], accept=['json']), ] def on_message(self, body, message): print("RECEIVED MESSAGE: %r" % (body, )) message.ack() with Connection('amqp://guest:guest@localhost:5672//') as conn: try: C = C(conn) C.run() except KeyboardInterrupt: print('bye bye')
from kombu import Connection from kombu import Exchange, Queue from kombu.mixins import ConsumerMixin name = 'simple_queue_publish' queue_name = 'queue2' task_exchange = Exchange(name, type='fanout') task_queues = [Queue(queue_name, task_exchange, routing_key=queue_name)] class C(ConsumerMixin): def __init__(self, connection): self.connection = connection def get_consumers(self, Consumer, channel): return [ Consumer(task_queues, callbacks=[self.on_message], accept=['json']), ] def on_message(self, body, message): print("RECEIVED MESSAGE: %r" % (body, )) message.ack() with Connection('amqp://guest:guest@localhost:5672//') as conn: try: C = C(conn) C.run() except KeyboardInterrupt: print('bye bye')
3.3 direct
publisher.py:
from kombu import Connection import datetime with Connection('amqp://guest:guest@localhost:5672//') as conn: simple_queue = conn.SimpleQueue('simple_queue') message = 'helloword, sent at %s' % datetime.datetime.today() simple_queue.put(message) print('Sent: %s' % message) simple_queue.close()
consumer.py:
from kombu import Connection from kombu import Exchange, Queue from kombu.mixins import ConsumerMixin task_exchange = Exchange('simple_queue', type='direct') task_queues = [Queue('simple_queue', task_exchange, routing_key='simple_queue')] class C(ConsumerMixin): def __init__(self, connection): self.connection = connection def get_consumers(self, Consumer, channel): return [ Consumer(task_queues, callbacks=[self.on_message], accept=['json']), ] def on_message(self, body, message): print("RECEIVED MESSAGE: %r" % (body, )) message.ack() with Connection('amqp://guest:guest@localhost:5672//') as conn: try: C = C(conn) C.run() except KeyboardInterrupt: print('bye bye')
3.4 publish
consumer1:
from kombu import Connection from kombu import Exchange, Queue from kombu.mixins import ConsumerMixin name = 'simple_queue_publish' queue_name = 'queue' task_exchange = Exchange(name, type='fanout') task_queues = [Queue(queue_name, task_exchange, routing_key=queue_name)] class C(ConsumerMixin): def __init__(self, connection): self.connection = connection def get_consumers(self, Consumer, channel): return [ Consumer(task_queues, callbacks=[self.on_message], accept=['json']), ] def on_message(self, body, message): print("RECEIVED MESSAGE: %r" % (body, )) message.ack() with Connection('amqp://guest:guest@localhost:5672//') as conn: try: C = C(conn) C.run() except KeyboardInterrupt: print('bye bye')
consumer2:
from kombu import Connection from kombu import Exchange, Queue from kombu.mixins import ConsumerMixin name = 'simple_queue_publish' queue_name = 'queue2' task_exchange = Exchange(name, type='fanout') task_queues = [Queue(queue_name, task_exchange, routing_key=queue_name)] class C(ConsumerMixin): def __init__(self, connection): self.connection = connection def get_consumers(self, Consumer, channel): return [ Consumer(task_queues, callbacks=[self.on_message], accept=['json']), ] def on_message(self, body, message): print("RECEIVED MESSAGE: %r" % (body, )) message.ack() with Connection('amqp://guest:guest@localhost:5672//') as conn: try: C = C(conn) C.run() except KeyboardInterrupt: print('bye bye')
publisher:
from kombu import Connection, Exchange, Producer exchange = Exchange('simple_queue_publish', type='fanout') with Connection('amqp://guest:guest@localhost:5672//') as conn: p = Producer(conn, exchange) p.publish('nihao ')
4 TODO Question
1.[ ] 一个queue能不能绑定到多个exchange中?