Day 3. multiprocessing (& threading)

前一篇提到了subprocess, 難免會想到多執行緒的問題,所以起初只是想整理關於python如何開啟thread的方法。
然而從官方的文件查下去看,突然發現一個不認識的東西:GIL(Global Interpreter lock).
這東西一次只允許一個thread工作,隔一段規定的時間後才會把這個鎖釋放出來讓其他的thread去拿。搶到這個鎖的thread可以開始工作,而其他的thread繼續沈睡。
這個機制可能導致多執行緒的作業反而效率變得更差(即使是多核心的電腦)。

之所以有GIL,是在實作CPython編譯器時,基於某些考量以及編譯器的實作難度才做的機制(見:官方的說明)。在文中也提到過去曾經有過把GIL機制拿掉後的結果:雖然multi-thread的速度增加,但對於一般single processor的情況效能變得更糟了。而若是要克服這個問題,則會使編譯器變得過度複雜而放棄。

儘管從3.2版開始,GIL的實作機制有所改變(見:What's New in 3.2),但依據個人測試的結果來看,在多核心的機器上跑multi-thread的效能依舊會變得比不做還差(儘管沒有像網路上其他人的情況這麼糟):
[圖1] Single thread.

[圖2] Multi-thread.


如果想要用多執行緒,但想要避開GIL, 有以下幾種方法:

這裡決定採用multiprocessing的方法。

multiprocessing

由於multiprocessing的內容相當多,底下盡量採取條列重點式的呈現法。
某種程度上能看成簡化、中文化的官方文件條目。

multiprocessing有兩種方式分配工作:Process和Pool.

Process:

(和threading的操作方式幾乎一樣,把multiprocess換成threading, Process換成Thread就是了。唯一的不同點是需要檢查 name是不是等於main.)

from multiprocessing import Process    
import os    

def f():    
    print('my pid: ', os.getpid())    

if __name__ == '__main__':    
    for i in range(4):    
        p = Process(target=f, args=())    
        p.start()    
        p.join()

Pool:

from multiprocessing import Pool  
import os  

def f(x):  
    return x*x  

if __name__ == '__main__':  
    pool = Pool()  
    print( pool.map(f, range(10)) )  

    multi_res = [ pool.apply_async(os.getpid, ()) for i in range(4) ]  
    print( [res.get() for res in multi_res] )  

    pool.close() # close只是關閉pool, 但已開啟的process還是會繼續執行  
    pool.join()   # 以這個例子來說其實都已經結束了,所以這裡不需要join.

Pool如果不指定worker process的個數,預設是能使用的cpu核心數。
Pool和Process不同,不需要呼叫start之類的function,因為worker process被分配到工作的瞬間就開始運作了。
Pool.map(func, iterable, ...):
後面代入的iterable會被拆成幾個部分平行去做。
Pool.apply_async(func, args, ...):
會回傳result object, 要在用get()去等待、取得結果。
如果怕執行時間過久,可以設定get( timeout=sec )去限制執行時間上限。
可以搭配with使用:with Pool(4) as pool, 離開時會自動close


交換資訊有相當多的方法可以使用: Pipe, Queue, Array, Value(都是multiprocessing底下的)

Queue: (下面的code來自官方說明文件)

用法和queue.Queue幾乎相同。

from multiprocessing import Process, Queue  

def f(q):  
    q.put([42, None, 'hello'])  

if __name__ == '__main__':  
    q = Queue()  
    p = Process(target=f, args=(q,))  
    p.start()  
    print(q.get())    # prints "[42, None, 'hello']"  
    p.join()

和一般的queue一樣的操作方法。
如果沒有特別的需求,也可以考慮使用SimpleQueue, 裡頭只有empty(), get(), put(item)三種方法。

Pipe: (code同樣來自官方文件)

from multiprocessing import Process, Pipe  

def f(conn):  
    conn.send([42, None, 'hello'])  
    conn.close()  

if __name__ == '__main__':  
    parent_conn, child_conn = Pipe()  
    p = Process(target=f, args=(child_conn,))  
    p.start()  
    print(parent_conn.recv())   # prints "[42, None, 'hello']"  
    p.join()

需要說明的點是, Pipe()本身會回傳一組溝通用的pipe.
兩端的pipe其實沒有方向性,兩邊都可以send(), recv()。


以上兩種 (Queue和Pipe) 的特色是都不一定需要synchronization。
以下兩種 (Value和Array) 因為是利用ctypes來達到shared memory的,則需要synchronization來保護資料的正確與否(不是process-safe),預設上會有鎖去保護資料,確保同步完畢才能取值。

如果不想用lock或是想自己指定lock,那麼可以在初始化的時候給定
Value( ..., lock=False),
或lock = multiprocessing.Lock; Value( ..., lock=lock).
底下是修改過後的官方的example, 綜合示範了Value, Array。和要怎麼利用ctypes的特性自己宣告一個Struct來包裝傳遞的資料。

from multiprocessing import Process, Value, Array  
from ctypes import Structure, c_double  

class Point(Structure):  
    _fields_ = [('x', c_double), ('y', c_double)]  

def modify(n, x, s, A):  
    n.value **= 2  
    x.value **= 2  
    s.value = s.value.upper()  
    for a in A:  
        a.x **= 2  
        a.y **= 2  

if __name__ == '__main__':  
    lock = Lock()  

    n = Value('i', 7)  
    x = Value(c_double, 1.0/3.0, lock=False)  
    s = Array('c', b'hello world')  
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)])  

    p = Process(target=modify, args=(n, x, s, A))  
    p.start()  
    p.join()  

    print(n.value)  
    print(x.value)  
    print(s.value)  
    print([(a.x, a.y) for a in A])

還有一些關於manager的東西,這次就省略不談吧...
至少有以上幾種方法目前應該很夠用了吧QQ

參考資料:

  • 官方文件: multiprocessing, 非常詳細講述裡面的物件和參數。極度建議在實際編寫前看過裡頭的17.2.3. Programming guidelines章節 以及想使用的各個物件!
  • Inside the Python GIL, 這篇詳細的描述python的GIL機制如何造成多核心的機器反而效能更差(3.2版以前)。

備註:

可惡,想不到python的threading不是正常的threading,假的
為了查什麼是GIL就查了好多資料和拖慢速度的原因就查了好久,找替代方案又爬了一堆文。
原本以為Threading很熟了,可以看很快,這次應該可以在過夜前打完。
果然人算不如天算(淚)...

而且multiprocessing又有一些感覺很重要,但寫出來太拖篇幅的東西。
與其寫出來,不如直接翻官方文件...但這樣感覺又失去紀錄重點和筆記的目標了。

不過老實說昨天太累直接從晚上8點半睡到早上11點半...所以昨天的文也沒發。
這次的東西很多(雖然我還是省略的一堆東西),這次就自己放過自己吧~(咦)

results matching ""

    No results matching ""