本文來自於千鋒教育在阿里雲開發者社區學習中心上線課程《Python入門2020最新大課》,主講人姜偉。
進程池的使用
當需要創建的子進程數量不多時,可以直接利用multiprocessing中的Process動態成生多個進程,但如果是上百甚至上千個目標,手動的去創建進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法。
Pool
開啟過多的進程並不能提高你的效率,反而會降低你的效率,假設有500個任務,同時開啟500個進程,這500個進程除了不能一起執行之外(cpu沒有那麼多核),操作系統調度這500個進程,讓他們平均在4個或8個cpu上執行,這會佔用很大的空間。
如果要啟動大量的子進程,可以用進程池的方式批量創建子進程:
def task(n):
print('{}----->start'.format(n))
time.sleep(1)
print('{}------>end'.format(n))
if __name__ == '__main__':
p = Pool(8) # 創建進程池,並指定線程池的個數,默認是CPU的核數
for i in range(1, 11):
# p.apply(task, args=(i,)) # 同步執行任務,一個一個的執行任務,沒有併發效果
p.apply_async(task, args=(i,)) # 異步執行任務,可以達到併發效果
p.close()
p.join()
進程池獲取任務的執行結果:
def task(n):
print('{}----->start'.format(n))
time.sleep(1)
print('{}------>end'.format(n))
return n ** 2
if __name__ == '__main__':
p = Pool(4)
for i in range(1, 11):
res = p.apply_async(task, args=(i,)) # res 是任務的執行結果
print(res.get()) # 直接獲取結果的弊端是,多任務又變成同步的了
p.close()
# p.join() 不需要再join了,因為 res.get()本身就是一個阻塞方法
異步獲取線程的執行結果:
import time
from multiprocessing.pool import Pool
def task(n):
print('{}----->start'.format(n))
time.sleep(1)
print('{}------>end'.format(n))
return n ** 2
if __name__ == '__main__':
p = Pool(4)
res_list = []
for i in range(1, 11):
res = p.apply_async(task, args=(i,))
res_list.append(res) # 使用列表來保存進程執行結果
for re in res_list:
print(re.get())
p.close()
初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那麼該請求就會等待,直到池中有進程結束,才會用之前的進程來執行新的任務,請看下面的實例:
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_start = time.time()
print("%s開始執行,進程號為%d" % (msg, os.getpid()))
# random.random()隨機生成0~1之間的浮點數
time.sleep(random.random() * 2)
t_stop = time.time()
print(msg, "執行完畢,耗時%0.2f" % (t_stop - t_start))
if __name__ == '__main__':
po = Pool(3) # 定義一個進程池,最大進程數3
for i in range(0, 10):
# Pool().apply_async(要調用的目標,(傳遞給目標的參數元祖,))
# 每次循環將會用空閒出來的子進程去調用目標
po.apply_async(worker, (i,))
print("----start----")
po.close() # 關閉進程池,關閉後po不再接收新的請求
po.join() # 等待po中所有子進程執行完成,必須放在close語句之後
print("-----end-----")
運行效果:
----start----
0開始執行,進程號為21466
1開始執行,進程號為21468
2開始執行,進程號為21467
0 執行完畢,耗時1.01
3開始執行,進程號為21466
2 執行完畢,耗時1.24
4開始執行,進程號為21467
3 執行完畢,耗時0.56
5開始執行,進程號為21466
1 執行完畢,耗時1.68
6開始執行,進程號為21468
4 執行完畢,耗時0.67
7開始執行,進程號為21467
5 執行完畢,耗時0.83
8開始執行,進程號為21466
6 執行完畢,耗時0.75
9開始執行,進程號為21468
7 執行完畢,耗時1.03
8 執行完畢,耗時1.05
9 執行完畢,耗時1.69
-----end-----
multiprocessing.Pool常用函數解析:
- apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給func的參數列表,kwds為傳遞給func的關鍵字參數列表;
- close():關閉Pool,使其不再接受新的任務;
- terminate():不管任務是否完成,立即終止;
- join():主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用;
進程池中的Queue
如果要使用Pool創建進程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.
下面的實例演示了進程池中的進程如何通信:
# 修改import中的Queue為Manager
from multiprocessing import Manager, Pool
import os, time, random
def reader(q):
print("reader啟動(%s),父進程為(%s)" % (os.getpid(), os.getppid()))
for i in range(q.qsize()):
print("reader從Queue獲取到消息:%s" % q.get(True))
def writer(q):
print("writer啟動(%s),父進程為(%s)" % (os.getpid(), os.getppid()))
for i in "helloworld":
q.put(i)
if __name__ == "__main__":
print("(%s) start" % os.getpid())
q = Manager().Queue() # 使用Manager中的Queue
po = Pool()
po.apply_async(writer, (q,))
time.sleep(1) # 先讓上面的任務向Queue存入數據,然後再讓下面的任務開始從中取數據
po.apply_async(reader, (q,))
po.close()
po.join()
print("(%s) End" % os.getpid())
運行結果:
(4171) start
writer啟動(4173),父進程為(4171)
reader啟動(4174),父進程為(4171)
reader從Queue獲取到消息:h
reader從Queue獲取到消息:e
reader從Queue獲取到消息:l
reader從Queue獲取到消息:l
reader從Queue獲取到消息:o
reader從Queue獲取到消息:w
reader從Queue獲取到消息:o
reader從Queue獲取到消息:r
reader從Queue獲取到消息:l
reader從Queue獲取到消息:d
(4171) End
join方法的使用
# join 線程和進程都有join方法
import threading
import time
x = 10
def test(a, b):
time.sleep(1)
global x
x = a + b
# test(1, 1)
# print(x) # 2
t = threading.Thread(target=test, args=(1, 1))
t.start()
t.join() # 讓主線程等待
print(x) # 10