如何在 Ubuntu 20.04 上使用 Python 中的 Luigi 构建数据处理管道

_ 作者选择了 免费和开源基金作为 写给捐款计划的一部分接受捐款。

介绍

Luigi是管理长期运行 batch processingPython 套件,它可以自动运行数据处理任务,用于一批项目。 Luigi 允许您将数据处理任务定义为一组依赖任务。例如,任务 B 取决于任务 A 的输出,任务 D 取决于任务 B 和任务 C 的输出。

总的来说,Luigi为开发和管理数据处理管道提供了一个框架。 它最初由Spotify开发,后者利用它共同管理管道,收集需要从各种来源获取和处理数据的任务. 在Luigi内部,Spotify的开发者构建了功能来帮助其批量处理需要,包括处理故障,自动解决任务之间的依赖性的能力,以及任务处理的可视化. Spotify使用Luigi支持批量处理工作,包括向用户提供音乐建议,充斥内部仪表板,并计算出顶级歌曲列表.

在本教程中,您将构建一个数据处理管道来分析最流行的书籍中最常见的单词在 Gutenberg 项目. 要做到这一点,您将使用 Luigi 包构一个管道。

前提条件

要完成本教程,您将需要以下内容:

  • 与具有 " sudo " 权限的非根用户建立的Ubuntu服务器。 遵循[初始服务器设置与Ubuntu 20.04] (https://andsky.com/tech/tutorials/initial-server-setup-with-ubuntu-20-04)指南.
  • Python 3.6或以上并安装了Virtualenv'。 跟随 [如何在 Ubuntu 20.04 上安装 Python 3 并设置本地编程环境(https://andsky.com/tech/tutorials/how-to-install-python-3-and-set-up-a-programming-environment-on-an-ubuntu-20-04-server),以配置 Python并安装 virtualenv` 。 您将在此教程中设置环境和项目文件夹 。 .

步骤 1 - 安装 Luigi

在此步骤中,您将为您的 Luigi 安装创建一个干净的沙盒环境。

首先,创建一个项目目录. 为此教程 luigi-demo:

1mkdir luigi-demo

导航到新创建的luigi-demo目录:

1cd luigi-demo

创建一个新的虚拟环境luigi-venv:

1python3 -m venv luigi-venv

并激活新创建的虚拟环境:

1. luigi-venv/bin/activate

您会发现(luigi-venv)附在您的终端提示的前面,以表示哪个虚拟环境是活跃的:

1[secondary_label Output]
2(luigi-venv) username@hostname:~/luigi-demo$

对于本教程,你需要三个库:luigi,beautifulsoup4requestsrequests库简化了制作HTTP请求;你将使用它来下载Gutenberg项目书籍列表和分析书籍。

运行以下命令以使用 pip 安装这些库:

1pip install wheel luigi beautifulsoup4 requests

您将收到一个答复,确认安装了图书馆的最新版本及其所有依赖:

1[secondary_label Output]
2Successfully installed beautifulsoup4-4.9.1 certifi-2020.6.20 chardet-3.0.4 docutils-0.16 idna-2.10 lockfile-0.12.2 luigi-3.0.1 python-daemon-2.2.4 python-dateutil-2.8.1 requests-2.24.0 six-1.15.0 soupsieve-2.0.1 tornado-5.1.1 urllib3-1.25.10

您已经为您的项目安装了依赖性,现在,您将继续构建您的第一个Luigi任务。

第2步:创建一个Luigi任务

在此步骤中,您将创建一个Hello World Luigi 任务来展示它们的工作方式。

Luigi task 是您管道的执行以及每个任务的输入和输出依赖的定义所发生的地方。

  • 具有执行任务的逻辑的 run() 方法. * 返回任务生成的文物的 output() 方法. run() 方法填充了这些文物. * 一个可选的 input() 方法,返回您管道中的任何需要执行当前任务的额外任务。

创建一个新的文件 hello-world.py:

1nano hello-world.py

现在将以下代码添加到您的文件中:

 1[label hello-world.py]
 2import luigi
 3
 4class HelloLuigi(luigi.Task):
 5
 6    def output(self):
 7        return luigi.LocalTarget('hello-luigi.txt')
 8
 9    def run(self):
10        with self.output().open("w") as outfile:
11            outfile.write("Hello Luigi!")

您可以通过添加 luigi.Task mixin来定义 HelloLuigi() 是 Luigi 任务。

输出()方法定义了任务产生的一个或多个目标输出,在本示例中,您定义了luigi.LocalTarget,这是一个本地文件。

<美元 > [注] 注: 路易吉允许您连接到各种常见数据源,包括[AWS S3桶] (https://luigi.readthedocs.io/en/stable/api/luigi.contrib.s3.html),[MongoDB数据库] (https://luigi.readthedocs.io/en/stable/api/luigi.contrib.mongodb.html),以及[SQL数据库] (https://luigi.readthedocs.io/en/stable/api/luigi.contrib.sqla.html). 您可以在 [Luigi docs] (https://luigi.readthedocs.io/en/stable/api/luigi.contrib.html) 中找到完整的支持数据源列表. < $ > (美元)

对于这个例子,你在写模式中打开).open()作为输出文件:`,并在)中写入它。

要执行您创建的任务,请运行以下命令:

1python -m luigi --module hello-world HelloLuigi --local-scheduler

在这里,你使用python -m来执行任务,而不是直接执行luigi命令;这是因为Luigi只能执行当前PYTHONPATH内的代码。

1PYTHONPATH='.' luigi --module hello-world HelloLuigi --local-scheduler

使用「--模块 Hello-world HelloLuigi」旗帜,你告诉Luigi哪个Python模块和Luigi任务要执行。

「--本地計劃者」旗告訴Luigi不要連接到Luigi計劃器,而是本地執行這個任務(我們在 步驟 4 中解釋了Luigi計劃器)。

Luigi将发布执行任务的摘要:

 1[secondary_label Output]
 2===== Luigi Execution Summary =====
 3
 4Scheduled 1 tasks of which:
 5* 1 ran successfully:
 6    - 1 HelloLuigi()
 7
 8This progress looks :) because there were no failed tasks or missing dependencies
 9
10===== Luigi Execution Summary =====

它会创建一个新的文件hello-luigi.txt,包含内容:

1[label hello-luigi.txt]
2Hello Luigi!

你已经创建了一个 Luigi 任务,它会生成一个文件,然后使用 Luigi 的本地日程安排器执行它,现在你将创建一个任务,可以从网页中提取一份书籍列表。

步骤 3 — 创建一个任务来提取一份书籍列表

在此步骤中,您将创建一个 Luigi 任务,并为该任务定义一个 run() 方法,以下载在 [Gutenberg 项目] 上最受欢迎的书籍列表(https://www.gutenberg.org/browse/scores/top)。您将定义一个 output() 方法,以便在一个文件中存储这些书籍的链接。您将使用 Luigi 本地时间表来运行这些。

在您的)`方法中定义的文件存储的地方。在运行任务之前,您需要创建目录 – Python在尝试将文件写入尚不存在的目录时抛出例外:

1mkdir data
2mkdir data/counts
3mkdir data/downloads

创建一个新的文件 word-frequency.py:

1nano word-frequency.py

插入以下代码,这是Luigi的任务,以提取一个列表的链接到最畅读的书籍在 Gutenberg项目:

 1[label word-frequency.py]
 2import requests
 3import luigi
 4from bs4 import BeautifulSoup
 5
 6class GetTopBooks(luigi.Task):
 7    """
 8    Get list of the most popular books from Project Gutenberg
 9    """
10
11    def output(self):
12        return luigi.LocalTarget("data/books_list.txt")
13
14    def run(self):
15        resp = requests.get("http://www.gutenberg.org/browse/scores/top")
16
17        soup = BeautifulSoup(resp.content, "html.parser")
18
19        pageHeader = soup.find_all("h2", string="Top 100 EBooks yesterday")[0]
20        listTop = pageHeader.find_next_sibling("ol")
21
22        with self.output().open("w") as f:
23            for result in listTop.select("li>a"):
24                if "/ebooks/" in result["href"]:
25                    f.write("http://www.gutenberg.org{link}.txt.utf-8\n"
26                        .format(
27                            link=result["href"]
28                        )
29                    )

您定义了)`目标,以存储图书列表。

run()方法中,您:

  • 使用 " 请求 " 图书馆下载Gutenberg项目上图书页的HTML内容。
  • 使用 " BeautifulSoup " 图书馆来分析页面内容。 " 美丽的苏普 " 图书馆使我们能够从网页上删除信息。 要了解更多关于使用 " BeautifulSoup 库 " 的情况,请阅读如何用美丽的汤和Python 3打网页教程。
  • 打开输出方法所定义的输出文件。
  • 通过HTML结构来获取昨天 Top 100 EBooks** 列表中的所有链接. 对于此页面, 这是定位所有链接 QQAQ 属于列表项目 。 对于其中的每一个链接,如果链接到一个在包含 " /ebooks/ " 的链接上指向的页面,你就可以假设它是一本书并写入你的 " 输出() " 文件的链接。 .

Screenshot of the Project Gutenberg top books web page with the top ebooks links highlighted

保存和退出文件一旦完成。

使用以下命令执行此新任务:

1python -m luigi --module word-frequency GetTopBooks --local-scheduler

Luigi将发布执行任务的摘要:

 1[secondary_label Output]
 2===== Luigi Execution Summary =====
 3
 4Scheduled 1 tasks of which:
 5* 1 ran successfully:
 6    - 1 GetTopBooks()
 7
 8This progress looks :) because there were no failed tasks or missing dependencies
 9
10===== Luigi Execution Summary =====

在).运行以下命令来输出文件的内容:

1cat data/books_list.txt

此文件包含从 Project Gutenberg 顶级项目列表中提取的 URL 列表:

 1[secondary_label Output]
 2http://www.gutenberg.org/ebooks/1342.txt.utf-8
 3http://www.gutenberg.org/ebooks/11.txt.utf-8
 4http://www.gutenberg.org/ebooks/2701.txt.utf-8
 5http://www.gutenberg.org/ebooks/1661.txt.utf-8
 6http://www.gutenberg.org/ebooks/16328.txt.utf-8
 7http://www.gutenberg.org/ebooks/45858.txt.utf-8
 8http://www.gutenberg.org/ebooks/98.txt.utf-8
 9http://www.gutenberg.org/ebooks/84.txt.utf-8
10http://www.gutenberg.org/ebooks/5200.txt.utf-8
11http://www.gutenberg.org/ebooks/51461.txt.utf-8
12...

您已经创建了一个任务,可以从网页中提取一份书籍列表,在下一步,您将设置一个中央的Luigi日程表。

第4步:运行Luigi Scheduler

现在,您将启动 Luigi 日程表来执行和可视化您的任务. 您将采取在 Step 3 中开发的任务,并使用 Luigi 日程表运行它。

到目前为止,您一直在使用--local-scheduler标签运行Luigi,以便在本地运行您的工作,而无需将工作分配给中央日程表。

  • 执行你的任务的中心点. * 可视化你的任务的执行。

要访问 Luigi 日程表接口,您需要启用端口 8082

1sudo ufw allow 8082/tcp

要运行日程表,请执行以下命令:

1sudo sh -c ". luigi-venv/bin/activate ;luigid --background --port 8082"

<$>[注] 注: 我们已经重新运行了‘virtualenv’激活脚本作为根,然后将Luigi计时器作为背景任务启动。

如果您不希望作为 root 运行,您可以将 Luigi 编程程序作为当前用户的背景流程运行。此命令将 Luigi 编程程序运行在背景中,并从编程程序的背景任务中隐藏消息。您可以在 How To Use Bash's Job Control to Manage Foreground and Background Processes找到有关管理终端的背景流程的更多信息:

1luigid --port 8082 > /dev/null 2> /dev/null &

美元

打开浏览器以访问 Luigi 界面. 此处将位于 http://your_server_ip:8082,或者您已为您的服务器设置域名 http://your_domain:8082。 此处将打开 Luigi 用户界面。

Luigi default user interface

默认情况下,Luigi 任务使用 Luigi 日程表运行。 若要使用 Luigi 日程表运行您之前的任务,请从命令中删除 --local-scheduler 参数。

1python -m luigi --module word-frequency GetTopBooks

更新 Luigi 日程表用户界面. 您将找到在运行列表中添加的 GetTopBooks 任务及其执行状态。

Luigi User Interface after running the GetTopBooks Task

您将继续重定向到此用户界面以监控您的管道的进展。

如果您想通过 HTTPS 保护您的 Luigi 编程器,您可以通过 Nginx 服务。 若要使用 HTTPS 设置 Nginx 服务器,请参阅: How To Secure Nginx with Let's Encrypt on Ubuntu 20.04. 请参阅 Github - Luigi - Pull Request 2785关于连接 Luigi 到 Ngx 服务器的适当 Nginx 配置的建议。

您已经启动了 Luigi Scheduler,并使用它来可视化已执行的任务,接下来,您将创建一个任务来下载GetTopBooks()任务输出的书籍列表。

第5步:下载书籍

在此步骤中,您将创建一个 Luigi 任务来下载特定书籍. 您将定义这个新创建的任务和在 Step 3 中创建的任务之间的依赖性。

首先打开你的文件:

1nano word-frequency.py

在你的GetTopBooks()任务之后,添加一个额外的类到word-frequency.py文件中,使用以下代码:

 1[label word-frequency.py]
 2. . .
 3class DownloadBooks(luigi.Task):
 4    """
 5    Download a specified list of books
 6    """
 7    FileID = luigi.IntParameter()
 8
 9    REPLACE_LIST = """.,"';_[]:*-"""
10
11    def requires(self):
12        return GetTopBooks()
13
14    def output(self):
15        return luigi.LocalTarget("data/downloads/{}.txt".format(self.FileID))
16
17    def run(self):
18        with self.input().open("r") as i:
19            URL = i.read().splitlines()[self.FileID]
20
21            with self.output().open("w") as outfile:
22                book_downloads = requests.get(URL)
23                book_text = book_downloads.text
24
25                for char in self.REPLACE_LIST:
26                    book_text = book_text.replace(char, " ")
27
28                book_text = book_text.lower()
29                outfile.write(book_text)

在这个任务中,你输入一个参数;在这种情况下,一个整数参数。Luigi _parameters_是影响管道执行的任务的输入。

您已为您的 Luigi 任务添加了一种额外的方法, def requires();在此方法中,您定义了您需要的 Luigi 任务,才能执行此任务。

output() 方法中,您定义您的目标. 您使用 FileID 参数创建由此步骤创建的文件的名称. 在这种情况下,您格式化 `data/downloads/{FileID}.txt」。

