2023-03-31

Concurrency in Python

Introduction to Concurrency in Python

Concurrency in Python is the ability to make progress on multiple tasks during overlapping time periods, making the best use of available resources. It is essential for improving the performance of both I/O-bound and CPU-bound applications. Python provides several modules for concurrency, including threading, multiprocessing, and concurrent.futures.

The Threading Module

The threading module allows you to create and manage threads in Python. Threads are lightweight and share the same memory space as the parent process, making them suitable for I/O-bound tasks.

Creating Threads

To create a thread, you can use the Thread class from the threading module. Here's an example of creating and starting two threads:

python
import threading

def print_numbers():
    for i in range(1, 6):
        print(f"Number: {i}")

def print_letters():
    for letter in 'abcde':
        print(f"Letter: {letter}")

# Creating threads
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)

# Starting threads
thread1.start()
thread2.start()

# Joining threads to wait for their completion
thread1.join()
thread2.join()

print("All threads are done.")

Thread Synchronization

To avoid race conditions, you can use thread synchronization mechanisms such as locks. The following example demonstrates using a lock to protect a shared resource:

python
import threading

counter = 0
counter_lock = threading.Lock()

def increment_counter():
    global counter
    with counter_lock:
        counter += 1

threads = [threading.Thread(target=increment_counter) for _ in range(100)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

print(f"Counter value: {counter}")

Daemon Threads

Daemon threads are background threads that automatically terminate when the main program exits. To create a daemon thread, set the daemon attribute to True:

python
# `some_function` stands for any callable you want to run in the background
daemon_thread = threading.Thread(target=some_function, daemon=True)
daemon_thread.start()
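As a complete, runnable sketch (the heartbeat function and the timings here are illustrative):

```python
import threading
import time

def heartbeat():
    # Loops forever; the daemon flag means it will not keep the program alive
    while True:
        print("heartbeat")
        time.sleep(0.5)

daemon_thread = threading.Thread(target=heartbeat, daemon=True)
daemon_thread.start()

time.sleep(1.2)  # the main thread does its own work
print("Main thread exiting; the daemon thread is terminated with it.")
```

Because the thread is a daemon, the interpreter exits as soon as the main thread finishes, without waiting for heartbeat to return.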

Thread Communication

Threads can communicate through shared data structures or, more safely, through higher-level synchronization objects such as queue.Queue:

python
import threading
import queue

def worker(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Processing: {item}")
        q.task_done()

work_queue = queue.Queue()

for i in range(10):
    work_queue.put(i)

worker_thread = threading.Thread(target=worker, args=(work_queue,))
worker_thread.start()

work_queue.join()
work_queue.put(None)
worker_thread.join()

Multiprocessing Module

The multiprocessing module provides parallelism using processes. It is suitable for CPU-bound tasks because each process runs in its own memory space with its own interpreter, sidestepping the GIL and taking full advantage of multiple CPU cores.

Creating Processes

To create a process, use the Process class from the multiprocessing module:

python
import multiprocessing

def square(x):
    print(f"{x} squared is {x*x}")

# The __main__ guard is required on platforms that spawn new interpreters
# (Windows, and macOS since Python 3.8), where child processes re-import this module
if __name__ == "__main__":
    processes = [multiprocessing.Process(target=square, args=(i,)) for i in range(5)]

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    print("All processes are done.")

Process Synchronization

You can use synchronization primitives like Lock and Semaphore from the multiprocessing module to protect shared resources:

python
import multiprocessing

def increment_counter(counter, counter_lock):
    with counter_lock:
        counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value('i', 0)
    counter_lock = multiprocessing.Lock()

    processes = [multiprocessing.Process(target=increment_counter, args=(counter, counter_lock)) for _ in range(100)]

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    print(f"Counter value: {counter.value}")

Inter-Process Communication

You can use Pipe or Queue from the multiprocessing module for inter-process communication:

python
import multiprocessing

def square(x, output_queue):
    output_queue.put(x * x)

if __name__ == "__main__":
    input_data = [1, 2, 3, 4, 5]
    output_queue = multiprocessing.Queue()

    processes = [multiprocessing.Process(target=square, args=(i, output_queue)) for i in input_data]

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    # Collect exactly one result per input; Queue.empty() is not reliable
    # across processes and can skip or miss items
    for _ in input_data:
        print(f"Squared value: {output_queue.get()}")

The concurrent.futures Module

The concurrent.futures module provides a high-level interface for asynchronously executing callables using threads or processes.

ThreadPoolExecutor

ThreadPoolExecutor is a high-level interface for submitting tasks to a thread pool:

python
import concurrent.futures

def square(x):
    return x * x

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(square, range(1, 6))

for result in results:
    print(f"Squared value: {result}")

ProcessPoolExecutor

ProcessPoolExecutor is a high-level interface for submitting tasks to a process pool:

python
import concurrent.futures

def square(x):
    return x * x

# The __main__ guard is required because worker processes re-import this module
if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(square, range(1, 6))

    for result in results:
        print(f"Squared value: {result}")

Future Objects

Future objects encapsulate the result of a computation that may not have completed yet. They can be used with executors to monitor the progress of tasks:

python
import concurrent.futures

def square(x):
    return x * x

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(square, i) for i in range(1, 6)]

for future in concurrent.futures.as_completed(futures):
    print(f"Squared value: {future.result()}")

Use Case: S3

Amazon S3 (Simple Storage Service) is a widely used cloud object storage service. Downloading files from S3 is I/O-bound, so you can speed it up with the boto3 library together with the concurrent.futures module.

python
import boto3
import concurrent.futures

s3 = boto3.client("s3")

def download_file(bucket, key, local_filename):
    print(f"Downloading {key} from {bucket}")
    s3.download_file(bucket, key, local_filename)
    return local_filename  # returned so future.result() reports the file

bucket_name = "your-bucket-name"
keys = ["file1.txt", "file2.txt", "file3.txt"]
local_filenames = ["downloaded_file1.txt", "downloaded_file2.txt", "downloaded_file3.txt"]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(download_file, bucket_name, key, local_filename) for key, local_filename in zip(keys, local_filenames)]

for future in concurrent.futures.as_completed(futures):
    print(f"Completed download: {future.result()}")

To upload files concurrently to an S3 bucket, you can use the same libraries as before: boto3 and concurrent.futures. Here's an example of how to upload files concurrently to an S3 bucket:

python
import boto3
import concurrent.futures

s3 = boto3.client("s3")

def upload_file(bucket, key, local_filename):
    print(f"Uploading {local_filename} to {bucket}/{key}")
    with open(local_filename, "rb") as file:
        s3.upload_fileobj(file, bucket, key)
    return f"{bucket}/{key}"  # returned so future.result() reports the destination

bucket_name = "your-bucket-name"
keys = ["uploaded_file1.txt", "uploaded_file2.txt", "uploaded_file3.txt"]
local_filenames = ["file1.txt", "file2.txt", "file3.txt"]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(upload_file, bucket_name, key, local_filename) for key, local_filename in zip(keys, local_filenames)]

for future in concurrent.futures.as_completed(futures):
    print(f"Completed upload: {future.result()}")

In both examples, we used ThreadPoolExecutor from the concurrent.futures module, as the tasks are I/O-bound. The boto3 library handles the actual S3 operations, while the concurrent.futures module manages the concurrency.

Performance Considerations and Comparison

While threads are lightweight and well suited to I/O-bound tasks, in CPython they are subject to the Global Interpreter Lock (GIL), which allows only one thread to execute Python bytecode at a time. As a result, they rarely improve performance for CPU-bound tasks.

Multiprocessing overcomes the GIL limitation by using separate processes with their own memory space, making it suitable for CPU-bound tasks. However, it introduces additional overhead due to inter-process communication and data serialization.

concurrent.futures provides a high-level interface for managing threads and processes, abstracting away some of the complexities of working with the threading and multiprocessing modules.

When choosing a concurrency model, consider the nature of your tasks (I/O-bound or CPU-bound) and the trade-offs between thread/process overhead and potential performance improvements.


Ryusei Kakujo


Focusing on data science for mobility
