介绍
非同步或非封锁处理是将某些任务的执行与程序的主要流程分开的方法,这为您提供了几个优点,包括允许用户面向的代码不间断地运行。
消息传递是一种程序组件可以使用的通信和信息交换的方法,可以同步或非同步实现,并可以允许分散的流程无问题地进行通信。
Celery是建立在非同步消息传递系统上的任务队列,可以用作一个库,在那里可以抛弃编程任务。通过任务的程序可以继续执行并响应性地运作,然后稍后,它可以进行调查,看看计算是否完成,并获取数据。
虽然Celery是用Python编写的,但其协议可以在任何语言中实现,甚至可以通过Webhooks与其他语言合作。
通过将工作队列部署到您的程序环境中,您可以轻松卸载任务,并继续处理用户的互动。
在本指南中,我们将安装和实施使用RabbitMQ作为Ubuntu 12.04 VPS上的消息系统的子工作队列。
安装组件
安装电池
Celery 是用 Python 编写的,因此,它很容易安装,就像我们处理常规 Python 包一样。
我们将遵循处理Python包的建议程序,创建一个虚拟环境来安装我们的消息系统,这有助于我们保持我们的环境稳定,而不会影响更大的系统。
从 Ubuntu 的默认存储库中安装 Python 虚拟环境包:
1sudo apt-get update
2sudo apt-get install python-virtualenv
我们将创建一个消息目录,在那里我们将部署我们的系统:
1mkdir ~/messaging
2cd ~/messaging
现在我们可以使用以下命令创建一个虚拟环境,在那里我们可以使用以下命令来安装子:
1virtualenv --no-site-packages venv
随着虚拟环境的配置,我们可以通过键入来激活它:
1source venv/bin/activate
您的提示将更改,以反映您现在在上面所做的虚拟环境中运作,这将确保我们的Python包安装在本地,而不是全球。
如果在任何时候我们需要关闭环境(现在不是),您可以键入:
1deactivate
现在,我们已经激活了环境,我们可以用管道安装子:
1pip install celery
安装RabbitMQ
Celery 需要一个消息代理来处理来自外部来源的请求,这个代理被称为经纪人
。
经纪人可以选择几个选项,包括关系数据库,NoSQL数据库,关键价值存储和实际消息系统。
我们将配置谷物以使用RabbitMQ消息传输系统,因为它提供强大,稳定的性能,并与谷物互动良好,这是一个很好的解决方案,因为它包括与我们预期使用的功能。
我们可以通过 Ubuntu 存储库安装 RabbitMQ:
1sudo apt-get install rabbitmq-server
RabbitMQ服务在我们的服务器上安装时自动启动。
创建一个 Celery 实例
为了使用Celery的任务排队功能,安装后的第一步必须是创建Celery实例,这是导入包、创建应用
的简单过程,然后设置Celery在背景下能够执行的任务。
让我们在我们的消息目录中创建一个名为tasks.py
的Python脚本,在那里我们可以定义我们的员工可以执行的任务。
1sudo nano ~/messaging/tasks.py
我们应该做的第一件事是从菜包中导入Celery函数:
1from celery import Celery
之后,我们可以创建一个连接到默认 RabbitMQ 服务的 celery 应用程序实例:
1from celery import Celery
2
3app = Celery('tasks', backend='amqp', broker='amqp://')
Celery
函数的第一个论点是将任务预先配置为识别它们的名称。
后端参数是一个可选的参数,如果您想要查询背景任务的状态或检索其结果。
如果你的任务只是执行某些工作,然后停止的函数,而不会返回在程序中使用的有用值,则可以将此参数排除在外。
经纪人
参数指定了连接到我们的经纪人所需的URL。在我们的情况下,这是在我们的服务器上运行的RabbitMQ服务。RabbitMQ使用一个名为amqp
的协议运作。如果RabbitMQ在其默认配置下运作,那么cellery可以连接到除amqp://
计划以外的任何其他信息。
创建Celery任务
仍然在这个文件中,我们现在需要添加我们的任务。
每个瓷砖任务都必须与装饰器@app.task
一起介绍,这使瓷砖能够识别它可以添加其排队功能的功能。
我们的第一个任务将是一个简单的函数,将一个字符串打印到控制台。
1from celery import Celery
2
3app = Celery('tasks', backend='amqp', broker='amqp://')
4
5@app.task
6def print_hello():
7 print 'hello there'
由于此功能不会返回任何有用的信息(相反,它将其打印到控制台),我们可以告诉celery不要使用后端来存储有关该任务的状态信息。
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task(ignore_result=True)
def print_hello():
print 'hello there'
接下来,我们将添加另一个函数,该函数将生成原始数(从 RosettaCode中获取)。
1from celery import Celery
2
3app = Celery('tasks', backend='amqp', broker='amqp://')
4
5@app.task(ignore_result=True)
6def print_hello():
7 print 'hello there'
8
9@app.task
10def gen_prime(x):
11 multiples = []
12 results = []
13 for i in xrange(2, x+1):
14 if i not in multiples:
15 results.append(i)
16 for j in xrange(i*i, x+1, i):
17 multiples.append(j)
18 return results
因为我们关心这个函数的返回值是什么,因为我们想知道它什么时候完成(以便我们可以使用结果等),我们不会将ignore_result
参数添加到这个第二个任务中。
保存并关闭文件。
开始Celery工人流程
我们现在可以启动一个工人流程,它将能够接受来自应用程序的连接,它将使用我们刚刚创建的文件来了解它可以执行的任务。
启动一个 worker 实例就像用 celery 命令调用应用程序名称一样简单,我们将在字符串的末尾添加一个&
字符来将我们的 worker 进程置于背景中:
1celery worker -A tasks &
这将启动应用程序,然后将其从终端分离,允许您继续使用它用于其他任务。
如果您想要启动多个工人,则可以通过用-n
参数命名每个工人:
1celery worker -A tasks -n one.%h &
2celery worker -A tasks -n two.%h &
当员工被命名时,%h
将被主机名取代。
您可以使用杀死命令来阻止工人,我们可以查询过程ID,然后根据此信息删除工人。
1ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill
这将允许员工在离开之前完成当前任务。
如果您想关闭所有员工,而不等他们完成任务,您可以执行:
1ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
使用队列来处理工作
我们可以使用我们生长的工人过程(s)来完成我们的程序的背景工作。
不要创建一个完整的程序来展示它是如何工作的,我们将探索Python解释器中的不同选项:
1python
在快递中,我们可以将我们的功能导入环境:
1from tasks import print_hello
2from tasks import gen_prime
如果你测试这些函数,它们似乎没有任何特殊的功能。第一个函数按预期打印一条行:
1print_hello()
1hello there
第二个函数返回一个数列表:
1primes = gen_prime(1000)
2print primes
如果我们给第二个函数一个更大范围的数字来检查,执行在它计算时会悬挂:
1primes = gen_prime(50000)
通过键入CTRL-C
来停止执行,这个过程显然不是在背景下计算的。
要访问背景工作者,我们需要使用 .delay
方法. Celery 将我们的功能包装成额外的功能. 这种方法用于将函数传递给一个工作者执行。
1primes = gen_prime.delay(50000)
由于我们为我们的应用程序配置了后端
参数,我们可以检查计算的状态并获得结果。
要检查任务是否完成,我们可以使用 .ready
方法:
1primes.ready()
1False
False
值意味着任务仍在运行,结果尚未可用,当我们得到 True
值时,我们可以对答案做一些事情。
1primes.ready()
1True
我们可以通过使用.get
方法获取值。
如果我们已经验证了该值是用 .ready
方法计算的,那么我们可以使用该方法如下:
1print primes.get()
1[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
2. . .
但是,如果您在调用.get 之前没有使用.ready 方法,那么您很可能想要添加一个 timeout
选项,以便您的程序不被迫等待结果,这会打败我们的实现的目的:
1print primes.get(timeout=2)
这将引发一个例外,如果时间过时,你可以在你的计划中处理。
结论
虽然这是足够的信息,让您开始在您的程序中使用,但它只是在这个库的全部功能的表面扫描。
虽然Celery是用Python写的,但它可以通过Webhooks与其他语言一起使用,这使得它非常灵活地将任务移动到背景中,无论您选择的语言如何。