Traffine I/O

日本語

2023-03-31

Pythonにおける並行処理

Pythonにおける並行処理の紹介

Pythonにおける並行処理とは、複数のタスクを同時に実行し、利用可能なリソースを最大限に活用する能力のことです。CPUバウンドやI/Oバウンドのアプリケーションの性能向上に必要不可欠です。Pythonには、threadingmultiprocessingconcurrent.futuresなどのモジュールを使用して並行処理を実現する方法が提供されています。

threadingモジュール

threadingモジュールを使用すると、Pythonでスレッドを作成および管理することができます。スレッドは軽量で、親プロセスと同じメモリ空間を共有するため、I/Oバウンドタスクに適しています。

スレッドの作成

スレッドを作成するには、threadingモジュールのThreadクラスを使用できます。以下は、2つのスレッドを作成して開始する例です。

python
import threading

def print_numbers():
    for i in range(1, 6):
        print(f"Number: {i}")

def print_letters():
    for letter in 'abcde':
        print(f"Letter: {letter}")

# スレッドの作成
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)

# スレッドの開始
thread1.start()
thread2.start()

# スレッドの完了を待つための結合
thread1.join()
thread2.join()

print("All threads are done.")

スレッド同期

競合状態を避けるために、ロックなどのスレッド同期メカニズムを使用することができます。次の例は、共有リソースを保護するためにロックを使用する方法を示しています。

python
import threading

counter = 0
counter_lock = threading.Lock()

def increment_counter():
    global counter
    with counter_lock:
        counter += 1

threads = [threading.Thread(target=increment_counter) for _ in range(100)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

print(f"Counter value: {counter}")

デーモンスレッド

デーモンスレッドとは、メインプログラムが終了すると自動的に終了するバックグラウンドスレッドのことです。デーモンスレッドを作成するには、daemon属性をTrueに設定します。

python
daemon_thread = threading.Thread(target=some_function, daemon=True)
daemon_thread.start()

スレッド間通信

スレッドは、共有データ構造を使用するか、Queueなどの高レベルの同期オブジジェクトを使用して通信することができます。

python
import threading
import queue

def worker(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Processing: {item}")
        q.task_done()

work_queue = queue.Queue()

for i in range(10):
    work_queue.put(i)

worker_thread = threading.Thread(target=worker, args=(work_queue,))
worker_thread.start()

work_queue.join()
work_queue.put(None)
worker_thread.join()

Multiprocessingモジュール

multiprocessingモジュールを使用すると、プロセスを使用して並列処理をサポートできます。プロセスは独自のメモリスペースを持つため、複数のCPUコアを完全に活用することができ、CPUバウンドタスクに適しています。

プロセスの作成

プロセスを作成するには、multiprocessingモジュールのProcessクラスを使用します。

python
import multiprocessing

def square(x):
    print(f"{x} squared is {x*x}")

processes = [multiprocessing.Process(target=square, args=(i,)) for i in range(5)]

for process in processes:
    process.start()

for process in processes:
    process.join()

print("All processes are done.")

プロセス同期

multiprocessingモジュールからのLockSemaphoreなどの同期プリミティブを使用して、共有リソースを保護することができます。

python
import multiprocessing

counter = multiprocessing.Value('i', 0)
counter_lock = multiprocessing.Lock()

def increment_counter(counter, counter_lock):
    with counter_lock:
        counter.value += 1

processes = [multiprocessing.Process(target=increment_counter, args=(counter, counter_lock)) for _ in range(100)]

for process in processes:
    process.start()

for process in processes:
    process.join()

print(f"Counter value: {counter.value}")

プロセス間通信

multiprocessingモジュールからのPipeQueueなどを使用して、プロセス間通信を行うことができます。

python
import multiprocessing

def square(x, output_queue):
    output_queue.put(x * x)

input_data = [1, 2, 3, 4, 5]
output_queue = multiprocessing.Queue()

processes = [multiprocessing.Process(target=square, args=(i, output_queue)) for i in input_data]

for process in processes:
    process.start()

for process in processes:
    process.join()

while not output_queue.empty():
    print(f"Squared value: {output_queue.get()}")

Concurrent.futuresモジュール

concurrent.futuresモジュールは、スレッドまたはプロセスを使用して非同期に呼び出し可能なオブジェクトを実行するための高レベルなインターフェイスを提供します。

ThreadPoolExecutor

ThreadPoolExecutorは、タスクをスレッドプールに送信するための高レベルなインターフェースです。

python
import concurrent.futures

def square(x):
    return x * x

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(square, range(1, 6))

for result in results:
    print(f"Squared value: {result}")

ProcessPoolExecutor

ProcessPoolExecutorは、タスクをプロセスプールに送信するための高レベルなインターフェースです。

python
import concurrent.futures

def square(x):
    return x * x

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(square, range(1, 6))

for result in results:
    print(f"Squared value: {result}")

Futureオブジェクト

Futureオブジェクトは、まだ完了していない計算の結果をカプセル化するものです。これらは、タスクの進行状況を監視するために実行子と一緒に使用できます。

python
import concurrent.futures

def square(x):
    return x * x

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(square, i) for i in range(1, 6)]

for future in concurrent.futures.as_completed(futures):
    print(f"Squared value: {future.result()}")

ユースケース: S3

Amazon S3(Simple Storage Service)は広く使用されているクラウドストレージサービスです。S3からファイルを並列ダウンロードするには、boto3ライブラリとconcurrent.futuresモジュールを使用できます。

python
import boto3
import concurrent.futures

s3 = boto3.client("s3")

def download_file(bucket, key, local_filename):
    print(f"Downloading {key} from {bucket}")
    s3.download_file(bucket, key, local_filename)
    print(f"Downloaded {key} from {bucket} to {local_filename}")

bucket_name = "your-bucket-name"
keys = ["file1.txt", "file2.txt", "file3.txt"]
local_filenames = ["downloaded_file1.txt", "downloaded_file2.txt", "downloaded_file3.txt"]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(download_file, bucket_name, key, local_filename) for key, local_filename in zip(keys, local_filenames)]

