2017/06/01更新

[Python] マルチプロセスな処理を実装して、処理を高速化する

このエントリーをはてなブックマークに追加            

こんにちは、@yoheiMuneです。
Pythonでは、multiprocessingモジュールを利用して、簡単にマルチプロセスプログラミングを行うことができます。今回はプロセスを複数立ち上げる実装方法を、ブログに書きたいと思います。
画像

目次




Pythonで並列処理を実装するには

多くのプログラミング言語では並列処理の実装方法として、マルチスレッドとマルチプロセスの2つが提供されています。マルチスレッドはプロセス1つでスレッドを複数にする方法で、マルチプロセスはプロセスを複数立ち上げる(ps auxなどで複数見える)方法です。

Pythonの場合、グローバルインタプリタロック(GIL)という仕組みがあり、同一プロセス上では(例えマルチスレッドだったとしても)同時に1つの処理しか並列して行われません(その恩恵として、変数のスレッドセーフが保たれる)。ただこの仕組みのため、マルチコアなPCなどでは思うようにパフォーマンスが出ない場合があります(特にCPUバウンドな処理の場合)。

プロセスを分けるとGILの仕組みから解放されるため、CPUがマルチコアな場合にマシン性能を生かすことができます。今日はその、マルチプロセスなお話です。



準備

Pythonにおけるマルチプロセスな実装はmultiprocessingモジュールを用いますが、これは標準モジュールなので、以下のインポートするだけで利用できます。
import multiprocessing



サブプロセスを作成して、マルチプロセスに処理を実行する

multiprocessing.Processを用いると、指定した関数をマルチプロセスで処理することができます。
from multiprocessing import Process

# 呼び出したい関数
def f1(name):
    print("Hello", name)
    print("Sleeping... 3s")
    time.sleep(3)
    print("Good morning", name)

if __name__ == "__main__":
    # サブプロセスを作成します
    p = Process(target=f1, args=("Bob",))
    # 開始します
    p.start()
    print("Process started.")
    # サブプロセス終了まで待ちます
    p.join()
    print("Process joined.")
上記を実行すると以下のような出力となり、f1の中でsleepする処理も含め、メインプロセスがそれを待つようになっています。
Process started.
Hello Bob
Sleeping... 3s
Good morning Bob
Process joined.



キューを用いて、プロセス間でデータのやり取りを行う

キュー(Queue)を用いることで、安全に(プロセスセーフでかつスレッドセーフに)値の受け渡しを行うことができます。
from multiprocessing import Queue

def f2(q):
    time.sleep(3)
    # 3秒後に、キューに値を渡します.
    q.put([42, None, "Hello"])

if __name__ == "__main__":
    # スレッド間でやり取りするためのキューを作成します.
    q = Queue()
    # キューを引数に渡して、サブプロセスを作成します.
    p = Process(target=f2, args=(q,))
    # サブプロセスを開始します.
    p.start()
    # q.get()できるまで待ちます.
    print(q.get())
    # サブプロセス完了を待ちます.
    p.join()
上記を実行すると以下のような出力となります。
[42, None, 'Hello']



パイプを用いて、プロセス間でデータのやり取りを行う

キューと同じように、パイプ(Pipe)を使うことでもプロセス間で値のやり取りをすることができます。ただし、パイプの両端で自由に読み書きをすると、データが破損する可能性があるので、基本的にはどちらかを読み取り専用、もう一方を書き込み専用に使うと良いです(異なるプロセス/スレッドが同じ端で同時に読み書きすると、データ破損する可能性があり、このバグは毎回再現するわけではないので、デバッグが非常に大変です・・)。
from multiprocessing import Pipe

def f3(conn):
    time.sleep(3)
    # パイプにデータを送信します.
    conn.send({ "age" : 30, "name" : "Yohei" })
    # パイプをクローズします.
    conn.close()

    # クローズ後に書き込もうとすると、エラーになります(OSError: handle is closed).
    # conn.send([42, None, "Hello"])

if __name__ == "__main__":
    # Pipeを生成します(デフォルトでは双方向にやり取りできるパイプ)
    # 双方向にやり取りできますが、両端で自由に読み書きしているとデータが壊れる可能性があるので、
    # 基本的にはどちらかを書き込み専用、どちらかを読み込み専用に扱います.
    parent_conn, child_conn = Pipe()
    # Pipeの片方の端を、サブプロセスに渡します.
    p = Process(target=f3, args=(child_conn,))
    # サブプロセスを開始します.
    p.start()
    # Pipeから値が取得できるまで待ちます.
    print(parent_conn.recv())
    # サブプロセスの終了を待ちます.
    p.join()
上記を実行すると、以下のように出力されます。
{'age': 30, 'name': 'Yohei'}



共有メモリを用いて、プロセス間でデータを共有する

今まではキューやパイプという仕組みを通してデータをやり取りする方法でしたが、共有メモリ(Shared memory)を使うことでプロセスセーフ(スレッドセーフ)に変数自体を共有することができます。
from multiprocessing import Value, Array

def f5(n, a):
    n.value = 3.1415926
    for i in range(len(a)):
        a[i] *= -1

