Pythonにおける並行処理の紹介
Pythonにおける並行処理とは、複数のタスクを同時に実行し、利用可能なリソースを最大限に活用する能力のことです。CPUバウンドやI/Oバウンドのアプリケーションの性能向上に必要不可欠です。Pythonには、threading
、multiprocessing
、concurrent.futures
などのモジュールを使用して並行処理を実現する方法が提供されています。
threadingモジュール
threading
モジュールを使用すると、Pythonでスレッドを作成および管理することができます。スレッドは軽量で、親プロセスと同じメモリ空間を共有するため、I/Oバウンドタスクに適しています。
スレッドの作成
スレッドを作成するには、threading
モジュールのThread
クラスを使用できます。以下は、2つのスレッドを作成して開始する例です。
import threading
def print_numbers():
for i in range(1, 6):
print(f"Number: {i}")
def print_letters():
for letter in 'abcde':
print(f"Letter: {letter}")
# スレッドの作成
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)
# スレッドの開始
thread1.start()
thread2.start()
# スレッドの完了を待つための結合
thread1.join()
thread2.join()
print("All threads are done.")
スレッド同期
競合状態を避けるために、ロックなどのスレッド同期メカニズムを使用することができます。次の例は、共有リソースを保護するためにロックを使用する方法を示しています。
import threading
counter = 0
counter_lock = threading.Lock()
def increment_counter():
global counter
with counter_lock:
counter += 1
threads = [threading.Thread(target=increment_counter) for _ in range(100)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(f"Counter value: {counter}")
デーモンスレッド
デーモンスレッドとは、メインプログラムが終了すると自動的に終了するバックグラウンドスレッドのことです。デーモンスレッドを作成するには、daemon
属性をTrue
に設定します。
daemon_thread = threading.Thread(target=some_function, daemon=True)
daemon_thread.start()
スレッド間通信
スレッドは、共有データ構造を使用するか、Queue
などの高レベルの同期オブジジェクトを使用して通信することができます。
import threading
import queue
def worker(q):
while True:
item = q.get()
if item is None:
break
print(f"Processing: {item}")
q.task_done()
work_queue = queue.Queue()
for i in range(10):
work_queue.put(i)
worker_thread = threading.Thread(target=worker, args=(work_queue,))
worker_thread.start()
work_queue.join()
work_queue.put(None)
worker_thread.join()
Multiprocessingモジュール
multiprocessing
モジュールを使用すると、プロセスを使用して並列処理をサポートできます。プロセスは独自のメモリスペースを持つため、複数のCPUコアを完全に活用することができ、CPUバウンドタスクに適しています。
プロセスの作成
プロセスを作成するには、multiprocessing
モジュールのProcess
クラスを使用します。
import multiprocessing
def square(x):
print(f"{x} squared is {x*x}")
processes = [multiprocessing.Process(target=square, args=(i,)) for i in range(5)]
for process in processes:
process.start()
for process in processes:
process.join()
print("All processes are done.")
プロセス同期
multiprocessing
モジュールからのLock
やSemaphore
などの同期プリミティブを使用して、共有リソースを保護することができます。
import multiprocessing
counter = multiprocessing.Value('i', 0)
counter_lock = multiprocessing.Lock()
def increment_counter(counter, counter_lock):
with counter_lock:
counter.value += 1
processes = [multiprocessing.Process(target=increment_counter, args=(counter, counter_lock)) for _ in range(100)]
for process in processes:
process.start()
for process in processes:
process.join()
print(f"Counter value: {counter.value}")
プロセス間通信
multiprocessing
モジュールからのPipe
やQueue
などを使用して、プロセス間通信を行うことができます。
import multiprocessing
def square(x, output_queue):
output_queue.put(x * x)
input_data = [1, 2, 3, 4, 5]
output_queue = multiprocessing.Queue()
processes = [multiprocessing.Process(target=square, args=(i, output_queue)) for i in input_data]
for process in processes:
process.start()
for process in processes:
process.join()
while not output_queue.empty():
print(f"Squared value: {output_queue.get()}")
Concurrent.futuresモジュール
concurrent.futures
モジュールは、スレッドまたはプロセスを使用して非同期に呼び出し可能なオブジェクトを実行するための高レベルなインターフェイスを提供します。
ThreadPoolExecutor
ThreadPoolExecutor
は、タスクをスレッドプールに送信するための高レベルなインターフェースです。
import concurrent.futures
def square(x):
return x * x
with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(square, range(1, 6))
for result in results:
print(f"Squared value: {result}")
ProcessPoolExecutor
ProcessPoolExecutor
は、タスクをプロセスプールに送信するための高レベルなインターフェースです。
import concurrent.futures
def square(x):
return x * x
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(square, range(1, 6))
for result in results:
print(f"Squared value: {result}")
Futureオブジェクト
Future
オブジェクトは、まだ完了していない計算の結果をカプセル化するものです。これらは、タスクの進行状況を監視するために実行子と一緒に使用できます。
import concurrent.futures
def square(x):
return x * x
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(square, i) for i in range(1, 6)]
for future in concurrent.futures.as_completed(futures):
print(f"Squared value: {future.result()}")
ユースケース: S3
Amazon S3(Simple Storage Service)は広く使用されているクラウドストレージサービスです。S3からファイルを並列ダウンロードするには、boto3
ライブラリとconcurrent.futures
モジュールを使用できます。
import boto3
import concurrent.futures
s3 = boto3.client("s3")
def download_file(bucket, key, local_filename):
print(f"Downloading {key} from {bucket}")
s3.download_file(bucket, key, local_filename)
print(f"Downloaded {key} from {bucket} to {local_filename}")
bucket_name = "your-bucket-name"
keys = ["file1.txt", "file2.txt", "file3.txt"]
local_filenames = ["downloaded_file1.txt", "downloaded_file2.txt", "downloaded_file3.txt"]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(download_file, bucket_name, key, local_filename) for key, local_filename in zip(keys, local_filenames)]
for future in concurrent.futures.as_completed(futures):
print(f"Completed download: {future.result()}")
S3バケットにファイルを並列アップロードするには、前と同じライブラリであるboto3
とconcurrent.futures
を使用できます。次に、S3バケットにファイルを並列アップロードする方法の例を示します。
import boto3
import concurrent.futures
s3 = boto3.client("s3")
def upload_file(bucket, key, local_filename):
print(f"Uploading {local_filename} to {bucket}/{key}")
with open(local_filename, "rb") as file:
s3.upload_fileobj(file, bucket, key)
print(f"Uploaded {local_filename} to {bucket}/{key}")
bucket_name = "your-bucket-name"
keys = ["uploaded_file1.txt", "uploaded_file2.txt", "uploaded_file3.txt"]
local_filenames = ["file1.txt", "file2.txt", "file3.txt"]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(upload_file, bucket_name, key, local_filename) for key, local_filename in zip(keys, local_filenames)]
for future in concurrent.futures.as_completed(futures):
print(f"Completed upload: {future.result()}")
両方の例では、タスクがI/O-boundであるため、concurrent.futures
モジュールからThreadPoolExecutor
を使用しました。boto3
ライブラリは実際のS3操作を処理し、concurrent.futures
モジュールは並列処理を管理します。
パフォーマンスの考慮事項と比較
スレッドは軽量であり、I/O-boundタスクに適していますが、CPythonでのグローバルインタープリタロック(GIL)の制限に苦しみます。これにより、複数のスレッドがPythonバイトコードを同時に実行することができなくなり、CPU-boundタスクに対して最適なパフォーマンスを提供できない可能性があります。
マルチプロセッシングは、独自のメモリスペースを持つ別々のプロセスを使用することにより、GILの制限を克服し、CPU-boundタスクに適しています。ただし、プロセス間通信やデータのシリアル化など、追加のオーバーヘッドが発生する可能性があります。
concurrent.futures
は、threading
およびmultiprocessing
モジュールを使用する際の複雑さを抽象化し、スレッドとプロセスの管理のための高レベルなインターフェースを提供します。
並列処理モデルを選択する際には、タスクの性質(I/O-boundまたはCPU-bound)とスレッド/プロセスのオーバーヘッドと潜在的なパフォーマンス改善のトレードオフを考慮する必要があります。
参考