在我们之前的教程中,我们了解了 Python CSV 示例。
Python 多处理器
平行处理现在越来越受到关注,如果你仍然不知道平行处理,请从 wikipedia学习)。随着CPU制造商开始将越来越多的核心添加到他们的处理器中,创建平行代码是提高性能的好方法。Python引入了 multiprocessing模块,让我们写平行代码。
Python 多处理过程、队列和锁定
在 python 多处理模块中有许多类用于构建并行程序,其中有三个基本类是进程
,Queue
和锁
。这些类将帮助你构建并行程序,但在描述这些之前,让我们用简单的代码开始这个主题。为了使并行程序有用,你必须知道你电脑中有多少个核心。
1import multiprocessing
2
3print("Number of cpu : ", multiprocessing.cpu_count())
The following output may vary for your pc. For me, number of cores is 8.
Python 多处理过程类
Python multiprocessing Process
类是一个抽象的,它设置了另一个 Python 过程,提供它运行代码,并为主应用程序控制执行的方式。有两个重要的函数属于进程类 - start()
和 join()
函数. 首先,我们需要写一个函数,这将由进程运行。 然后,我们需要实时化一个进程对象。 如果我们创建一个进程对象,什么都不会发生,直到我们告诉它通过 start()
函数开始处理。 然后,进程将运行并返回其结果。 然后,我们告诉进程通过 join()
函数完成。 没有 join()
呼叫函数,进程将保持空白,不会终止。 所以如果你创建了许多进程,而不会终止它们,你
1from multiprocessing import Process
2
3def print_func(continent='Asia'):
4 print('The name of continent is : ', continent)
5
6if __name__ == "__main__": # confirms that the code is under main function
7 names = ['America', 'Europe', 'Africa']
8 procs = []
9 proc = Process(target=print_func) # instantiating without any argument
10 procs.append(proc)
11 proc.start()
12
13 # instantiating process with arguments
14 for name in names:
15 # print(name)
16 proc = Process(target=print_func, args=(name,))
17 procs.append(proc)
18 proc.start()
19
20 # complete the processes
21 for proc in procs:
22 proc.join()
The output of the following code will be:
Python 多处理列表
你对计算机数据结构有基本的知识,你可能知道Queue。Python Multiprocessing模块提供了‘Queue’类,这正是一个 First-In-First-Out数据结构。它们可以存储任何 pickle Python 对象(尽管简单的对象是最好的),并且对于在进程之间共享数据非常有用。在作为参数传递到进程的目标函数时,队列特别有用,以便进程消耗数据。通过使用 put()
函数,我们可以插入数据来排队,然后使用 get()
我们可以从队列中获取项目。
1from multiprocessing import Queue
2
3colors = ['red', 'green', 'blue', 'black']
4cnt = 1
5# instantiating a queue object
6queue = Queue()
7print('pushing items to queue:')
8for color in colors:
9 print('item no: ', cnt, ' ', color)
10 queue.put(color)
11 cnt += 1
12
13print('\npopping items from queue:')
14cnt = 0
15while not queue.empty():
16 print('item no: ', cnt, ' ', queue.get())
17 cnt += 1
Python 多处理器锁类
锁类的任务很简单,它允许代码声称锁,这样任何其他过程都无法执行类似的代码,直到锁被释放。因此,锁类的任务主要是两个。
Python 多处理器
在这个Python多处理示例中,我们将将所有的知识合并在一起。假设我们有几个任务要完成。 要完成这个任务,我们将使用多个流程。 因此,我们将保持两个队列。 一个将包含任务,另一个将包含完成任务的日志。 然后我们将实时化进程完成任务。 请注意,Python 队列类已经同步。 也就是说,我们不需要使用锁定类来阻止多个进程访问相同的队列对象。 这就是为什么我们不需要使用锁定类在这种情况下。 下面是实现,我们正在将任务添加到队列中,然后创建进程并启动它们,然后使用‘join()’完成进程。 最后,我们正在从第二队列打印日志。
1from multiprocessing import Lock, Process, Queue, current_process
2import time
3import queue # imported for using queue.Empty exception
4
5def do_job(tasks_to_accomplish, tasks_that_are_done):
6 while True:
7 try:
8 '''
9 try to get task from the queue. get_nowait() function will
10 raise queue.Empty exception if the queue is empty.
11 queue(False) function would do the same task also.
12 '''
13 task = tasks_to_accomplish.get_nowait()
14 except queue.Empty:
15
16 break
17 else:
18 '''
19 if no exception has been raised, add the task completion
20 message to task_that_are_done queue
21 '''
22 print(task)
23 tasks_that_are_done.put(task + ' is done by ' + current_process().name)
24 time.sleep(.5)
25 return True
26
27def main():
28 number_of_task = 10
29 number_of_processes = 4
30 tasks_to_accomplish = Queue()
31 tasks_that_are_done = Queue()
32 processes = []
33
34 for i in range(number_of_task):
35 tasks_to_accomplish.put("Task no " + str(i))
36
37 # creating processes
38 for w in range(number_of_processes):
39 p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
40 processes.append(p)
41 p.start()
42
43 # completing process
44 for p in processes:
45 p.join()
46
47 # print the output
48 while not tasks_that_are_done.empty():
49 print(tasks_that_are_done.get())
50
51 return True
52
53if __name__ == '__main__':
54 main()
Depending on the number of task, the code will take some time to show you the output. The output of the following code will vary from time to time.
Python 多处理池
Python multiprocessing Pool 可用于在多个输入值之间并行执行函数,在各个进程中分配输入数据(数据并行性)。
1from multiprocessing import Pool
2
3import time
4
5work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])
6
7def work_log(work_data):
8 print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
9 time.sleep(int(work_data[1]))
10 print(" Process %s Finished." % work_data[0])
11
12def pool_handler():
13 p = Pool(2)
14 p.map(work_log, work)
15
16if __name__ == '__main__':
17 pool_handler()
Below image shows the output of the above program. Notice that pool size is 2, so two executions of work_log
function is happening in parallel. When one of the function processing finishes, it picks the next argument and so on. So, that’s all for python multiprocessing module. Reference: Official Documentation