如何在 Python 3 中使用 ThreadPoolExecutor

作者选择了 COVID-19 救援基金作为 Write for Donations计划的一部分接受捐款。

介绍

Python _threads_是允许你的程序同时运行多个程序的一种形式,在Python中也可以使用多个流程实现平行,但线程特别适合加速涉及大量I/O(输入/输出)的应用程序。

例如 I/O-bound 操作包括创建 Web 请求和从文件中读取数据. 与 I/O-bound 操作不同, CPU-bound 操作(如使用 Python 标准库进行数学)不会从 Python 线程中受益。

Python 3 包含ThreadPoolExecutor工具,用于在线程中执行代码。

在本教程中,我们将使用ThreadPoolExecutor来有效地执行网络请求,我们将定义一个适合在线程中调用的函数,使用ThreadPoolExecutor来执行该函数,并从这些执行中处理结果。

对于本教程,我们将进行网络请求,以检查 维基百科页面的存在。

<$>[注] 注: I/O-bound 操作比 CPU-bound 操作更能从线索中受益的事实,是由 Python 中的一个名为 global interpreter lock的独特性引起的。

前提条件

为了充分利用本教程,建议熟悉Python编程和本地Python编程环境,安装请求

您可以查看这些教程以获取必要的背景信息:

1pip install --user requests==2.23.0

步骤 1 — 定义要在线程中执行的函数

让我们先定义一个我们希望使用线程执行的函数。

使用「nano」或您偏好的文本编辑器/开发环境,您可以打开此文件:

1nano wiki_page_function.py

对于本教程,我们会写一个函数来确定维基百科页面是否存在:

 1[label wiki_page_function.py]
 2import requests
 3
 4def get_wiki_page_existence(wiki_page_url, timeout=10):
 5    response = requests.get(url=wiki_page_url, timeout=timeout)
 6
 7    page_status = "unknown"
 8    if response.status_code == 200:
 9        page_status = "exists"
10    elif response.status_code == 404:
11        page_status = "does not exist"
12
13    return wiki_page_url + " - " + page_status

get_wiki_page_existence 函数接受两个参数:一个维基百科页面的URL(‘wiki_page_url’)和一个等待该URL的响应的时限数秒。

get_wiki_page_existence 使用 requests 套件向该 URL 发送网页请求. 根据 HTTP 响应的 状态代码返回一个字符串,描述该页是否存在。 不同的状态代码代表 HTTP 请求的不同结果。 此过程假定一个 200成功状态代码意味着维基百科页面存在,而一个 404' 的未找到`状态代码意味着维基百科页面不存在。

如前提部分所述,您需要安装的请求包来运行此功能。

让我们试着通过添加url和函数调用来运行该函数,然后使用get_wiki_page_existence函数:

1[label wiki_page_function.py]
2. . .
3url = "https://en.wikipedia.org/wiki/Ocean"
4print(get_wiki_page_existence(wiki_page_url=url))

添加代码后,保存并关闭文件。

如果我们使用此代码:

1python wiki_page_function.py

我们会看到这样的输出:

1[secondary_label Output]
2https://en.wikipedia.org/wiki/Ocean - exists

使用有效的维基百科页面调用get_wiki_page_existence函数将返回一个字符串,确认该页面确实存在。

<$>[警告] 警告: 一般来说,在不特别注意的情况下在线程之间共享 Python 对象或状态是安全的。 当定义一个函数在线程中执行时,最好定义一个函数执行单个任务,而不共享或发布状态给其他线程。

步骤 2 — 使用 ThreadPoolExecutor 在 Threads 中执行函数

现在我们有一个适合用线程调用的函数,我们可以使用ThreadPoolExecutor来及时执行该函数的多个调用。

让我们在wiki_page_function.py中添加以下突出的代码:

 1[label wiki_page_function.py]
 2import requests
 3import concurrent.futures
 4
 5def get_wiki_page_existence(wiki_page_url, timeout=10):
 6    response = requests.get(url=wiki_page_url, timeout=timeout)
 7
 8    page_status = "unknown"
 9    if response.status_code == 200:
10        page_status = "exists"
11    elif response.status_code == 404:
12        page_status = "does not exist"
13
14    return wiki_page_url + " - " + page_status
15
16wiki_page_urls = [
17    "https://en.wikipedia.org/wiki/Ocean",
18    "https://en.wikipedia.org/wiki/Island",
19    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
20    "https://en.wikipedia.org/wiki/Shark",
21]
22with concurrent.futures.ThreadPoolExecutor() as executor:
23    futures = []
24    for url in wiki_page_urls:
25        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
26    for future in concurrent.futures.as_completed(futures):
27        print(future.result())

