前提条件
兔子
使用RabbitMQ发送和接收消息,只有在安装和配置软件后才能工作。 How To Install and Manage RabbitMQ详细解释了如何让RabbitMQ工作,是使用此消息经纪人的一个很好的起点。
Puka Python 图书馆
本文中的所有示例都使用Python语言来备份处理AMQP消息协议的puka库。Python已被选为一个清洁易懂的语言,以便简洁的演示,但由于AMQP是一个广泛采用的协议,任何其他编程语言都可以自由使用以实现类似的目标。
puka可以通过Pip(Python包管理器)快速安装。
1pip install puka
pip 并不总是与 Linux 发行版相结合,在基于 Debian 的发行版(包括 Ubuntu)上,它可以通过:
1apt-get install python-pip
基于RHEL,如CentOS:
1yum install python-setuptools
2easy_install pip
介绍RabbitMQ及其术语
Messaging(特别是RabbitMQ)介绍了一些描述消息经纪商及其机制的基本原则的术语。
- ** 制作人** 是发送_send_ messages的一方,因此正在生成一条消息。
- ** 消费者** 是收到_收到_消息的一方,因此收到信息很耗钱.
- ue**是一个缓冲器,发送信件被存储并准备接收. 单个队列可以持有多少信件没有限制. 对于有多少生产者可以向排队者发送信息,以及有多少消费者可以试图获取信息,也没有任何限制。 当一个消息击中了已有的队列时,它会在那里等待被消费者访问该特定队列所消耗. 当一个消息击中一个不存在的队列时,它会被丢弃.
- ** Exchange** 是一个位于生产者和队列之间的实体. 制作方从不直接向队列发送消息. 它向交换机发送消息,而交换机又依次将消息放入一个或多个队列,这取决于所使用的交换. 用现实生活的比喻,交换就像邮差: 它可以处理消息,以便传递到适当的队列(mailbox),消费者可以从中收集信息.
- ** Binding** 是队列和交换之间的连接. 与某一交易所挂钩的队列由交易所提供。 如何确切地取决于交换本身。 (英语)
所有五个术语将在整个文本中使用。还有一个,与puka python库密切相关,它是由于其清晰度而被选为选择的库。 这是一个 promise,可以理解为对AMQP服务器的同步请求,这保证了请求的执行(成功或不成功),客户端等待完成。
虽然puka可以非同步工作,但在我们的示例中,puka将被用作同步库,这意味着在每个请求(承诺)之后,puka将等到执行,然后再进入下一步。
测试RabbitMQ和Puka用一个简单的例子
要测试消息经纪人和puka是否完美工作,并了解发送和接收消息在实践中是如何工作的,请创建一个名为rabbit_test.py
的样本 Python 脚本。
1vim rabbit_test.py
并插入脚本内容:
1import puka
2
3# declare send and receive clients, both connecting to the same server on local machine
4producer = puka.Client("amqp://localhost/")
5consumer = puka.Client("amqp://localhost/")
6
7# connect sending party
8send_promise = producer.connect()
9producer.wait(send_promise)
10
11# connect receiving party
12receive_promise = consumer.connect()
13consumer.wait(receive_promise)
14
15# declare queue (queue must exist before it is being used - otherwise messages sent to that queue will be discarded)
16send_promise = producer.queue_declare(queue='rabbit')
17producer.wait(send_promise)
18
19# send message to the queue named rabbit
20send_promise = producer.basic_publish(exchange='', routing_key='rabbit', body='Droplet test!')
21producer.wait(send_promise)
22
23print "Message sent!"
24
25# start waiting for messages, also those sent before (!), on the queue named rabbit
26receive_promise = consumer.basic_consume(queue='rabbit', no_ack=True)
27
28print "Starting receiving!"
29
30while True:
31 received_message = consumer.wait(receive_promise)
32 print "GOT: %r" % (received_message['body'],)
33 break
按 :wq 保存文件并停止。
运行脚本时应该打印由脚本发送到 RabbitMQ队列的消息,因为测试程序会接收消息。
1root@rabbitmq:~# python rabbit_test.py
2Message sent!
3Starting receiving!
4GOT: 'Droplet test!'
5root@rabbitmq:~#
为了解释这个代码中发生了什么,让我们一步一步进行:
- 消费者和生产者都被创建并连接到同一个RabbitMQ服务器,居住在
localhost
2 生产者声明一个队列,以确保它在消息被生成时存在。如果它不是为此步骤,队列可能会不存在,因此消息可以立即被丢弃。 3 生产者将消息发送到一个 nameless exchange_ (更多在交易所后会出现) 使用一条路由键,指定先创建的队列。 之后,消息到达了交易所,这反过来会把它放在rabbit
队列中。 然后消息一直在那里,直到有人将其消耗。 4 消费者访问了bbrait
队列并开始接收存储在那里的消息。
粉丝交换
在上面的例子中,一个无名交换器被用来将消息交付到一个名为兔子
的特定队列中。
在 RabbitMQ中也有其他类型的交易,其中之一是 fanout,我们本文的主要关注点。 Fanout 交易是一个简单的,盲目的工具,它向 ALL的队列发送消息,它知道。在 fanout 交易中,没有必要(事实上 - 这是不可能的)提供特定的队列名称。打击此类交易的消息在发送到所有连接到交易之前的队列。
发布 / 订阅模式
借助 fanout 交流,我们可以轻松创建一个 publish/subscribe 模式,像是向所有新闻稿开放的模式。 制作人,一个新闻稿发送者,向其可能甚至不知道的受众发送定期消息(发送消息并发送到新闻稿 fanout 交换处)。 新订阅者申请新闻稿(将自己的队列连接到同一新闻稿 fanout)。
虽然一个对一个的消息是相当简单的,开发人员经常使用其他通信手段,一个对许多(其中许多
是未定义的,可以是 few 和 lots 之间的任何东西)是一个非常受欢迎的场景,在其中一个消息经纪人可以非常有帮助。
编写生产者应用程序
生产者应用程序的唯一作用是创建一个名为fanout的交换,并生成定期消息(每隔几秒钟)给该交换。在现实生活的场景中,消息将被生成一个原因。
创建一个名为newsletter_produce.py
的 Python 脚本
1vim newsletter_produce.py
并插入脚本内容:
1import puka
2import datetime
3import time
4
5# declare and connect a producer
6producer = puka.Client("amqp://localhost/")
7connect_promise = producer.connect()
8producer.wait(connect_promise)
9
10# create a fanout exchange
11exchange_promise = producer.exchange_declare(exchange='newsletter', type='fanout')
12producer.wait(exchange_promise)
13
14# send current time in a loop
15while True:
16 message = "%s" % datetime.datetime.now()
17
18 message_promise = producer.basic_publish(exchange='newsletter', routing_key='', body=message)
19 producer.wait(message_promise)
20
21 print "SENT: %s" % message
22
23 time.sleep(1)
24
25producer.close()
让我们以示例一步一步来解释代码中发生的事情。
- 生产者客户端被创建并连接到本地 RabbitMQ 实例. 从现在开始,它可以自由地与 RabbitMQ 通信.
- 创建了一个名为
新闻通讯
的对外交换。 在此步骤之后,该交换存在于 RabbitMQ 服务器上,可以用来连接队列并通过其发送消息。 - 在一个无尽的循环中,与当前时间的消息被生成到
新闻通讯
交换。 请注意,routing_key
是空的,这意味着没有特定的队列指定。 这是交换将进一步传递消息给正确的队列。
该应用程序在运行时通知所有报纸订阅者当前时间。
写消费者应用程序
消费者应用程序将创建一个临时的队列,并将其绑定到一个命名的偏差交换。之后,它将开始等待消息。连接到交换的队列后,由之前创建的生产商发送的每一个消息都将被这个消费者接收。
创建一个名为newsletter_consume.py
的 Python 脚本
1vim newsletter_consume.py
并插入脚本内容:
1import puka
2
3# declare and connect a consumer
4consumer = puka.Client("amqp://localhost/")
5connect_promise = consumer.connect()
6consumer.wait(connect_promise)
7
8# create temporary queue
9queue_promise = consumer.queue_declare(exclusive=True)
10queue = consumer.wait(queue_promise)['queue']
11
12# bind the queue to newsletter exchange
13bind_promise = consumer.queue_bind(exchange='newsletter', queue=queue)
14consumer.wait(bind_promise)
15
16# start waiting for messages on the queue created beforehand and print them out
17message_promise = consumer.basic_consume(queue=queue, no_ack=True)
18
19while True:
20 message = consumer.wait(message_promise)
21 print "GOT: %r" % message['body']
22
23consumer.close()
消费者代码比生产商更为复杂,让我们逐步研究一下:
- 消费者客户端被创建并连接到本地 RabbitMQ 实例。
2 创建了一个临时的队列。 临时意味着没有提供的名称 - 队列名称将由 RabbitMQ 自动生成。 此外,在客户端断开连接后,这样的队列将被摧毁。 这是创建只存在于交易所之一而没有其他特殊目的的队列的一种常见方式。 由于需要创建队列来接收任何东西,这是避免思考队列名称的方便方法。
3 创建的队列将被束缚在
新闻邮件
交换上。 从那一刻起,激光交换将向该队列传递每个消息。 4 在无休止的循环中,消费者在
该应用程序在运行时会收到新闻稿发行商的时间通知,可以同时执行多个次,并且该应用程序的每个实例都会收到当前时间。
测试两种应用程序
要测试电子邮件发行商及其消费者,打开多个SSH会话到虚拟服务器(或打开多个终端窗口,如果在本地计算机上工作)。
1root@rabbitmq:~# python newsletter_produce.py
它将开始显示每秒当前时间:
1SENT: 2014-02-11 17:24:47.309000
2SENT: 2014-02-11 17:24:48.310000
3SENT: 2014-02-11 17:24:49.312000
4SENT: 2014-02-11 17:24:50.316000
5...
在每个其他窗口中,运行消费者应用程序:
1root@rabbitmq:~# python newsletter_consume.py
此应用程序的每个实例都会收到由生产商发送的时间通知:
1GOT: 2014-02-11 17:24:47.309000
2GOT: 2014-02-11 17:24:48.310000
3GOT: 2014-02-11 17:24:49.312000
4GOT: 2014-02-11 17:24:50.316000
5...
这意味着RabbitMQ正确地注册了反馈交易所,将订阅者队列绑定到该交易所,并将发送的消息发送到适当的队列。
进一步阅读
发布/订阅是一个简单的(概念和实施)消息模式,通常可能非常有用;但它没有接近RabbitMQ的限制,但有无数方法可以使用RabbitMQ来解决消息问题,包括先进的消息路由、消息承认、安全或坚持。
本文的主要目的是通过简单的示例介绍基本的消息传递概念,许多其他用途在官方的RabbitMQ文档(http://www.rabbitmq.com/documentation.html)中详细介绍,这对于RabbitMQ用户和管理员来说是一个很好的资源。