Day 3. multiprocessing (& threading)
前一篇提到了subprocess, 難免會想到多執行緒的問題,所以起初只是想整理關於python如何開啟thread的方法。
然而從官方的文件查下去看,突然發現一個不認識的東西:GIL(Global Interpreter lock).
這東西一次只允許一個thread工作,隔一段規定的時間後才會把這個鎖釋放出來讓其他的thread去拿。搶到這個鎖的thread可以開始工作,而其他的thread繼續沈睡。
這個機制可能導致多執行緒的作業反而效率變得更差(即使是多核心的電腦)。
之所以有GIL,是在實作CPython編譯器時,基於某些考量以及編譯器的實作難度才做的機制(見:官方的說明)。在文中也提到過去曾經有過把GIL機制拿掉後的結果:雖然multi-thread的速度增加,但對於一般single processor的情況效能變得更糟了。而若是要克服這個問題,則會使編譯器變得過度複雜而放棄。
儘管從3.2版開始,GIL的實作機制有所改變(見:What's New in 3.2),但依據個人測試的結果來看,在多核心的機器上跑multi-thread的效能依舊會變得比不做還差(儘管沒有像網路上其他人的情況這麼糟):
[圖1] Single thread.
[圖2] Multi-thread.
如果想要用多執行緒,但想要避開GIL, 有以下幾種方法:
- 別用CPython, 改用以其他語言實作的Python, 如Jython, IronPython.
- 將需要用到的部分改用C/C++包出去做。(如[心得] C++搭Python 高效地圖引擎開發的作法)
- 用python的另一個module, multiprocessing.
- 除了以上幾種外,stackoverflow的這篇文章底下的回應還介紹了另外幾種方法。
這裡決定採用multiprocessing的方法。
multiprocessing
由於multiprocessing的內容相當多,底下盡量採取條列重點式的呈現法。
某種程度上能看成簡化、中文化的官方文件條目。
multiprocessing有兩種方式分配工作:Process和Pool.
Process:
(和threading的操作方式幾乎一樣,把multiprocess換成threading, Process換成Thread就是了。唯一的不同點是需要檢查 name是不是等於main.)
from multiprocessing import Process
import os
def f():
print('my pid: ', os.getpid())
if __name__ == '__main__':
for i in range(4):
p = Process(target=f, args=())
p.start()
p.join()
Pool:
from multiprocessing import Pool
import os
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool()
print( pool.map(f, range(10)) )
multi_res = [ pool.apply_async(os.getpid, ()) for i in range(4) ]
print( [res.get() for res in multi_res] )
pool.close() # close只是關閉pool, 但已開啟的process還是會繼續執行
pool.join() # 以這個例子來說其實都已經結束了,所以這裡不需要join.
Pool如果不指定worker process的個數,預設是能使用的cpu核心數。
Pool和Process不同,不需要呼叫start之類的function,因為worker process被分配到工作的瞬間就開始運作了。
Pool.map(func, iterable, ...):
後面代入的iterable會被拆成幾個部分平行去做。
Pool.apply_async(func, args, ...):
會回傳result object, 要在用get()去等待、取得結果。
如果怕執行時間過久,可以設定get( timeout=sec )去限制執行時間上限。
可以搭配with使用:with Pool(4) as pool, 離開時會自動close
交換資訊有相當多的方法可以使用: Pipe, Queue, Array, Value(都是multiprocessing底下的)
Queue: (下面的code來自官方說明文件)
用法和queue.Queue幾乎相同。
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
和一般的queue一樣的操作方法。
如果沒有特別的需求,也可以考慮使用SimpleQueue, 裡頭只有empty(), get(), put(item)三種方法。
Pipe: (code同樣來自官方文件)
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
需要說明的點是, Pipe()本身會回傳一組溝通用的pipe.
兩端的pipe其實沒有方向性,兩邊都可以send(), recv()。
以上兩種 (Queue和Pipe) 的特色是都不一定需要synchronization。
以下兩種 (Value和Array) 因為是利用ctypes來達到shared memory的,則需要synchronization來保護資料的正確與否(不是process-safe),預設上會有鎖去保護資料,確保同步完畢才能取值。
如果不想用lock或是想自己指定lock,那麼可以在初始化的時候給定
Value( ..., lock=False),
或lock = multiprocessing.Lock; Value( ..., lock=lock).
底下是修改過後的官方的example, 綜合示範了Value, Array。和要怎麼利用ctypes的特性自己宣告一個Struct來包裝傳遞的資料。
from multiprocessing import Process, Value, Array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', b'hello world')
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)])
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print(n.value)
print(x.value)
print(s.value)
print([(a.x, a.y) for a in A])
還有一些關於manager的東西,這次就省略不談吧...
至少有以上幾種方法目前應該很夠用了吧QQ
參考資料:
- 官方文件: multiprocessing, 非常詳細講述裡面的物件和參數。極度建議在實際編寫前看過裡頭的17.2.3. Programming guidelines章節 以及想使用的各個物件!
- Inside the Python GIL, 這篇詳細的描述python的GIL機制如何造成多核心的機器反而效能更差(3.2版以前)。
備註:
可惡,想不到python的threading不是正常的threading,假的。
為了查什麼是GIL就查了好多資料和拖慢速度的原因就查了好久,找替代方案又爬了一堆文。
原本以為Threading很熟了,可以看很快,這次應該可以在過夜前打完。
果然人算不如天算(淚)...
而且multiprocessing又有一些感覺很重要,但寫出來太拖篇幅的東西。
與其寫出來,不如直接翻官方文件...但這樣感覺又失去紀錄重點和筆記的目標了。
不過老實說昨天太累直接從晚上8點半睡到早上11點半...所以昨天的文也沒發。
這次的東西很多(雖然我還是省略的一堆東西),這次就自己放過自己吧~(咦)