如何使用 RabbitMQ 和 Puka Python 库根据路由密钥交付消息

开始


前提条件


这个文本是如何使用RabbitMQ和Python的Puka来交付消息给多个消费者(https://www.digitalocean.com/community/articles/how-to-use-rabbitmq-and-python-s-puka-to-deliver-messages-to-multiple-consumers)的延续,并需要相同的软件包装并正常运行。同样,相同的定义在整个文章中使用,我们假定读者熟悉从前文的主题。

交换


我们已经描述了fanout交换,它向与该交换相关的每一个队列发送消息,没有额外的规则。 这是一个非常有用的机制,但缺乏灵活性。 通常不愿收到生产者发送给交易所的一切信息。 RabbitMQ 提供两种不同的交换类型,可以用来实施更复杂的场景。

◎直接交换


介绍


直接交换提供了简单的,基于密钥的路由机制在 RabbitMQ . 它有点类似于在第一例中使用的无名交换,其中一个消息被交付到等于消息的路由密钥的名列。

在使用直接交换时,每个发送给该交换机的消息都必须有指定的路由密钥,即任意的名称字符串,例如 Texas

基本的无名交换和直接交换之间的最大区别在于,后者需要绑定,并且在此之前没有对该交换的消息进行排队。

  1. 一个 队列可以被绑定,以便在同一个交易所听到 ** 多个** 不同的路由密钥 2 ** 一个** 队列可以被绑定,以便同时听到 ** 多个** 不同的交易所 3 ** 许多** 队列可以被绑定,以便在交易所上听到 ** 同一个** 路由密钥

让我们想象一个大城市中心:一个铁路和公交车站,有许多目的地可以通过两种交通方式到达,并想象该站希望使用 RabbitMQ 发送出发通知,任务是告知所有感兴趣的巴士或火车前往 SeattleTooeleBoston 即将离开。

这样一个程序会定义一个直接的分离交换,所有感兴趣的客户可以订阅他们的队列,然后包含出发时间的消息将被生成到该交换与包含目的地的路由密钥。

  1. 与路由键Tooele和身体2014-01-03 15:23交换给Boston和身体2014-01-03 15:41的信息 2. 与路由键Seattle和身体2014-01-03 15:55交换给Seattle的信息

由于一个队列可以同时绑定到许多路由密钥,并且许多队列可以绑定到同一个密钥,我们可以很容易地有:

一位客户只对 Tooele 感兴趣 2. 一位客户只对 Boston 感兴趣 3. 另一位客户同时对 TooeleBoston 感兴趣

每个人都在等待信息,同时他们会通过我们的直接交流收到适当的消息。

制作人


为了简化任务稍微为例,让我们写一个基本的通知发送器,它将接受一个命令行参数,它将指定目的地,应用程序将发送当前时间给所有感兴趣的消费者。

创建一个名为direct_notify.py的样本 Python 脚本

1vim direct_notify.py

并插入脚本内容:

 1import puka
 2import datetime
 3import time
 4import sys
 5
 6# declare and connect a producer
 7producer = puka.Client("amqp://localhost/")
 8connect_promise = producer.connect()
 9producer.wait(connect_promise)
10
11# create a direct exchange named departures
12exchange_promise = producer.exchange_declare(exchange='departures', type='direct')
13producer.wait(exchange_promise)
14
15# send current time to destination specified with command line argument
16message = "%s" % datetime.datetime.now()
17
18message_promise = producer.basic_publish(exchange='departures', routing_key=sys.argv[1], body=message)
19producer.wait(message_promise)
20
21print "Departure to %s at %s" % (sys.argv[1], message)
22
23producer.close()

:wq 保存文件并停止。

运行一个参数的脚本应该打印当前时间和使用的目的地。

1root@rabbitmq:~# python direct_notify.py Tooele
2Departure to Tooele at 2014-02-18 15:57:29.035000
3root@rabbitmq:~#

让我们一步一步地通过脚本:

  1. 联合国 制作者 客户端创建并与本地RabbitMQ实例连接。 从现在开始它可以自由与兔子MQ通信.
  2. 联合国 建立了一个名为 " 离境 " 的直接交易所。 它不需要在创建时指定的路由密钥,因为向该交换所发布的任何消息都可以有不同的密钥指定. 在该步骤之后,交换就存在于RabbitMQ服务器上,可以用来将队列绑定到它,并通过它发送消息. 3个 包含当前时间的信息会被发布到该交换站,使用命令行参数作为路由键. 在样本运行中,Tooele被用作参数,因此被用作出发目的地-出行方向键. .

注意: 为了简单,脚本不会检查是否提供强制性的命令行参数!如果没有参数执行,它不会正常工作。

◎消费者


此示例消费者应用程序将作为公共交通客户感兴趣的一个或多个目的地可从火车站到达。

创建一个名为direct_watch.py的样本 Python 脚本

1vim direct_watch.py

并插入脚本内容:

 1import puka
 2import sys
 3
 4# declare and connect a consumer
 5consumer = puka.Client("amqp://localhost/")
 6connect_promise = consumer.connect()
 7consumer.wait(connect_promise)
 8
 9# create temporary queue
10queue_promise = consumer.queue_declare(exclusive=True)
11queue = consumer.wait(queue_promise)['queue']
12
13# bind the queue to all routing keys specified by command line arguments
14for destination in sys.argv[1:]:
15    print "Watching departure times for %s" % destination
16    bind_promise = consumer.queue_bind(exchange='departures', queue=queue, routing_key=destination)
17    consumer.wait(bind_promise)
18
19# start waiting for messages on the queue created beforehand and print them out
20message_promise = consumer.basic_consume(queue=queue, no_ack=True)
21
22while True:
23    message = consumer.wait(message_promise)
24    print "Departure for %s at %s" % (message['routing_key'], message['body'])
25
26consumer.close()

:wq 保存文件并停止。

使用一个参数 Tooele 运行脚本应该宣布,该脚本会显示 Tooele 的出发时间,而使用多个参数运行该脚本应该宣布观看许多目的地的出发时间。

1root@rabbitmq:~# python direct_watch.py Tooele
2Watching departure times for Tooele
3(...)
4root@rabbitmq:~# python direct_watch.py Tooele Boston
5Watching departure times for Tooele
6Watching departure times for Boston
7(...)
8root@rabbitmq:~#

让我们一步一步地通过脚本来解释它所做的事情:

  1. 联合国 **客户端创建并连接到本地的RabitMQ实例。 从现在开始它可以自由与兔子MQ通信.
  2. 联合国 为这个特定的消费者创建一个临时的队列,由RabbitMQ自动生成名称. 剧本结束后会销毁队列 。 3个 队列被绑定在使用命令行参数指定的所有路由密钥(去向)上的所有"去向"交换上,在每个屏幕上打印以获取信息. 4.四. 脚本开始等待队列上的消息. 它应接收与绑定的路由密钥相匹配的所有信件。 当以 _Tooele_作为单一参数运行时——只有这些参数,在两个参数上同时运行 _Tooele_和 _Boston_时. 每次出发时间都会被打印在屏幕上. .

测试


要检查两种脚本是否按预期工作,请打开三个终端窗口到服务器上,其中一个将用作公共交通站发送通知,另外两个将作为客户等待出发。

在第一个终端中,运行direct_notify.py脚本一次使用任何参数:

1root@rabbitmq:~# python direct_notify.py Tooele
2Departure to Tooele at 2014-02-18 15:57:29.035000
3root@rabbitmq:~#

重要: direct_notify.py脚本必须在任何消费者之前至少执行一次,因为交易所必须在连接到它之前创建。

在第二个终端中,运行direct_watch.py脚本,使用一个参数 - Tooele:

1root@rabbitmq:~# python direct_watch.py Tooele
2Watching departure times for Tooele
3(...)
4root@rabbitmq:~#

在第三个终端中,运行direct_watch.py脚本,使用两个参数 - TooeleBoston:

1root@rabbitmq:~# python direct_watch.py Tooele Boston
2Watching departure times for Tooele
3Watching departure times for Boston
4(...)
5root@rabbitmq:~#

然后,回到第一个终端,发送三个出发通知,一个发送到 Tooele,一个发送到 Boston,一个发送到 Chicago:

1root@rabbitmq:~# python direct_notify.py Tooele
2Departure to Tooele at 2014-02-18 15:57:29.035000
3root@rabbitmq:~# python direct_notify.py Boston
4Departure to Tooele at 2014-02-18 15:57:31.035000
5root@rabbitmq:~# python direct_notify.py Chicago
6Departure to Tooele at 2014-02-18 15:57:35.035000
7root@rabbitmq:~#

第一条通知只应由两位消费者接收,等待到Tooele的出发;第二条通知只应接收到消费者等待到波士顿的出发;第三条通知不应由这些消费者接收,因为他们都没有等待到芝加哥的出发。

这些简单的示例说明了如何发送信息,只有路由密钥指定的特定消费者才能接收。

继续阅读


直接路由不提供完全控制消息将交付到哪里,但是从以前的交换中使用的fanout交换的一个大步,它盲目地在任何地方交付消息。

本文的主要目的是使用简单的现实世界的情况来引入基本的直接路由,许多其他用途都详细介绍在官方的RabbitMQ文档中(http://www.rabbitmq.com/documentation.html),这对于RabbitMQ用户和管理员来说是一个很好的资源。

Article Submitted by: Mateusz Papiernik
Published At
Categories with 技术
Tagged with
comments powered by Disqus