# This file is part of pyunicorn.
# Copyright (C) 2008--2024 Jonathan F. Donges and pyunicorn authors
# URL: <https://www.pik-potsdam.de/members/donges/software-2/software>
# License: BSD (3-clause)
#
# Please acknowledge and cite the use of this software and its authors
# when results are used in publications or published elsewhere.
#
# You can use the following reference:
# J.F. Donges, J. Heitzig, B. Beronov, M. Wiedermann, J. Runge, Q.-Y. Feng,
# L. Tupikina, V. Stolbova, R.V. Donner, N. Marwan, H.A. Dijkstra,
# and J. Kurths, "Unified functional network and nonlinear time series analysis
# for complex systems science: The pyunicorn package"
"""
Module for parallelization using mpi4py.
Allows for easy parallelization in master/slaves mode with one master
submitting function or method calls to slaves.
Uses mpi4py if available, otherwise processes calls sequentially in one
process.
Examples:
=========
Save the following lines in ``demo_mpi.py`` and run::
> mpirun -n 10 python demo_mpi.py
1. Use master/slaves parallelization with the Network class:
.. literalinclude:: ../../../../examples/modules/mpi/network_large.py
2. Do a Monte Carlo simulation as master/slaves:
.. literalinclude:: ../../../../examples/modules/mpi/network_mc.py
3. Do a parameter scan without communication with a master, and just save
the results in files:
.. literalinclude:: ../../../../examples/modules/mpi/network_scan_no_comm.py
"""
#
# Imports
#
import sys
import time
import traceback
import numpy
# try to get the communicator object to see whether mpi is available:
try:
from mpi4py import MPI
comm = MPI.COMM_WORLD
"""(mpi4py.MPI.Comm instance) MPI communicator."""
available = True
"""(bool) indicates that slaves are available."""
except ImportError:
available = False
[docs]
class MPIException(Exception):
[docs]
def __init__(self, value):
Exception.__init__()
self.value = value
[docs]
def __str__(self):
return repr(self.value)
# initialize:
if available:
size = comm.size
"""(int) number of MPI nodes (master and slaves)."""
rank = comm.rank
"""(int) rank of this MPI node (0 is the master)."""
am_master = (rank == 0)
"""(bool) indicates that this MPI node is the master."""
if size < 2:
available = False
else:
size = 1
rank = 0
am_master = True
am_slave = not am_master
"""(bool) indicates that this MPI node is a slave."""
n_slaves = size - 1
"""(int) no. of slaves available."""
start_time = time.time()
"""(float) starting time of this MPI node."""
stats = []
"""
(list of dictionaries)
stats[id] contains processing statistics for the last call with this id. Keys:
- "id": id of the call
- "rank": MPI node who processed the call
- "this_time": wall time for processing the call
- "time_over_est": quotient of actual over estimated wall time
- "n_processed": no. of calls processed so far by that slave, including this
- "total_time": total wall time until this call was finished
"""
# initialization on master with slaves:
if am_master:
total_time_est = numpy.zeros(size)
"""
(numpy array of ints)
total_time_est[i] is the current estimate of the total time
MPI slave i will work on already submitted calls.
On slave i, only total_time_est[i] is available.
"""
total_time_est[0] = numpy.inf
queue = []
"""(list) ids of submitted calls"""
assigned = {}
"""
(dictionary)
assigned[id] is the slave assigned to the call with that id.
"""
slave_queue = [[] for i in range(0, size)]
"""
(list of lists)
slave_queue[i] contains the ids of calls assigned to slave i.
"""
n_processed = numpy.zeros(size).astype("int")
"""
(list of ints)
n_processed[rank] is the total number of calls processed by MPI node rank.
On slave i, only total_time[i] is available.
"""
total_time = numpy.zeros(size)
"""
(list of floats)
total_time[rank] is the total wall time until that node finished its last
call. On slave i, only total_time[i] is available.
"""
if not available:
# dictionary for results:
results = {}
"""
(dictionary)
if mpi is not available, the result of submit_call(..., id=a) will be
cached in results[a] until get_result(a).
"""
[docs]
def submit_call(name_to_call, args=(), kwargs={},
module="__main__", time_est=1, id=None, slave=None):
"""
Submit a call for parallel execution.
If called by the master and slaves are available, the call is submitted
to a slave for asynchronous execution.
If called by a slave or if no slaves are available, the call is instead
executed synchronously on this MPI node.
**Examples:**
1. Provide ids and time estimate explicitly:
.. code-block:: python
for n in range(0,10):
mpi.submit_call("doit", (n,A[n]), id=n, time_est=n**2)
for n in range(0,10):
result[n] = mpi.get_result(n)
2. Use generated ids stored in a list:
.. code-block:: python
for n in range(0,10):
ids.append(mpi.submit_call("doit", (n,A[n])))
for n in range(0,10):
results.append(mpi.get_result(ids.pop()))
3. Ignore ids altogether:
.. code-block:: python
for n in range(0,10):
mpi.submit_call("doit", (n,A[n]))
for n in range(0,10):
results.append(mpi.get_next_result())
4. Call a module function and use keyword arguments:
.. code-block:: python
mpi.submit_call("solve", (), {"a":a, "b":b},
module="numpy.linalg")
5. Call a static class method from a package:
.. code-block:: python
mpi.submit_call("Network._get_histogram", (values, n_bins),
module="pyunicorn")
Note that it is module="pyunicorn" and not
module="pyunicorn.network" here.
:arg str name_to_call: name of callable object (usually a function or
static method of a class) as contained in the namespace specified
by module.
:arg tuple args: the positional arguments to provide to the callable
object. Tuples of length 1 must be written (arg,). Default: ()
:arg dict kwargs: the keyword arguments to provide to the callable
object. Default: {}
:arg str module: optional name of the imported module or submodule in
whose namespace the callable object is contained. For objects
defined on the script level, this is "__main__", for objects
defined in an imported package, this is the package name. Must be a
key of the dictionary sys.modules (check there after import if in
doubt). Default: "__main__"
:arg float time_est: estimated relative completion time for this call;
used to find a suitable slave. Default: 1
:type id: object or None
:arg id: unique id for this call. Must be a possible dictionary key.
If None, a random id is assigned and returned. Can be re-used after
get_result() for this is. Default: None
:type slave: int > 0 and < mpi.size, or None
:arg slave: optional no. of slave to assign the call to. If None, the
call is assigned to the slave with the smallest current total time
estimate. Default: None
:return object: id of call, to be used in get_result().
"""
if id is None:
id = numpy.random.uniform()
if id in assigned:
raise MPIException("id ", str(id), " already in queue!")
if slave is not None and am_slave:
raise MPIException(
"only the master can use slave= in submit_call()")
if slave is None or slave < 1 or slave >= size:
# find slave with least estimated total time:
slave = numpy.argmin(total_time_est)
if available:
# send name to call, args, time_est to slave:
if _verbose:
print(f"MPI master : assigning call with id {id} to slave "
f"{slave}: {name_to_call} {args} {kwargs} ...")
comm.send((name_to_call, args, kwargs, module, time_est),
dest=slave)
else:
# do it myself right now:
slave = 0
if _verbose:
print(f"MPI master : calling {name_to_call} {args} {kwargs} "
"...")
try:
object_to_call = eval(name_to_call,
sys.modules[module].__dict__)
except NameError:
sys.stderr.write(str(sys.modules[module].__dict__.keys()))
raise
call_time = time.time()
results[id] = object_to_call(*args, **kwargs)
this_time = time.time() - call_time
n_processed[0] += 1
total_time[0] = time.time() - start_time
stats.append({"id": id, "rank": 0,
"this_time": this_time,
"time_over_est": this_time / time_est,
"n_processed": n_processed[0],
"total_time": total_time[0]})
total_time_est[slave] += time_est
queue.append(id)
slave_queue[slave].append(id)
assigned[id] = slave
return id
[docs]
def get_result(id):
"""
Return result of earlier submitted call.
Can only be called by the master.
If the call is not yet finished, waits for it to finish.
Results should be collected in the same order as calls were submitted.
For each slave, the results of calls assigned to that slave must be
collected in the same order as those calls were submitted.
Can only be called once per call.
:type id: object
:arg id: id of an earlier submitted call, as provided to or returned
by submit_call().
:rtype: object
:return: return value of call.
"""
source = assigned[id]
if available:
if slave_queue[source][0] != id:
raise MPIException("get_result(" + str(id)
+ ") called before get_result("
+ str(slave_queue[source][0]) + ")!")
if _verbose:
print(f"MPI master : retrieving result for call with id {id} "
f"from slave {source} ...")
(result, this_stats) = comm.recv(source=source)
stats.append(this_stats)
n_processed[source] = this_stats["n_processed"]
total_time[source] = this_stats["total_time"]
else:
if _verbose:
print(f"MPI master : returning result for call with id {id} "
"...")
result = results[id]
# TODO: rather return a copy and del the original?
queue.remove(id)
slave_queue[source].remove(id)
assigned.pop(id)
return result
[docs]
def get_next_result():
"""
Return result of next earlier submitted call whose result has not yet
been got.
Can only be called by the master.
If the call is not yet finished, waits for it to finish.
:rtype: object
:return: return value of call, or None of there are no more calls in
the queue.
"""
if len(queue) > 0:
id = queue[0]
return get_result(id)
else:
return None
[docs]
def info():
"""
Print processing statistics.
Can only be called by the master.
"""
call_times = numpy.array([s["this_time"] for s in stats])
call_quotients = numpy.array([s["time_over_est"] for s in stats])
if available:
slave_quotients = total_time/total_time_est
print("\n"
"MPI: processing statistics\n"
" =====================\n"
" results collected: "
f"{n_processed[1:].sum()}\n"
" results not yet collected: "
f"{len(queue)}\n"
" total reported time: "
f"{call_times.sum()}\n"
" mean time per call: "
f"{call_times.mean()}\n"
" std.dev. of time per call: "
f"{call_times.std()}\n"
" coeff. of var. of actual over estd. time per call: "
f"{call_quotients.std()/call_quotients.mean()}\n"
" slaves: "
f"{n_slaves}\n"
" mean calls per slave: "
f"{n_processed[1:].mean()}\n"
" std.dev. of calls per slave: "
f"{n_processed[1:].std()}\n"
" min calls per slave: "
f"{n_processed[1:].min()}\n"
" max calls per slave: "
f"{n_processed[1:].max()}\n"
" mean time per slave: "
f"{total_time.mean()}\n"
" std.dev. of time per slave: "
f"{total_time.std()}\n"
" coeff. of var. of actual over estd. time per slave: "
f"{slave_quotients.std()/slave_quotients.mean()}\n")
else:
print("\n"
"MPI: processing statistics\n"
" =====================\n"
" results collected: "
f"{n_processed[0]}\n"
" results not yet collected: "
f"{len(queue)}\n"
" total reported time: "
f"{call_times.sum()}\n"
" mean time per call: "
f"{call_times.mean()}\n"
" std.dev. of time per call: "
f"{call_times.std()}\n"
" coeff. of var. of actual over estd. time per call: "
f"{call_quotients.std()/call_quotients.mean()}\n")
[docs]
def terminate():
"""
Tell all slaves to terminate.
Can only be called by the master.
"""
global available
if available:
# tell slaves to terminate:
for slave in range(1, size):
if _verbose:
print(f"MPI master : telling slave {slave} "
"to terminate...")
comm.send(("terminate", (), {}, "", 0), dest=slave)
available = False
def abort():
"""
Abort execution on all MPI nodes immediately.
Can be called by master and slaves.
"""
traceback.print_exc()
if _verbose:
print("MPI master : aborting...")
comm.Abort()
else: # am_slave and available:
total_time_est = numpy.zeros(size)*numpy.nan
total_time_est[rank] = 0
n_processed = numpy.zeros(size)*numpy.nan
n_processed[rank] = 0
total_time = numpy.zeros(size)*numpy.nan
total_time[rank] = 0
def serve():
"""
Serve submitted calls until told to finish.
Can only be called by slaves.
Call this function from inside your definition of slave() if slaves
need to perform initializations different from the master, like this:
>>> def slave():
>>> do = whatever + initialization - is * necessary
>>> mpi.serve()
>>> do = whatever + cleanup - is * necessary
If you don't define slave(), serve() will be called automatically by
mpi.run().
"""
if _verbose:
print("MPI slave ", rank, ": waiting for calls.")
# wait for orders:
while True:
# get next task from queue:
(name_to_call, args, kwargs, module, time_est) = \
comm.recv(source=0)
# TODO: add some timeout and check whether master lives!
if name_to_call == "terminate":
if _verbose:
print("MPI slave", rank, ": terminating...")
break
if _verbose:
print(f"MPI slave {rank}: calling {name_to_call} {args} ...")
try:
object_to_call = eval(name_to_call,
sys.modules[module].__dict__)
except NameError:
sys.stderr.write(str(sys.modules[module].__dict__.keys()))
raise
total_time_est[rank] += time_est
call_time = time.time()
result = object_to_call(*args, **kwargs)
this_time = time.time() - call_time
n_processed[rank] += 1
stats.append({"id": id, "rank": rank,
"this_time": this_time,
"time_over_est": this_time / time_est,
"n_processed": n_processed[rank],
"total_time": time.time() - start_time})
if _verbose:
print("MPI slave", rank, ": sending result...")
comm.send((result, stats[-1]), dest=0)
[docs]
def abort():
traceback.print_exc()
if _verbose:
print("MPI slave", rank, ": aborting...")
comm.Abort()
# TODO: update total_time_est at return time
_verbose = False
[docs]
def run(verbose=False):
"""
Run in master/slaves mode until master() finishes.
Must be called on all MPI nodes after function master() was defined.
On the master, run() calls master() and returns when master() returns.
On each slave, run() calls slave() if that is defined, or calls serve()
otherwise, and returns when slave() returns, or when master() returns on
the master, or when master calls terminate().
:arg bool verbose: whether processing information should be printed.
"""
# transfer verbose into global environment:
global _verbose
_verbose = verbose
"""
(bool) indicated whether processing information should be printed.
"""
_globals = sys.modules['__main__'].__dict__
if available: # run in mpi mode
if am_master: # I'm master
if verbose:
print("MPI master : started, using", size-1, "slaves.")
try: # put everything in a try block to be able to terminate!
if "master" in _globals:
_globals["master"]()
else:
print("MPI master : function master() not found!")
except ValueError:
abort()
terminate()
if verbose:
print("MPI master : finished.")
else: # I'm slave
if "slave" in _globals:
_globals["slave"]()
else:
serve()
else: # run as single processor
if verbose:
print("MPI master : not available, running as a single process.")
if "master" in _globals:
_globals["master"]()
else:
print("MPI master : function master() not found!")
if verbose:
print("MPI master : finished.")