for future in concurrent.futures.as_completed(futures):
    print(f"Completed download: {future.result()}")

S3バケットにファイルを並列アップロードするには、前と同じライブラリであるboto3concurrent.futuresを使用できます。次に、S3バケットにファイルを並列アップロードする方法の例を示します。

python
import boto3
import concurrent.futures

s3 = boto3.client("s3")

def upload_file(bucket, key, local_filename):
    print(f"Uploading {local_filename} to {bucket}/{key}")
    with open(local_filename, "rb") as file:
        s3.upload_fileobj(file, bucket, key)
    print(f"Uploaded {local_filename} to {bucket}/{key}")

bucket_name = "your-bucket-name"
keys = ["uploaded_file1.txt", "uploaded_file2.txt", "uploaded_file3.txt"]
local_filenames = ["file1.txt", "file2.txt", "file3.txt"]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(upload_file, bucket_name, key, local_filename) for key, local_filename in zip(keys, local_filenames)]

for future in concurrent.futures.as_completed(futures):
    print(f"Completed upload: {future.result()}")

両方の例では、タスクがI/O-boundであるため、concurrent.futuresモジュールからThreadPoolExecutorを使用しました。boto3ライブラリは実際のS3操作を処理し、concurrent.futuresモジュールは並列処理を管理します。

パフォーマンスの考慮事項と比較

スレッドは軽量であり、I/O-boundタスクに適していますが、CPythonでのグローバルインタープリタロック(GIL)の制限に苦しみます。これにより、複数のスレッドがPythonバイトコードを同時に実行することができなくなり、CPU-boundタスクに対して最適なパフォーマンスを提供できない可能性があります。

マルチプロセッシングは、独自のメモリスペースを持つ別々のプロセスを使用することにより、GILの制限を克服し、CPU-boundタスクに適しています。ただし、プロセス間通信やデータのシリアル化など、追加のオーバーヘッドが発生する可能性があります。

concurrent.futuresは、threadingおよびmultiprocessingモジュールを使用する際の複雑さを抽象化し、スレッドとプロセスの管理のための高レベルなインターフェースを提供します。

並列処理モデルを選択する際には、タスクの性質(I/O-boundまたはCPU-bound)とスレッド/プロセスのオーバーヘッドと潜在的なパフォーマンス改善のトレードオフを考慮する必要があります。

参考

https://docs.python.org/3/library/threading.html
https://realpython.com/intro-to-python-threading/
https://www.pythontutorial.net/python-concurrency/python-threading/

Ryusei Kakujo

researchgatelinkedingithub

Focusing on data science for mobility

Bench Press 100kg!