让我们来看看这个代码是如何工作的:

  • concurrent.futures 被导入,使我们能够访问 ThreadPoolExecutor.
  • A with 语句被用来创建一个 ThreadPoolExecutor 实例 executor,该实例将在完成后立即清理线程。
  • 四个工作被‘提交’给 executor:在 wiki_page_urls 列表中的每个 URL 一个。
  • 每个呼叫到 submit 都会返回一个在 futures 列表中存储的 Future 实例(https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future)。
  • as_completed function 等待每个 Future `get_wiki_page_

如果我们再次运行该程序,使用以下命令:

1python wiki_page_function.py

我们会看到这样的输出:

1[secondary_label Output]
2https://en.wikipedia.org/wiki/Island - exists
3https://en.wikipedia.org/wiki/Ocean - exists
4https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
5https://en.wikipedia.org/wiki/Shark - exists

此输出有意义: 3 个 URL 是有效的维基百科页面,其中一个是this_page_does_not_exist 不是。 请注意,您的输出可能与此输出不同的排序。

步骤 3 — 处理在线程中运行函数的例外

在之前的步骤中,get_wiki_page_existence 成功地返回了我们所有召唤的值. 在此步骤中,我们将看到,ThreadPoolExecutor 也可以提到在线程函数召唤中生成的例外。

让我们来看看下面的例子代码块:

 1[label wiki_page_function.py]
 2import requests
 3import concurrent.futures
 4
 5def get_wiki_page_existence(wiki_page_url, timeout=10):
 6    response = requests.get(url=wiki_page_url, timeout=timeout)
 7
 8    page_status = "unknown"
 9    if response.status_code == 200:
10        page_status = "exists"
11    elif response.status_code == 404:
12        page_status = "does not exist"
13
14    return wiki_page_url + " - " + page_status
15
16wiki_page_urls = [
17    "https://en.wikipedia.org/wiki/Ocean",
18    "https://en.wikipedia.org/wiki/Island",
19    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
20    "https://en.wikipedia.org/wiki/Shark",
21]
22with concurrent.futures.ThreadPoolExecutor() as executor:
23    futures = []
24    for url in wiki_page_urls:
25        futures.append(
26            executor.submit(
27                get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
28            )
29        )
30    for future in concurrent.futures.as_completed(futures):
31        try:
32            print(future.result())
33        except requests.ConnectTimeout:
34            print("ConnectTimeout.")

这个代码块与我们在步骤 2 中使用的代码几乎相同,但它有两个关键差异:

由于requests包无法在0.00001秒内完成其对维基百科的 Web 请求,它将提出一个ConnectTimeout例外。

如果我们再次运行该程序,我们将看到以下输出:

1[secondary_label Output]
2ConnectTimeout.
3ConnectTimeout.
4ConnectTimeout.
5ConnectTimeout.

打印了四个ConnectTimeout消息,每一个为我们的四个wiki_page_urls,因为它们中的任何一个都无法在0.00001秒内完成,而每一个get_wiki_page_existence的四个呼叫都提出了ConnectTimeout例外。

您现在已经看到,如果向ThreadPoolExecutor提交的函数呼叫提出了例外,那么可以通过呼叫Future.result来提交正常的例外。

步骤 4 – 比较与没有线条的执行时间

现在让我们检查一下,使用ThreadPoolExecutor实际上使您的程序更快。

首先,让我们在没有线条的情况下运行它时get_wiki_page_existence:

 1[label wiki_page_function.py]
 2import time
 3import requests
 4import concurrent.futures
 5
 6def get_wiki_page_existence(wiki_page_url, timeout=10):
 7    response = requests.get(url=wiki_page_url, timeout=timeout)
 8
 9    page_status = "unknown"
10    if response.status_code == 200:
11        page_status = "exists"
12    elif response.status_code == 404:
13        page_status = "does not exist"
14
15    return wiki_page_url + " - " + page_status
16
17wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
18
19print("Running without threads:")
20without_threads_start = time.time()
21for url in wiki_page_urls:
22    print(get_wiki_page_existence(wiki_page_url=url))
23print("Without threads time:", time.time() - without_threads_start)

在代码示例中,我们将我们的get_wiki_page_existence函数命名为五十个不同的维基百科页面URL一个接一个。

如果我们像以前一样再次运行此代码,我们将看到如下的输出:

1[secondary_label Output]
2Running without threads:
3https://en.wikipedia.org/wiki/0 - exists
4https://en.wikipedia.org/wiki/1 - exists
5. . .
6https://en.wikipedia.org/wiki/48 - exists
7https://en.wikipedia.org/wiki/49 - exists
8Without threads time: 5.803015232086182

本次输出中的 2 至 47 项被忽略为简要性。

没有线程时间之后打印的秒数在你在机器上运行时会有所不同 - 这是好事,你只是得到一个基线号码来比较使用ThreadPoolExecutor的解决方案。

让我们通过get_wiki_page_existence运行相同的50个维基百科 URL,但这次使用ThreadPoolExecutor:

 1[label wiki_page_function.py]
 2import time
 3import requests
 4import concurrent.futures
 5
 6def get_wiki_page_existence(wiki_page_url, timeout=10):
 7    response = requests.get(url=wiki_page_url, timeout=timeout)
 8
 9    page_status = "unknown"
10    if response.status_code == 200:
11        page_status = "exists"
12    elif response.status_code == 404:
13        page_status = "does not exist"
14
15    return wiki_page_url + " - " + page_status
16wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
17
18print("Running threaded:")
19threaded_start = time.time()
20with concurrent.futures.ThreadPoolExecutor() as executor:
21    futures = []
22    for url in wiki_page_urls:
23        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
24    for future in concurrent.futures.as_completed(futures):
25        print(future.result())
26print("Threaded time:", time.time() - threaded_start)

代码是我们在步骤 2 中创建的相同代码,只需添加一些打印声明,以显示执行代码所需的秒数。

如果我们再次运行该程序,我们会看到以下内容:

1[secondary_label Output]
2Running threaded:
3https://en.wikipedia.org/wiki/1 - exists
4https://en.wikipedia.org/wiki/0 - exists
5. . .
6https://en.wikipedia.org/wiki/48 - exists
7https://en.wikipedia.org/wiki/49 - exists
8Threaded time: 1.2201685905456543

再次,在线程时间之后打印的秒数将在您的计算机上有所不同(就像您的输出顺序一样)。

现在您可以比较五十个维基百科页面URL的执行时间,有和没有线条。

在本教程中使用的机器上,没有线程花费了5803秒,而在线程中花费了1220秒。

结论

在本教程中,您已经学会了如何在Python 3中使用ThreadPoolExecutor实用程序,以高效地运行 I/O 绑定的代码。

从这里您可以了解更多关于 concurrent.futures 模块提供的其他同步函数。

Published At
Categories with 技术
comments powered by Disqus