run()方法中,您:

  • 打开在GetTopBooks()任务中生成的书籍列表。 * 从参数FileID指定的行中获取URL。 * 使用请求库从URL下载书籍的内容。 * 过滤书中的任何特殊字符,如:,.?,以便它们不会被包含在您的单词分析中。

保存和退出您的文件。

使用此命令执行新的DownloadBooks()任务:

1python -m luigi --module word-frequency DownloadBooks --FileID 2

在此命令中,您使用--FileID参数设置了FileID参数。

<$>[注] 注: 在定义名称中的_参数时要小心。在 Luigi 中引用它们时,您需要将_ 替换为- 例如,在从终端调用任务时,将File_ID参数引用为--File-ID

您将获得以下输出:

 1[secondary_label Output]
 2===== Luigi Execution Summary =====
 3
 4Scheduled 2 tasks of which:
 5* 1 complete ones were encountered:
 6    - 1 GetTopBooks()
 7* 1 ran successfully:
 8    - 1 DownloadBooks(FileID=2)
 9
10This progress looks :) because there were no failed tasks or missing dependencies
11
12===== Luigi Execution Summary =====

从输出中注意到,Luigi已经检测到您已经生成了GetTopBooks()的输出,并且跳过了运行该任务。

您已创建了一个任务,该任务使用了另一个任务的输出,并下载了一组书籍来分析。