if __name__ == "__main__":
    # 共有メモリ(Value)を作成します.
    num = Value('d', 0.0)
    # 共有メモリ(Array)を作成します.
    arr = Array('i', range(10))
    # サブプロセスを作り、実行します.
    p = Process(target=f5, args=(num, arr))
    p.start()
    p.join()
    # 共有メモリ(Value)から値を取り出します
    print(num.value)
    # 共有メモリ(Array)から値を取り出します
    print(arr[:])
上記を実行すると以下のような出力にされます。
3.1415926
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]



マネージャーを用いて、プロセス間でデータを共有する

共有メモリと似ていますが、マネージャーを用いることでもプロセス間の状態の共有を行うことができます。共有メモリよりも扱えるデータ型が多いです(辞書型なども扱える)。
from multiprocessing import Manager

def f6(d, l):
    # 辞書型に値を詰め込みます.
    d[1] = '1'
    d["2"] = 2
    d[0.25] = None
    # 配列を操作します(ここでは逆順に).
    l.reverse()

if __name__ == "__main__":
    # マネージャーを生成します.
    with Manager() as manager:
        # マネージャーから辞書型を生成します.
        d = manager.dict()
        # マネージャーから配列を生成します.
        l = manager.list(range(10))
        # サブプロセスを作り実行します.
        p = Process(target=f6, args=(d,l))
        p.start()
        p.join()
        # 辞書からデータを取り出します.
        print(d)
        # 配列からデータを取り出します.
        print(l)
上記を実行すると以下の通りです。
{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]



Lockを用いて、実行順を制御する

ロック(Lock)が提供するロックの取得と解放の機能を使うことで、処理順を制御することができます。
from multiprocessing import Lock

def f4(lock, i):
    # ロックを取得します.
    lock.acquire()
    # ロック中は、他のプロセスやスレッドがロックを取得できません(ロックが解放されるまで待つ)
    try:
        print('Hello', i)
    finally:
        # ロックを解放します.
        lock.release()

if __name__ == "__main__":
    # ロックを作成します.
    lock = Lock()
    for num in range(10):
        Process(target=f4, args=(lock, num)).start()
上記を実行すると、以下のように出力されます。
Hello 0
Hello 1
Hello 2
Hello 3
Hello 4
Hello 5
Hello 6
Hello 7
Hello 8
Hello 9



プロセスプールを使って、サブプロセスを使い回す

プール(Pool)を用いることで、指定した個数のサブプロセスを作り、それらで処理をマルチプロセスに実行することができます。
from multiprocessing import Pool, TimeoutError

def f7(x):
    return x*x

if __name__ == "__main__":
    # 4つのプロセスを開始します.
    with Pool(processes=4) as pool:

        # 0〜9の10個の値を、4つのプロセスで処理します.
        print(pool.map(f7, range(10)))
        # print: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

        # ひとつ上と似ていますが、プロセスの実行順が不定になります.
        for i in pool.imap_unordered(f7, range(10)):
            print(i)
            # print: 0, 1, 4, 16, 9, 25, 36, 49, 64, 81

        # "f7(20)"という処理を非同期に実行します(利用するプロセスは1つのみ)
        res = pool.apply_async(f7, (20,))
        # 処理結果が返ってくるまで、最大1秒間待機します
        print(res.get(timeout=1))
        # print: 400

        # 上記の仕組みを連続して呼ぶと、複数のプロセスで処理を行うことができます.
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(10)]
        results = [res.get(timeout=1) for res in multiple_results]
        # 10個出力されます
        print(results)
        # print : [37604, 37607, 37606, 37605, 37604, 37607, 37606, 37605, 37604, 37607]

        # ただしユニークにすると4つのみ(=プールしたプロセス数)であることがわかります
        print(set(results), len(set(results)))
        # {37604, 37605, 37606, 37607} 4

        # タイムアウトが発生する場合のサンプルです
        res = pool.apply_async(time.sleep, (10,))
        try:
            res.get(timeout=1)
        except TimeoutError as e:
            print("Timeout.....!!!", e)
この仕組みを用いると、最大同時実行数などを制御しながら処理を行うことができます。



参考資料

Pythonのマルチプロセスは、以下のページを参考にしました。ありがとうございます。

17.2. multiprocessing — プロセスベースの並列処理 — Python 3.6.1 ドキュメント



最後に

Pythonで処理の並列化を行いたい案件があり色々と調べていて、その結果をブログに残しました。Pythonは面白いですね。今後もブログを書いていきたいと思います。

最後になりますが本ブログでは、Python・Go言語・Linux・Node.js・フロントエンド・インフラ・開発関連・Swift・Java・機械学習など雑多に情報発信をしていきます。自分の第2の脳にすべく、情報をブログに貯めています。気になった方は、本ブログのRSSTwitterをフォローして頂けると幸いです ^ ^。

最後までご覧頂きましてありがとうございました!





こんな記事もいかがですか?

RSS画像

もしご興味をお持ち頂けましたら、ぜひRSSへの登録をお願い致します。