Mastering Large Data Processing with mpi4py in Python
When a dataset is too big — or a computation too slow — for a single core, you split the work across
many processes. The Message Passing Interface (MPI) is the decades-old standard for coordinating
that, and mpi4py brings it to Python. This post shows how to distribute data and combine results
with a few lines of mpi4py.
The one mental model
MPI runs many copies of your program at once, each an independent process with a unique rank (0, 1, 2, …). They don’t share memory — they pass messages. The workhorses are collective operations that move data in a single call:
- scatter — cut a dataset into pieces, one per rank,
- broadcast — send the same data to every rank,
- gather — collect each rank’s result back to the root.
The pattern is almost always divide → compute locally → combine.
Understanding MPI and mpi4py
MPI is a standardized, portable message-passing system for parallel architectures — the backbone of computing clusters and supercomputers, letting processes cooperate by sending and receiving messages.
mpi4py brings those capabilities to Python. It follows the MPI standard closely — recent releases (mpi4py 4.x, 2024 onward) add support for the MPI-4 standard — and scales Python programs up to large, multi-node clusters.
Key MPI concepts
- Processes and ranks: MPI launches multiple processes, each running your program; within a communicator, each process gets a unique rank from 0 to size - 1.
- Communicators and groups: a communicator defines a group of processes that participate in an MPI operation, providing the context for communication. The default, MPI.COMM_WORLD, contains every process started by mpiexec.
Installing and running mpi4py
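Installing mpi4py with pip is straightforward. It does need an MPI implementation, such as Open MPI or MPICH, on the system, and mpiexec should come from that same implementation: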
pip install mpi4py
To run an MPI-enabled Python script, use mpiexec:
mpiexec -n 4 python your_script.py
This runs your_script.py using 4 processes.
Implementing data processing with mpi4py
Parallel data processing means distributing data across processes, each doing part of the work:
- Distributing data: Scatter sends chunks of an array to different processes.
- Collective communication: MPI supports broadcasting (one → all), scattering (divide among processes), and gathering (all → one).
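You want each of 4 processes to work on a different quarter of an array. Which collective? Scatter: it hands each rank its own piece, whereas broadcast would send every rank the whole array.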
Example 1: analyzing large datasets in parallel using Scatter
Let’s process a large dataset to compute the average of numbers:
- Initialization:
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
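- Distribute Data: the root process (rank 0) builds the data and scatters it to all processes. comm.scatter expects a sequence with exactly one item per rank, so the root first splits the list into size chunks:
if rank == 0:
    numbers = [i + 1 for i in range(size * 5)]  # 1 .. size*5
    # scatter needs exactly one item per rank: split into chunks of 5
    chunks = [numbers[i:i + 5] for i in range(0, len(numbers), 5)]
else:
    chunks = None
data = comm.scatter(chunks, root=0)  # each rank receives its own 5-number chunk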
- Compute Partial Averages: Each process computes the average of its chunk of data:
local_avg = sum(data) / len(data)
print(f"Rank {rank} has data {data} with local average {local_avg}")
- Gather Results: all local averages are gathered at the root, which computes the global average. Averaging the averages is valid here only because every chunk has the same length:
all_avgs = comm.gather(local_avg, root=0)
if rank == 0:
    global_avg = sum(all_avgs) / len(all_avgs)
    print(f"Global average is {global_avg}")
The complete script is here:
from mpi4py import MPI


def main():
    # Initialize MPI
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    # Prepare data to be distributed
    if rank == 0:
        # Create an array of data, here simply numbers 1 to size*5
        data = [i + 1 for i in range(size * 5)]
        # Split data into chunks for each process
        chunks = [data[i:i + 5] for i in range(0, len(data), 5)]
    else:
        chunks = None
    # Scatter data chunks across processes
    chunk = comm.scatter(chunks, root=0)
    # Each process computes the average of its chunk
    local_avg = sum(chunk) / len(chunk)
    print(f"Rank {rank} calculated local average: {local_avg}")
    # Gather all local averages back at the root process
    all_avgs = comm.gather(local_avg, root=0)
    # Compute the global average at the root process
    if rank == 0:
        global_avg = sum(all_avgs) / len(all_avgs)
        print(f"Global average across all data: {global_avg}")


if __name__ == "__main__":
    main()
I saved this as mpi_average.py.
- The script can be executed as:
$ mpiexec -n 4 python mpi_average.py
It returns:
Rank 1 calculated local average: 8.0
Rank 2 calculated local average: 13.0
Rank 3 calculated local average: 18.0
Rank 0 calculated local average: 3.0
Global average across all data: 10.5
Notice the ranks print out of order — they run concurrently, so whichever finishes first prints first. Only the root (rank 0) holds the gathered results and prints the global average.
Example 2: analyzing large datasets in parallel using Broadcast
We’ll set up a problem where the root process generates an array of random numbers, then broadcasts this array to all processes in the MPI world. Each process, including the root, will then compute the square of each element of the array and sum the results. This simple example illustrates how broadcast can be used for distributing identical initial data to all processes, which then carry out their computations.
import numpy as np
from mpi4py import MPI


def main():
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    # Prepare data to be broadcasted
    if rank == 0:
        # Root process creates an array of 10 random integers
        data = np.random.randint(1, 10, size=10)
        print(f"Root has data: {data}")
    else:
        data = None
    # Broadcast the data from the root to all processes
    data = comm.bcast(data, root=0)
    # Each process computes the sum of squares of the received data
    result = np.sum(data**2)
    print(f"Rank {rank} computed result: {result}")
    # Optionally, gather all results at the root and compute the total
    all_results = comm.gather(result, root=0)
    if rank == 0:
        total = sum(all_results)
        print(f"Total sum of squares across all ranks: {total}")


if __name__ == "__main__":
    main()
- To execute:
$ mpiexec -n 4 python mpi_broadcast_example.py
Root has data: [7 8 8 7 5 9 3 9 2 9]
Rank 0 computed result: 507
Total sum of squares across all ranks: 2028
Rank 1 computed result: 507
Rank 2 computed result: 507
Rank 3 computed result: 507
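Every rank prints the same result, 507. Why? Broadcast gives every rank an identical copy of the array, so all four ranks perform exactly the same computation. The "total" of 2028 is therefore just 4 × 507, not a sum over different data. Broadcast distributes shared inputs, but to divide the work, each rank must still process only its own part of the array.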
Advanced usage of mpi4py
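- Non-blocking communication: basic point-to-point calls such as Send and Recv are blocking. A send returns only once its buffer is safe to reuse, and a receive returns only once the message has arrived. Non-blocking calls let a process start a communication and keep computing, which can improve performance when computation and communication overlap. mpi4py supports this through Isend and Irecv (and their lowercase, pickle-based counterparts isend and irecv). These return Request objects that you complete with Wait or Waitall.
- Custom datatypes: the lowercase methods (send, bcast, scatter, …) pickle arbitrary Python objects, which is convenient but adds overhead for large arrays. The uppercase methods send raw buffers such as NumPy arrays. MPI derived datatypes, built with methods like Create_contiguous, Create_vector and Create_struct, describe the memory layout of structured or non-contiguous data so it can be transmitted without manual packing.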
Challenges in parallel computing with mpi4py
- Load balancing: work must be distributed so no process becomes a bottleneck while others idle. Irregular or dynamic workloads that can’t be split evenly ahead of time make this hard, and poor balance kills scalability.
- Debugging: parallel bugs are harder — multiple concurrent processes can interact unpredictably. Watch for deadlocks (processes waiting on each other forever), race conditions (timing affects behavior), and data inconsistencies. Parallel debuggers, logging/tracing, and selectively serializing code help isolate them.
- Profiling and performance optimization: non-deterministic timing (network delays, async operations, system load) makes performance analysis tricky. MPI profilers and performance tools help measure MPI-call cost and understand behavior across systems.
Recap
Without scrolling up — can you name the pattern? With mpi4py you:
- launch many ranked processes that communicate by messages (no shared memory),
- scatter a dataset into per-rank chunks (or broadcast the same data to all),
- compute locally on each rank,
- gather the partial results back to the root — the classic divide → compute → combine.
Master those collectives and you can push Python from one core to a whole cluster.
Where to go next
- mpi4py documentation: the full API, including Isend/Irecv and custom datatypes.
- Related post here: Using mpi4py for Parallel Computing in Python on Supercomputers.
Disclaimer of liability
The information provided by Earth Inversion is made available for educational purposes only.
Whilst we endeavor to keep the information up-to-date and correct, Earth Inversion makes no representations or warranties of any kind, express or implied, about the completeness, accuracy, reliability, suitability or availability with respect to the website or the information, products, services or related graphics content on the website for any purpose.
UNDER NO CIRCUMSTANCE SHALL WE HAVE ANY LIABILITY TO YOU FOR ANY LOSS OR DAMAGE OF ANY KIND INCURRED AS A RESULT OF THE USE OF THE SITE OR RELIANCE ON ANY INFORMATION PROVIDED ON THE SITE. ANY RELIANCE YOU PLACE ON SUCH MATERIAL IS THEREFORE STRICTLY AT YOUR OWN RISK.