作者选择了 COVID-19 救援基金作为 Write for Donations计划的一部分接受捐款。
介绍
Python _threads_是允许你的程序同时运行多个程序的一种形式,在Python中也可以使用多个流程实现平行,但线程特别适合加速涉及大量I/O(输入/输出)的应用程序。
例如 I/O-bound 操作包括创建 Web 请求和从文件中读取数据. 与 I/O-bound 操作不同, CPU-bound 操作(如使用 Python 标准库进行数学)不会从 Python 线程中受益。
Python 3 包含ThreadPoolExecutor
工具,用于在线程中执行代码。
在本教程中,我们将使用ThreadPoolExecutor
来有效地执行网络请求,我们将定义一个适合在线程中调用的函数,使用ThreadPoolExecutor
来执行该函数,并从这些执行中处理结果。
对于本教程,我们将进行网络请求,以检查 维基百科页面的存在。
<$>[注] 注: I/O-bound 操作比 CPU-bound 操作更能从线索中受益的事实,是由 Python 中的一个名为 global interpreter lock的独特性引起的。
前提条件
为了充分利用本教程,建议熟悉Python编程和本地Python编程环境,安装请求
。
您可以查看这些教程以获取必要的背景信息:
- 如何在Python中编码 3
- 如何在Ubuntu 18.04上安装Python 3并设置本地编程环境
- 要在您的本地Python编程环境中安装
请求
包(https://andsky.com/tech/tutorials/how-to-install-python-3-and-set-up-a-local-programming-environment-on-ubuntu-18-04),您可以执行以下命令:
1pip install --user requests==2.23.0
步骤 1 — 定义要在线程中执行的函数
让我们先定义一个我们希望使用线程执行的函数。
使用「nano」或您偏好的文本编辑器/开发环境,您可以打开此文件:
1nano wiki_page_function.py
对于本教程,我们会写一个函数来确定维基百科页面是否存在:
1[label wiki_page_function.py]
2import requests
3
4def get_wiki_page_existence(wiki_page_url, timeout=10):
5 response = requests.get(url=wiki_page_url, timeout=timeout)
6
7 page_status = "unknown"
8 if response.status_code == 200:
9 page_status = "exists"
10 elif response.status_code == 404:
11 page_status = "does not exist"
12
13 return wiki_page_url + " - " + page_status
get_wiki_page_existence
函数接受两个参数:一个维基百科页面的URL(‘wiki_page_url’)和一个等待该URL的响应的时限
数秒。
get_wiki_page_existence
使用 requests
套件向该 URL 发送网页请求. 根据 HTTP 响应的 状态代码返回一个字符串,描述该页是否存在。 不同的状态代码代表 HTTP 请求的不同结果。 此过程假定一个 200
的成功
状态代码意味着维基百科页面存在,而一个 404' 的
未找到`状态代码意味着维基百科页面不存在。
如前提部分所述,您需要安装的请求
包来运行此功能。
让我们试着通过添加url
和函数调用来运行该函数,然后使用get_wiki_page_existence
函数:
1[label wiki_page_function.py]
2. . .
3url = "https://en.wikipedia.org/wiki/Ocean"
4print(get_wiki_page_existence(wiki_page_url=url))
添加代码后,保存并关闭文件。
如果我们使用此代码:
1python wiki_page_function.py
我们会看到这样的输出:
1[secondary_label Output]
2https://en.wikipedia.org/wiki/Ocean - exists
使用有效的维基百科页面调用get_wiki_page_existence
函数将返回一个字符串,确认该页面确实存在。
<$>[警告] 警告: 一般来说,在不特别注意的情况下在线程之间共享 Python 对象或状态是安全的。 当定义一个函数在线程中执行时,最好定义一个函数执行单个任务,而不共享或发布状态给其他线程。
步骤 2 — 使用 ThreadPoolExecutor 在 Threads 中执行函数
现在我们有一个适合用线程调用的函数,我们可以使用ThreadPoolExecutor
来及时执行该函数的多个调用。
让我们在wiki_page_function.py中添加以下突出的代码:
1[label wiki_page_function.py]
2import requests
3import concurrent.futures
4
5def get_wiki_page_existence(wiki_page_url, timeout=10):
6 response = requests.get(url=wiki_page_url, timeout=timeout)
7
8 page_status = "unknown"
9 if response.status_code == 200:
10 page_status = "exists"
11 elif response.status_code == 404:
12 page_status = "does not exist"
13
14 return wiki_page_url + " - " + page_status
15
16wiki_page_urls = [
17 "https://en.wikipedia.org/wiki/Ocean",
18 "https://en.wikipedia.org/wiki/Island",
19 "https://en.wikipedia.org/wiki/this_page_does_not_exist",
20 "https://en.wikipedia.org/wiki/Shark",
21]
22with concurrent.futures.ThreadPoolExecutor() as executor:
23 futures = []
24 for url in wiki_page_urls:
25 futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
26 for future in concurrent.futures.as_completed(futures):
27 print(future.result())
让我们来看看这个代码是如何工作的:
concurrent.futures
被导入,使我们能够访问ThreadPoolExecutor
.- A
with
语句被用来创建一个ThreadPoolExecutor
实例executor
,该实例将在完成后立即清理线程。 - 四个工作被‘提交’给
executor
:在wiki_page_urls
列表中的每个 URL 一个。 - 每个呼叫到
submit
都会返回一个在futures
列表中存储的Future
实例(https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future)。 as_completed function
等待每个Future
`get_wiki_page_
如果我们再次运行该程序,使用以下命令:
1python wiki_page_function.py
我们会看到这样的输出:
1[secondary_label Output]
2https://en.wikipedia.org/wiki/Island - exists
3https://en.wikipedia.org/wiki/Ocean - exists
4https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
5https://en.wikipedia.org/wiki/Shark - exists
此输出有意义: 3 个 URL 是有效的维基百科页面,其中一个是this_page_does_not_exist
不是。 请注意,您的输出可能与此输出不同的排序。
步骤 3 — 处理在线程中运行函数的例外
在之前的步骤中,get_wiki_page_existence 成功地返回了我们所有召唤的值. 在此步骤中,我们将看到,ThreadPoolExecutor 也可以提到在线程函数召唤中生成的例外。
让我们来看看下面的例子代码块:
1[label wiki_page_function.py]
2import requests
3import concurrent.futures
4
5def get_wiki_page_existence(wiki_page_url, timeout=10):
6 response = requests.get(url=wiki_page_url, timeout=timeout)
7
8 page_status = "unknown"
9 if response.status_code == 200:
10 page_status = "exists"
11 elif response.status_code == 404:
12 page_status = "does not exist"
13
14 return wiki_page_url + " - " + page_status
15
16wiki_page_urls = [
17 "https://en.wikipedia.org/wiki/Ocean",
18 "https://en.wikipedia.org/wiki/Island",
19 "https://en.wikipedia.org/wiki/this_page_does_not_exist",
20 "https://en.wikipedia.org/wiki/Shark",
21]
22with concurrent.futures.ThreadPoolExecutor() as executor:
23 futures = []
24 for url in wiki_page_urls:
25 futures.append(
26 executor.submit(
27 get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
28 )
29 )
30 for future in concurrent.futures.as_completed(futures):
31 try:
32 print(future.result())
33 except requests.ConnectTimeout:
34 print("ConnectTimeout.")
这个代码块与我们在步骤 2 中使用的代码几乎相同,但它有两个关键差异:
由于requests
包无法在0.00001
秒内完成其对维基百科的 Web 请求,它将提出一个ConnectTimeout
例外。
如果我们再次运行该程序,我们将看到以下输出:
1[secondary_label Output]
2ConnectTimeout.
3ConnectTimeout.
4ConnectTimeout.
5ConnectTimeout.
打印了四个ConnectTimeout
消息,每一个为我们的四个wiki_page_urls
,因为它们中的任何一个都无法在0.00001
秒内完成,而每一个get_wiki_page_existence
的四个呼叫都提出了ConnectTimeout
例外。
您现在已经看到,如果向ThreadPoolExecutor
提交的函数呼叫提出了例外,那么可以通过呼叫Future.result
来提交正常的例外。
步骤 4 – 比较与没有线条的执行时间
现在让我们检查一下,使用ThreadPoolExecutor
实际上使您的程序更快。
首先,让我们在没有线条的情况下运行它时get_wiki_page_existence
:
1[label wiki_page_function.py]
2import time
3import requests
4import concurrent.futures
5
6def get_wiki_page_existence(wiki_page_url, timeout=10):
7 response = requests.get(url=wiki_page_url, timeout=timeout)
8
9 page_status = "unknown"
10 if response.status_code == 200:
11 page_status = "exists"
12 elif response.status_code == 404:
13 page_status = "does not exist"
14
15 return wiki_page_url + " - " + page_status
16
17wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
18
19print("Running without threads:")
20without_threads_start = time.time()
21for url in wiki_page_urls:
22 print(get_wiki_page_existence(wiki_page_url=url))
23print("Without threads time:", time.time() - without_threads_start)
在代码示例中,我们将我们的get_wiki_page_existence
函数命名为五十个不同的维基百科页面URL一个接一个。
如果我们像以前一样再次运行此代码,我们将看到如下的输出:
1[secondary_label Output]
2Running without threads:
3https://en.wikipedia.org/wiki/0 - exists
4https://en.wikipedia.org/wiki/1 - exists
5. . .
6https://en.wikipedia.org/wiki/48 - exists
7https://en.wikipedia.org/wiki/49 - exists
8Without threads time: 5.803015232086182
本次输出中的 2 至 47 项被忽略为简要性。
在没有线程时间
之后打印的秒数在你在机器上运行时会有所不同 - 这是好事,你只是得到一个基线号码来比较使用ThreadPoolExecutor
的解决方案。
让我们通过get_wiki_page_existence
运行相同的50个维基百科 URL,但这次使用ThreadPoolExecutor
:
1[label wiki_page_function.py]
2import time
3import requests
4import concurrent.futures
5
6def get_wiki_page_existence(wiki_page_url, timeout=10):
7 response = requests.get(url=wiki_page_url, timeout=timeout)
8
9 page_status = "unknown"
10 if response.status_code == 200:
11 page_status = "exists"
12 elif response.status_code == 404:
13 page_status = "does not exist"
14
15 return wiki_page_url + " - " + page_status
16wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
17
18print("Running threaded:")
19threaded_start = time.time()
20with concurrent.futures.ThreadPoolExecutor() as executor:
21 futures = []
22 for url in wiki_page_urls:
23 futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
24 for future in concurrent.futures.as_completed(futures):
25 print(future.result())
26print("Threaded time:", time.time() - threaded_start)
代码是我们在步骤 2 中创建的相同代码,只需添加一些打印声明,以显示执行代码所需的秒数。
如果我们再次运行该程序,我们会看到以下内容:
1[secondary_label Output]
2Running threaded:
3https://en.wikipedia.org/wiki/1 - exists
4https://en.wikipedia.org/wiki/0 - exists
5. . .
6https://en.wikipedia.org/wiki/48 - exists
7https://en.wikipedia.org/wiki/49 - exists
8Threaded time: 1.2201685905456543
再次,在线程时间
之后打印的秒数将在您的计算机上有所不同(就像您的输出顺序一样)。
现在您可以比较五十个维基百科页面URL的执行时间,有和没有线条。
在本教程中使用的机器上,没有线程花费了5803
秒,而在线程中花费了1220
秒。
结论
在本教程中,您已经学会了如何在Python 3中使用ThreadPoolExecutor
实用程序,以高效地运行 I/O 绑定的代码。
从这里您可以了解更多关于 concurrent.futures
模块提供的其他同步函数。