[Python] マルチプロセスな処理を実装して、処理を高速化する
こんにちは、@yoheiMuneです。
Pythonでは、multiprocessingモジュールを利用して、簡単にマルチプロセスプログラミングを行うことができます。今回はプロセスを複数立ち上げる実装方法を、ブログに書きたいと思います。
Pythonの場合、グローバルインタプリタロック(GIL)という仕組みがあり、同一プロセス上では(例えマルチスレッドだったとしても)同時に1つの処理しか並列して行われません(その恩恵として、変数のスレッドセーフが保たれる)。ただこの仕組みのため、マルチコアなPCなどでは思うようにパフォーマンスが出ない場合があります(特にCPUバウンドな処理の場合)。
プロセスを分けるとGILの仕組みから解放されるため、CPUがマルチコアな場合にマシン性能を生かすことができます。今日はその、マルチプロセスなお話です。
17.2. multiprocessing — プロセスベースの並列処理 — Python 3.6.1 ドキュメント
最後になりますが本ブログでは、Python・Go言語・Linux・Node.js・フロントエンド・インフラ・開発関連・Swift・Java・機械学習など雑多に情報発信をしていきます。自分の第2の脳にすべく、情報をブログに貯めています。気になった方は、本ブログのRSSやTwitterをフォローして頂けると幸いです ^ ^。
最後までご覧頂きましてありがとうございました!
Pythonでは、multiprocessingモジュールを利用して、簡単にマルチプロセスプログラミングを行うことができます。今回はプロセスを複数立ち上げる実装方法を、ブログに書きたいと思います。
目次
Pythonで並列処理を実装するには
多くのプログラミング言語では並列処理の実装方法として、マルチスレッドとマルチプロセスの2つが提供されています。マルチスレッドはプロセス1つでスレッドを複数にする方法で、マルチプロセスはプロセスを複数立ち上げる(ps aux
などで複数見える)方法です。Pythonの場合、グローバルインタプリタロック(GIL)という仕組みがあり、同一プロセス上では(例えマルチスレッドだったとしても)同時に1つの処理しか並列して行われません(その恩恵として、変数のスレッドセーフが保たれる)。ただこの仕組みのため、マルチコアなPCなどでは思うようにパフォーマンスが出ない場合があります(特にCPUバウンドな処理の場合)。
プロセスを分けるとGILの仕組みから解放されるため、CPUがマルチコアな場合にマシン性能を生かすことができます。今日はその、マルチプロセスなお話です。
準備
Pythonにおけるマルチプロセスな実装はmultiprocessingモジュールを用いますが、これは標準モジュールなので、以下のインポートするだけで利用できます。import multiprocessing
サブプロセスを作成して、マルチプロセスに処理を実行する
multiprocessing.Process
を用いると、指定した関数をマルチプロセスで処理することができます。from multiprocessing import Process # 呼び出したい関数 def f1(name): print("Hello", name) print("Sleeping... 3s") time.sleep(3) print("Good morning", name) if __name__ == "__main__": # サブプロセスを作成します p = Process(target=f1, args=("Bob",)) # 開始します p.start() print("Process started.") # サブプロセス終了まで待ちます p.join() print("Process joined.")上記を実行すると以下のような出力となり、
f1
の中でsleepする処理も含め、メインプロセスがそれを待つようになっています。Process started. Hello Bob Sleeping... 3s Good morning Bob Process joined.
キューを用いて、プロセス間でデータのやり取りを行う
キュー(Queue)を用いることで、安全に(プロセスセーフでかつスレッドセーフに)値の受け渡しを行うことができます。from multiprocessing import Queue def f2(q): time.sleep(3) # 3秒後に、キューに値を渡します. q.put([42, None, "Hello"]) if __name__ == "__main__": # スレッド間でやり取りするためのキューを作成します. q = Queue() # キューを引数に渡して、サブプロセスを作成します. p = Process(target=f2, args=(q,)) # サブプロセスを開始します. p.start() # q.get()できるまで待ちます. print(q.get()) # サブプロセス完了を待ちます. p.join()上記を実行すると以下のような出力となります。
[42, None, 'Hello']
パイプを用いて、プロセス間でデータのやり取りを行う
キューと同じように、パイプ(Pipe)を使うことでもプロセス間で値のやり取りをすることができます。ただし、パイプの両端で自由に読み書きをすると、データが破損する可能性があるので、基本的にはどちらかを読み取り専用、もう一方を書き込み専用に使うと良いです(異なるプロセス/スレッドが同じ端で同時に読み書きすると、データ破損する可能性があり、このバグは毎回再現するわけではないので、デバッグが非常に大変です・・)。from multiprocessing import Pipe def f3(conn): time.sleep(3) # パイプにデータを送信します. conn.send({ "age" : 30, "name" : "Yohei" }) # パイプをクローズします. conn.close() # クローズ後に書き込もうとすると、エラーになります(OSError: handle is closed). # conn.send([42, None, "Hello"]) if __name__ == "__main__": # Pipeを生成します(デフォルトでは双方向にやり取りできるパイプ) # 双方向にやり取りできますが、両端で自由に読み書きしているとデータが壊れる可能性があるので、 # 基本的にはどちらかを書き込み専用、どちらかを読み込み専用に扱います. parent_conn, child_conn = Pipe() # Pipeの片方の端を、サブプロセスに渡します. p = Process(target=f3, args=(child_conn,)) # サブプロセスを開始します. p.start() # Pipeから値が取得できるまで待ちます. print(parent_conn.recv()) # サブプロセスの終了を待ちます. p.join()上記を実行すると、以下のように出力されます。
{'age': 30, 'name': 'Yohei'}
共有メモリを用いて、プロセス間でデータを共有する
今まではキューやパイプという仕組みを通してデータをやり取りする方法でしたが、共有メモリ(Shared memory)を使うことでプロセスセーフ(スレッドセーフ)に変数自体を共有することができます。from multiprocessing import Value, Array def f5(n, a): n.value = 3.1415926 for i in range(len(a)): a[i] *= -1 if __name__ == "__main__": # 共有メモリ(Value)を作成します. num = Value('d', 0.0) # 共有メモリ(Array)を作成します. arr = Array('i', range(10)) # サブプロセスを作り、実行します. p = Process(target=f5, args=(num, arr)) p.start() p.join() # 共有メモリ(Value)から値を取り出します print(num.value) # 共有メモリ(Array)から値を取り出します print(arr[:])上記を実行すると以下のような出力にされます。
3.1415926 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
マネージャーを用いて、プロセス間でデータを共有する
共有メモリと似ていますが、マネージャーを用いることでもプロセス間の状態の共有を行うことができます。共有メモリよりも扱えるデータ型が多いです(辞書型なども扱える)。from multiprocessing import Manager def f6(d, l): # 辞書型に値を詰め込みます. d[1] = '1' d["2"] = 2 d[0.25] = None # 配列を操作します(ここでは逆順に). l.reverse() if __name__ == "__main__": # マネージャーを生成します. with Manager() as manager: # マネージャーから辞書型を生成します. d = manager.dict() # マネージャーから配列を生成します. l = manager.list(range(10)) # サブプロセスを作り実行します. p = Process(target=f6, args=(d,l)) p.start() p.join() # 辞書からデータを取り出します. print(d) # 配列からデータを取り出します. print(l)上記を実行すると以下の通りです。
{1: '1', '2': 2, 0.25: None} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Lockを用いて、実行順を制御する
ロック(Lock)が提供するロックの取得と解放の機能を使うことで、処理順を制御することができます。from multiprocessing import Lock def f4(lock, i): # ロックを取得します. lock.acquire() # ロック中は、他のプロセスやスレッドがロックを取得できません(ロックが解放されるまで待つ) try: print('Hello', i) finally: # ロックを解放します. lock.release() if __name__ == "__main__": # ロックを作成します. lock = Lock() for num in range(10): Process(target=f4, args=(lock, num)).start()上記を実行すると、以下のように出力されます。
Hello 0 Hello 1 Hello 2 Hello 3 Hello 4 Hello 5 Hello 6 Hello 7 Hello 8 Hello 9
プロセスプールを使って、サブプロセスを使い回す
プール(Pool)を用いることで、指定した個数のサブプロセスを作り、それらで処理をマルチプロセスに実行することができます。from multiprocessing import Pool, TimeoutError def f7(x): return x*x if __name__ == "__main__": # 4つのプロセスを開始します. with Pool(processes=4) as pool: # 0〜9の10個の値を、4つのプロセスで処理します. print(pool.map(f7, range(10))) # print: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # ひとつ上と似ていますが、プロセスの実行順が不定になります. for i in pool.imap_unordered(f7, range(10)): print(i) # print: 0, 1, 4, 16, 9, 25, 36, 49, 64, 81 # "f7(20)"という処理を非同期に実行します(利用するプロセスは1つのみ) res = pool.apply_async(f7, (20,)) # 処理結果が返ってくるまで、最大1秒間待機します print(res.get(timeout=1)) # print: 400 # 上記の仕組みを連続して呼ぶと、複数のプロセスで処理を行うことができます. multiple_results = [pool.apply_async(os.getpid, ()) for i in range(10)] results = [res.get(timeout=1) for res in multiple_results] # 10個出力されます print(results) # print : [37604, 37607, 37606, 37605, 37604, 37607, 37606, 37605, 37604, 37607] # ただしユニークにすると4つのみ(=プールしたプロセス数)であることがわかります print(set(results), len(set(results))) # {37604, 37605, 37606, 37607} 4 # タイムアウトが発生する場合のサンプルです res = pool.apply_async(time.sleep, (10,)) try: res.get(timeout=1) except TimeoutError as e: print("Timeout.....!!!", e)この仕組みを用いると、最大同時実行数などを制御しながら処理を行うことができます。
参考資料
Pythonのマルチプロセスは、以下のページを参考にしました。ありがとうございます。17.2. multiprocessing — プロセスベースの並列処理 — Python 3.6.1 ドキュメント
最後に
Pythonで処理の並列化を行いたい案件があり色々と調べていて、その結果をブログに残しました。Pythonは面白いですね。今後もブログを書いていきたいと思います。最後になりますが本ブログでは、Python・Go言語・Linux・Node.js・フロントエンド・インフラ・開発関連・Swift・Java・機械学習など雑多に情報発信をしていきます。自分の第2の脳にすべく、情報をブログに貯めています。気になった方は、本ブログのRSSやTwitterをフォローして頂けると幸いです ^ ^。
最後までご覧頂きましてありがとうございました!