步骤6 - 数字和总结结果

在此步骤中,您将创建一个 Luigi 任务,以计算在 Step 5 下载的每本书中的单词频率。

首先,重新打开文件:

1nano word-frequency.py

將下列進口添加到「word-frequency.py」的頂部:

1[label word-frequency.py]
2from collections import Counter
3import pickle

在你的)`任务的输出,并返回该书中最常见的单词:

 1[label word-frequency.py]
 2class CountWords(luigi.Task):
 3    """
 4    Count the frequency of the most common words from a file
 5    """
 6
 7    FileID = luigi.IntParameter()
 8
 9    def requires(self):
10        return DownloadBooks(FileID=self.FileID)
11
12    def output(self):
13        return luigi.LocalTarget(
14            "data/counts/count_{}.pickle".format(self.FileID),
15            format=luigi.format.Nop
16        )
17
18    def run(self):
19        with self.input().open("r") as i:
20            word_count = Counter(i.read().split())
21
22            with self.output().open("w") as outfile:
23                pickle.dump(word_count, outfile)

当您定义‘requires()’时,您将‘FileID’参数传递给下一个任务. 当您指定一个任务取决于另一个任务时,您会指定需要执行依赖任务的参数。

run()方法中,您:

  • 打开由 Download Books () 生成的文件 任务。
  • 在[`收集 ' (https://andsky.com/tech/tutorials/how-to-use-the-collections-module-in-python-3)库中使用内置的 " 咨询 " 对象。 这为分析一本书中最常用的词提供了简便的方法.
  • 使用 " pickle " 库存储 Python Counter " 对象的输出,以便在以后的任务中重新使用该对象。 " pickle " 是用于转换的图书馆 Python 对象进入一个字节流,可以存储并还原为后期的Python会话. 您必须设定 luigi 格式` 属性 。 " 本地目标 " ,使其能够编写 " pickle " 图书馆生成的二进制输出。 .

