介绍
您可以选择以多种方式描述ZeroMQ;然而,它仍然是它真正的样子:一个真正出色的通信库,其丰富和成熟的功能集对开发人员非常有益。
在DigitalOcean ZeroMQ文章的第二部分中,在我们之前关于安装应用程序的文章之后,我们将深入了解其使用情况,并发现如何实际实现这个快速而强大的库。
** 注意:** 这篇文章构成了我们关于该主题的第二篇文章. 如果您有兴趣了解更多(即它是什么以及它如何与完整的消息经纪人进行比较),请在阅读本教程之前查看 ZeroMQ 介绍和安装方法。
关于
零子
ZeroMQ是一个用于实现应用程序和流程之间的消息和通信系统的库 - 快速和不同步。
如果您有其他应用程序消息解决方案(如RabbitMQ)的经验,了解ZeroMQ的确切位置可能有点困难。
当与一些更大的项目相比,这些项目提供所有必要的企业信息发送部件时,ZeroMQ仍然是一个轻量级和快速的工具来制作自己的。
此文章
虽然从技术上讲不是一个框架,考虑到其功能性和它解决任务的关键位置,您可以认为ZeroMQ是实现应用程序实际通信层的支柱。
在本文中,我们旨在为您提供一些示例,以激发您所能做的所有事情。
注: 我们将在我们的例子中使用Python语言及其经典解释器(Python C解释器)。安装必要的语言绑定后,您应该能够简单地翻译代码并使用您最喜欢的代码,而无需任何问题。
使用 ZeroMQ 编程
ZeroMQ作为一个图书馆通过跟踪某些网络通信模式来通过接口工作,它旨在非同步工作,这就是它的名字的MQ补丁的来源 - 从发送前的线条排队消息。
ZeroMQ 插槽类型
ZeroMQ在其接口的工作方式上有所不同. 与常规接口的同步工作方式不同,ZeroMQ的接口实现呈现无同步消息排列的抽象性
。
这些接口的工作方式取决于所选择的接口类型,而发送的消息流程取决于所选择的模式,其中有四种:
- 请求/回复模式: 用于发送请求并接收每一个发送的答案
- 发布/订阅模式: 用于将数据从单个过程(例如出版商)分发给多个收件人(例如订阅者)。
- 管道模式: 用于将数据分发到连接的节点
- ** 专属对模式:** 用于连接两个同行,形成一对
ZeroMQ运输类型
ZeroMQ为通信提供四种不同的运输方式,这些是:
- **在进程中(INPROC):**本地(在进程中)的通信传输
- **在进程中(IPC):**本地(在进程中)的通信传输
- **TCP:*使用TCP 的Unicast通信传输 **PGM:**使用PGM的多重传输通信
ZeroMQ 应用程序结构化
ZeroMQ的工作与典型的和传统的通信设置不同. 它可以链接的两侧(即服务器或客户端)连接并等待连接. 与标准接口不同,ZeroMQ通过知道连接可能发生的概念工作,因此,可以等待它完全好。
客户端 - 服务器结构
为了构建您的客户端和服务器代码,最好选择一个更稳定的 binding 侧,另一个(s)作为 connecting。
例子:
1Server Application Client Application
2---------------------[ < .. < .. < .. < .. ......................
3Bound -> Port:8080 Connects <- Port:8080
客户端 - 代理 - 服务器结构
为了解决由于通信的两端处于动态(因此不稳定)状态而引起的问题,ZeroMQ提供网络设备(即工具出盒子)。
- Streamer: 用于管道并行通信的流媒体设备
- ** 传输器:** 用于 pub/sub 通信的传输设备
- ** 传输器:** 请求/回复通信的传输设备
例子:
1Server App. Device | Forward Client App.
2 ............ > .. > . ]------------------[ < .. < .. .........
3 Connects 2 Port Binding Connects
编程示例
利用我们从上一节获得的知识,我们现在将开始利用它们来创建简单的应用程序。
** 注意:** 下面的例子通常包括同时运行的应用程序。例如,为了一个客户端/服务器设置工作,您需要同时运行客户端和服务器应用程序。 这样做的一种方法是使用 Linux 屏幕工具。 要了解更多,请参阅此 DigitalOceanTutorial。 要在 CentOS 系统上安装屏幕,请记住,您可以简单地运行: yum install -y screen
。
简单消息使用请求/回复模式
在应用程序之间的通信方面,请求 / 回复模式可能是绝对的经典,并给了我们一个很好的机会,从ZeroMQ的基本知识开始。
使用案例:
- 用于服务器和客户端之间的简单通信
- 检查信息并要求更新
- 发送 checks 和更新到服务器
- Echo 或 ping/pong 实现
使用的插槽类型(s):
- zmq.REP
- zmq.REQ
服务器类型: server.py
使用 nano(nano server.py
)创建一个server.py
,并粘贴下面的自我解释内容。
1import zmq
2
3# ZeroMQ Context
4context = zmq.Context()
5
6# Define the socket using the "Context"
7sock = context.socket(zmq.REP)
8sock.bind("tcp://127.0.0.1:5678")
9
10# Run a simple "Echo" server
11while True:
12 message = sock.recv()
13 sock.send("Echo: " + message)
14 print "Echo: " + message
当您完成编辑时,保存并通过按CTRL+X,然后按Y。
客户端示例: client.py
使用 nano(nano client.py
)创建一个client.py
,并粘贴下面的内容。
1import zmq
2import sys
3
4# ZeroMQ Context
5context = zmq.Context()
6
7# Define the socket using the "Context"
8sock = context.socket(zmq.REQ)
9sock.connect("tcp://127.0.0.1:5678")
10
11# Send a "message" using the socket
12sock.send(" ".join(sys.argv[1:]))
13print sock.recv()
当您完成编辑时,通过按CTRL+X,然后按Y来保存和退出。
** 注意:** 使用 ZeroMQ 库时,请记住,每条用于发送消息的线程(即 .send(..)
)都预计会随之出现 .recv(..)
。
使用
我们的server.py
设置为响应
应用程序,无论我们选择向它发送什么,它都会发送回来(例如Echo: _message_
).
使用您的 Python 解读器运行服务器:
1python server.py
在另一个窗口中,使用客户端应用程序发送消息:
1python client.py hello world!
2# Echo: hello world!
** 注意:** 要关闭服务器,您可以使用键组合: Ctrl+C
使用发布/订阅模式
在发布 / 订阅模式的情况下,ZeroMQ 用于建立一个或多个订阅者,连接到一个或多个出版商,并连续接收出版商发送的内容(或 seeds)。
选择指定前缀仅接受此类消息的选择,以此模式可用。
使用案例:
发布/订阅模式用于均匀地在不同消费者之间分发消息,自动更新分类表和新闻可以被视为使用此解决方案的可能区域。
使用的插槽类型(s):
- zmq.PUB
- zmq.SUB
出版商: pub.py
使用 nano(nano pub.py
)创建一个pub.py
,并粘贴下面的内容。
1import zmq
2import time
3
4# ZeroMQ Context
5context = zmq.Context()
6
7# Define the socket using the "Context"
8sock = context.socket(zmq.PUB)
9sock.bind("tcp://127.0.0.1:5680")
10
11id = 0
12
13while True:
14 time.sleep(1)
15 id, now = id+1, time.ctime()
16
17 # Message [prefix][message]
18 message = "1-Update! >> #{id} >> {time}".format(id=id, time=now)
19 sock.send(message)
20
21 # Message [prefix][message]
22 message = "2-Update! >> #{id} >> {time}".format(id=id, time=now)
23 sock.send(message)
24
25 id += 1
当您完成编辑时,通过按CTRL+X,然后按Y来保存和退出。
订阅实例: sub.py
使用 nano(nano sub.py
)创建一个sub.py
并粘贴下面的内容。
1import zmq
2
3# ZeroMQ Context
4context = zmq.Context()
5
6# Define the socket using the "Context"
7sock = context.socket(zmq.SUB)
8
9# Define subscription and messages with prefix to accept.
10sock.setsockopt(zmq.SUBSCRIBE, "1")
11sock.connect("tcp://127.0.0.1:5680")
12
13while True:
14 message= sock.recv()
15 print message
当您完成编辑时,通过按CTRL+X,然后按Y来保存和退出。
** 注意:** 使用 .setsockopt(..)
程序,我们正在订阅以接收开始于 string 1
的消息。
使用
我们的pub.py
设置为作为一个 publisher,发送两个不同的消息 - 同时 - 面向不同的订阅者。
运行编辑器发送消息:
1python pub.py
在另一个窗口中,请参见订阅内容的打印输出(即1
):
1python sub.py!
2# 1-Update! >> 1 >> Wed Dec 25 17:23:56 2013
** 注意:** 若要关闭订阅者和发布者应用程序,可以使用键组合: Ctrl+C
管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管道管(Push/Pull)
与发布/订阅模式非常相似,第三个线路管道模式是解决不同类型的问题的一种解决方案:按需求分发消息。
使用案例:
管道模式可以在需要路由(即在行中 pushed)的排列项目列表的情况下使用(即 pull 请求者)。
使用的插槽类型(s):
- zmq.PUSH
- zmq.PULL
举个例子: manager.py
使用 nano(nano manager.py
)创建一个manager.py
,并粘贴下面的内容。
1import zmq
2import time
3
4# ZeroMQ Context
5context = zmq.Context()
6
7# Define the socket using the "Context"
8sock = context.socket(zmq.PUSH)
9sock.bind("tcp://127.0.0.1:5690")
10
11id = 0
12
13while True:
14 time.sleep(1)
15 id, now = id+1, time.ctime()
16
17 # Message [id] - [message]
18 message = "{id} - {time}".format(id=id, time=now)
19
20 sock.send(message)
21
22 print "Sent: {msg}".format(msg=message)
manager.py
文件将作为一个 _task allocator。
PULL 示例: worker_1.py
使用 nano(nano worker_1.py
)创建一个worker_1.py
并粘贴下面的内容。
输入 zmq
1# ZeroMQ Context
2context = zmq.Context()
3
4# Define the socket using the "Context"
5sock = context.socket(zmq.PULL)
6sock.connect("tcp://127.0.0.1:5690")
7
8while True:
9 message = sock.recv()
10 print "Received: {msg}".format(msg=message)
文件 worker_1.py
将作为一个 task processes (消费者/工人)。
使用
我们的 manager.py
设置为具有任务分配器(即管理员)的角色, **PUSH**ing 项目. 同样,worker_1.py
设置为作为一个 worker 实例工作,接收这些项目,当完成处理时 **PULL**ing 列表。
运行编辑器发送消息:
1python manager.py
在另一个窗口中,请参见订阅内容的打印输出(即1
):
1python worker_1.py!
2# 1-Update! >> 1 >> Wed Dec 25 17:23:56 2013
** 注意:** 若要关闭订阅者和发布者应用程序,可以使用键组合: Ctrl+C
独特的双重模式
专属对模式意味着并允许使用zmq/PAIR
接口类型建立一种类型的通信渠道。
标签: bind.py
使用 nano(nano bind.py
)创建一个bind.py
,并粘贴下面的内容。
1import zmq
2
3# ZeroMQ Context
4context = zmq.Context()
5
6# Define the socket using the "Context"
7socket = context.socket(zmq.PAIR)
8socket.bind("tcp://127.0.0.1:5696")
当您完成编辑时,通过按CTRL+X,然后按Y来保存和退出。
连接示例: connect.py
使用 nano(nano connect.py
)创建一个connect.py
,并粘贴下面的内容。
1import zmq
2
3# ZeroMQ Context
4context = zmq.Context()
5
6# Define the socket using the "Context"
7socket = context.socket(zmq.PAIR)
8socket.connect("tcp://127.0.0.1:5696")
当您完成编辑时,通过按CTRL+X,然后按Y来保存和退出。
使用
您可以使用上述示例创建任何双向单连接通信应用程序。
** 注意:** 要关闭,也可以使用键组合: Ctrl+C