保存和退出您的文件。

使用此命令执行新的CountWords()任务:

1python -m luigi --module word-frequency CountWords --FileID 2

在 Luigi Scheduler 用户界面中打开 CountWords 任务图表视图。

Showing how to view a graph from the Luigi user interface

删除隐藏完成选项,然后删除上游依赖选项,您将从您创建的任务中找到执行流程。

Visualizing the execution of the CountWords task

您创建了一个任务来计算下载的书中最常见的单词,并可视化这些任务之间的依赖性。

第7步:定义配置参数

在此步骤中,您将添加配置参数到管道,这些参数将允许您定制要分析的书籍数量以及要在结果中包含的单词数量。

当您想要设置在任务中共享的参数时,可以创建一个)`类别中定义的参数,这些参数是管道在执行任务时设置的。

在 word-frequency.py 的末尾添加以下Config()类,这将为您分析的书籍数量和在总结中包含的最常见单词数量定义两个新参数:

1[label word-frequency.py]
2class GlobalParams(luigi.Config):
3    NumberBooks = luigi.IntParameter(default=10)
4    NumberTopWords = luigi.IntParameter(default=500)

将下列类添加到 word-frequency.py. 此类汇总了所有 CountWords() 任务的结果,以创建最常见单词的摘要:

 1[label word-frequency.py]
 2class TopWords(luigi.Task):
 3    """
 4    Aggregate the count results from the different files
 5    """
 6
 7    def requires(self):
 8        requiredInputs = []
 9        for i in range(GlobalParams().NumberBooks):
10            requiredInputs.append(CountWords(FileID=i))
11        return requiredInputs
12
13    def output(self):
14        return luigi.LocalTarget("data/summary.txt")
15
16    def run(self):
17        total_count = Counter()
18        for input in self.input():
19            with input.open("rb") as infile:
20                nextCounter = pickle.load(infile)
21                total_count += nextCounter
22
23        with self.output().open("w") as f:
24            for item in total_count.most_common(GlobalParams().NumberTopWords):
25                f.write("{0: <15}{1}\n".format(*item))

requires() 方法中,您可以提供一个列表,您希望一个任务使用多个依赖任务的输出. 您可以使用 GlobalParams().NumberBooks 参数来设置您需要数字的书籍数量。

输出()方法中,您将定义一个data/summary.txt输出文件,该文件将是您管道的最终输出。

run()方法中,您:

  • 创建一个 Counter() 对象来存储总数。 * 打开文件并),对于在 CountWords() 方法中执行的每个计数 * 附加加加的计数并添加到总数。

使用以下命令运行管道:

1python -m luigi --module word-frequency TopWords --GlobalParams-NumberBooks 15 --GlobalParams-NumberTopWords 750

Luigi将执行剩余的任务,以生成顶级单词的摘要:

 1[secondary_label Output]
 2===== Luigi Execution Summary =====
 3
 4Scheduled 31 tasks of which:
 5* 2 complete ones were encountered:
 6    - 1 CountWords(FileID=2)
 7    - 1 GetTopBooks()
 8* 29 ran successfully:
 9    - 14 CountWords(FileID=0,1,10,11,12,13,14,3,4,5,6,7,8,9)
10    - 14 DownloadBooks(FileID=0,1,10,11,12,13,14,3,4,5,6,7,8,9)
11    - 1 TopWords()
12
13This progress looks :) because there were no failed tasks or missing dependencies
14
15===== Luigi Execution Summary =====

您可以从 Luigi 计划器中可视化管道的执行情况,在任务列表中选择 GetTopBooks 任务,然后按一下** View Graph** 按钮。

Showing how to view a graph from the Luigi user interface

删除隐藏完成上游依赖选项。

Visualizing the execution of the TopWords Task

它将显示在路易吉发生的处理流程。

打开data/summary.txt文件:

1cat data/summary.txt

您将找到计算最常见的单词:

 1[secondary_label Output]
 2the 64593
 3and 41650
 4of 31896
 5to 31368
 6a 25265
 7i 23449
 8in 19496
 9it 16282
10that 15907
11he 14974
12...

在此步骤中,您已定义并使用参数来定制任务的执行,您已经生成了一组书籍中最常见的单词的摘要。

找到这个教程的所有代码在(https://github.com/do-community/LuigiTutorial)。

结论

本教程介绍了您如何使用 Luigi 数据处理管道及其主要功能,包括任务、参数、配置参数和 Luigi 日程表。

Luigi 支持连接到大量的常见数据源,您也可以将其扩展到运行大型、复杂的数据管道,从而为您提供一个强大的框架,以便开始解决您的数据处理挑战。

有关更多教程,请查看我们的 数据分析主题页面Python主题页面.

Published At
Categories with 技术
comments powered by Disqus