NAME¶
mpire - mpire Documentation
MPIRE, short for MultiProcessing Is Really Easy, is a Python package for multiprocessing. MPIRE is faster in most scenarios, packs more features, and is generally more user-friendly than the default multiprocessing package. It combines the convenient map-like functions of multiprocessing.Pool with the benefits of using copy-on-write shared objects of multiprocessing.Process, together with easy-to-use worker state, worker insights, worker init and exit functions, timeouts, and progress bar functionality.
FEATURES¶
- Faster execution than other multiprocessing libraries. See benchmarks.
- Intuitive, Pythonic syntax
- Multiprocessing with map/map_unordered/imap/imap_unordered/apply/apply_async functions
- Easy use of copy-on-write shared objects with a pool of workers (copy-on-write is only available for start method fork, so it’s not supported on Windows)
- Each worker can have its own state and with convenient worker init and exit functionality this state can be easily manipulated (e.g., to load a memory-intensive model only once for each worker without the need of sending it through a queue)
- Progress bar support using tqdm (rich and notebook widgets are supported)
- Progress dashboard support
- Worker insights to provide insight into your multiprocessing efficiency
- Graceful and user-friendly exception handling
- Timeouts, including for worker init and exit functions
- Automatic task chunking for all available map functions to speed up processing of small task queues (including numpy arrays)
- Adjustable maximum number of active tasks to avoid memory problems
- Automatic restarting of workers after a specified number of tasks to reduce memory footprint
- Nested pools of workers are allowed when setting the daemon option
- Child processes can be pinned to specific or a range of CPUs
- Optionally utilizes dill as serialization backend through multiprocess, enabling parallelizing more exotic objects, lambdas, and functions in iPython and Jupyter notebooks.
MPIRE has been tested on Linux, macOS, and Windows. There are a few minor known caveats for Windows and macOS users, which can be found in the Windows and macOS sections under Troubleshooting.
CONTENTS¶
Installation¶
MPIRE builds are distributed through PyPI.
MPIRE can be installed through pip:
pip install mpire
and is available through conda-forge:
conda install -c conda-forge mpire
Dependencies¶
- Python >= 3.8
Python packages (installed automatically when installing MPIRE):
- tqdm
- pygments
- pywin32 (Windows only)
- importlib_resources (Python < 3.9 only)
Dill¶
For some functions or tasks it can be useful to not rely on pickle, but on some more powerful serialization backend, like dill. dill isn’t installed by default as it has a BSD license, while MPIRE has an MIT license. If you want to use it, the license of MPIRE will change to a BSD license as well, as required by the original BSD license. See the BSD license of multiprocess for more information.
You can enable dill by executing:
pip install mpire[dill]
This will install multiprocess, which uses dill under the hood. You can enable the use of dill by setting use_dill=True in the mpire.WorkerPool constructor.
Rich progress bars¶
If you want to use rich progress bars, you have to install the dependencies for it manually:
pip install rich
Dashboard¶
Optionally, you can install the dependencies for the MPIRE dashboard, which depends on Flask. As with dill, Flask has a BSD license. Installing these dependencies will change the license of MPIRE to BSD as well. See the BSD license of Flask for more information.
The dashboard allows you to see progress information from a browser. This is convenient when running scripts in a notebook or screen, or when you want to share the progress information with others. Install the appropriate dependencies to enable this:
pip install mpire[dashboard]
Getting started¶
Suppose you have a time consuming function that receives some input and returns its results. This could look like the following:
import time

def time_consuming_function(x):
    time.sleep(1)  # Simulate that this function takes long to complete
    return ...

results = [time_consuming_function(x) for x in range(10)]
Running this function takes about 10 seconds to complete.
Functions like these are known as embarrassingly parallel problems: functions that require little to no effort to turn into a parallel task. Parallelizing a simple function like this can be as easy as importing multiprocessing and using the multiprocessing.Pool class:
from multiprocessing import Pool

with Pool(processes=5) as pool:
    results = pool.map(time_consuming_function, range(10))
We configured the pool to have 5 workers, so we can handle 5 tasks in parallel. As a result, this function will complete in about 2 seconds.
MPIRE can be used almost as a drop-in replacement to multiprocessing. We use the mpire.WorkerPool class and call one of the available map functions:
from mpire import WorkerPool

with WorkerPool(n_jobs=5) as pool:
    results = pool.map(time_consuming_function, range(10))
Similarly, this will complete in about 2 seconds. The differences in code are small: there’s no need to learn a completely new multiprocessing syntax, if you’re used to vanilla multiprocessing. The additional available functionality, though, is what sets MPIRE apart.
Progress bar¶
Suppose we want to know the status of the current task: how many tasks are completed, how long before the work is ready? It’s as simple as setting the progress_bar parameter to True:
with WorkerPool(n_jobs=5) as pool:
results = pool.map(time_consuming_function, range(10), progress_bar=True)
And it will output a nicely formatted tqdm progress bar.
MPIRE also offers a dashboard, for which you need to install additional dependencies. See Dashboard for more information.
Shared objects¶
If you have one or more objects that you want to share between all workers you can make use of the copy-on-write shared_objects option of MPIRE. MPIRE will pass on these objects only once for each worker without copying/serialization. Only when the object is altered inside the worker function will it be copied for that worker.
def time_consuming_function(some_object, x):
    time.sleep(1)  # Simulate that this function takes long to complete
    return ...

def main():
    some_object = ...
    with WorkerPool(n_jobs=5, shared_objects=some_object, start_method='fork') as pool:
        results = pool.map(time_consuming_function, range(10), progress_bar=True)
See Shared objects for more details.
Worker initialization¶
Need to initialize each worker before starting the work? Have a look at the worker_state and worker_init functionality:
def init(worker_state):
    # Load a big dataset or model and store it in a worker specific worker_state
    worker_state['dataset'] = ...
    worker_state['model'] = ...

def task(worker_state, idx):
    # Let the model predict a specific instance of the dataset
    return worker_state['model'].predict(worker_state['dataset'][idx])

with WorkerPool(n_jobs=5, use_worker_state=True) as pool:
    results = pool.map(task, range(10), worker_init=init)
Similarly, you can use the worker_exit parameter to let MPIRE call a function whenever a worker terminates. You can even let this exit function return results, which can be obtained later on. See the Worker init and exit section for more information.
Worker insights¶
When your multiprocessing setup isn't performing as you want it to and you have no clue what's causing it, there's the worker insights functionality. This will give you some insight into your setup, but it will not profile the function you're running (there are other libraries for that). Instead, it profiles the worker start-up time, waiting time, and working time. When worker init and exit functions are provided it will time those as well.
Perhaps you’re sending a lot of data over the task queue, which makes the waiting time go up. Whatever the case, you can enable and grab the insights using the enable_insights flag and mpire.WorkerPool.get_insights() function, respectively:
with WorkerPool(n_jobs=5, enable_insights=True) as pool:
results = pool.map(time_consuming_function, range(10))
insights = pool.get_insights()
See Worker insights for a more detailed example and expected output.
Usage¶
WorkerPool¶
This section describes how to setup a mpire.WorkerPool instance.
Starting a WorkerPool¶
Contents¶
- Nested WorkerPools
The mpire.WorkerPool class controls a pool of worker processes similarly to a multiprocessing.Pool. It contains all the map like functions (with the addition of mpire.WorkerPool.map_unordered()), together with the apply and apply_async functions (see Apply family).
An mpire.WorkerPool can be started in two different ways. The first and recommended way to do so is using a context manager:
from mpire import WorkerPool

# Start a pool of 4 workers
with WorkerPool(n_jobs=4) as pool:
    # Do some processing here
    pass
The with statement takes care of properly joining/terminating the spawned worker processes after the block has ended.
The other way is to do it manually:
# Start a pool of 4 workers
pool = WorkerPool(n_jobs=4)

# Do some processing here
pass

# Only needed when keep_alive=True:
# Clean up pool (this will block until all processing has completed)
pool.stop_and_join()  # or use pool.join(), which is an alias of stop_and_join()

# In case you want to kill the processes, even though they are still busy
pool.terminate()
When using n_jobs=None MPIRE will spawn as many processes as there are CPUs on your system. Specifying more jobs than you have CPUs is, of course, possible as well.
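For example, a minimal sketch of letting MPIRE pick the number of workers (the task body is just a placeholder):

from mpire import WorkerPool

# Spawns as many workers as there are CPUs on this machine
with WorkerPool(n_jobs=None) as pool:
    ...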
Nested WorkerPools¶
By default, the mpire.WorkerPool class spawns daemon child processes that are not able to create child processes themselves, so nested pools are not allowed. There's an option to create non-daemon child processes to allow for nested structures:
def job(...):
    with WorkerPool(n_jobs=4) as p:
        # Do some work
        results = p.map(...)

with WorkerPool(n_jobs=4, daemon=True, start_method='spawn') as pool:
    # This will raise an AssertionError telling you daemon processes
    # can't start child processes
    pool.map(job, ...)

with WorkerPool(n_jobs=4, daemon=False, start_method='spawn') as pool:
    # This will work just fine
    pool.map(job, ...)
When a function is guaranteed to finish successfully, using nested pools is absolutely fine.
Process start method¶
Contents¶
- Spawn and forkserver
The multiprocessing package allows you to start processes using a few different methods: 'fork', 'spawn' or 'forkserver'. Threading is also available by using 'threading'. For detailed information on the multiprocessing contexts, please refer to the multiprocessing documentation and caveats section. In short:
- fork
- Copies the parent process such that the child process is effectively identical. This includes copying everything currently in memory. This is sometimes useful, but other times useless or even a serious bottleneck. fork enables the use of copy-on-write shared objects (see Shared objects).
- spawn
- Starts a fresh python interpreter where only those resources necessary are inherited.
- forkserver
- First starts a server process (using 'spawn'). Whenever a new process is needed the parent process requests the server to fork a new process.
- threading
- Starts child threads. Suffers from the Global Interpreter Lock (GIL), but works fine for I/O intensive tasks.
For an overview of start method availability and defaults, please refer to the following table:
Start method | Available on Unix | Available on Windows |
fork | Yes (default) | No |
spawn | Yes | Yes (default) |
forkserver | Yes | No |
threading | Yes | Yes |
Spawn and forkserver¶
When using spawn or forkserver as start method, be aware that global variables (constants are fine) might have a different value than you might expect. You also have to import packages within the called function:
import os

def failing_job(folder, filename):
    return os.path.join(folder, filename)

# This will fail because 'os' is not copied to the child processes
with WorkerPool(n_jobs=2, start_method='spawn') as pool:
    pool.map(failing_job, [('folder', '0.p3'), ('folder', '1.p3')])

def working_job(folder, filename):
    import os
    return os.path.join(folder, filename)

# This will work
with WorkerPool(n_jobs=2, start_method='spawn') as pool:
    pool.map(working_job, [('folder', '0.p3'), ('folder', '1.p3')])
A lot of effort has been put into making the progress bar, dashboard, and nested pools (with multiple progress bars) work well with spawn and forkserver. So, everything should work fine.
CPU pinning¶
You can pin the child processes of mpire.WorkerPool to specific CPUs by using the cpu_ids parameter in the constructor:
# Pin the two child processes to CPUs 2 and 3
with WorkerPool(n_jobs=2, cpu_ids=[2, 3]) as pool:
    ...

# Pin the child processes to CPUs 40-59
with WorkerPool(n_jobs=20, cpu_ids=list(range(40, 60))) as pool:
    ...

# All child processes have to share a single core
with WorkerPool(n_jobs=4, cpu_ids=[0]) as pool:
    ...

# All child processes have to share multiple cores, namely 4-7
with WorkerPool(n_jobs=4, cpu_ids=[[4, 5, 6, 7]]) as pool:
    ...

# Each child process can use two distinctive cores
with WorkerPool(n_jobs=4, cpu_ids=[[0, 1], [2, 3], [4, 5], [6, 7]]) as pool:
    ...
CPU IDs have to be positive integers, not exceeding the number of CPUs available (which can be retrieved by using mpire.cpu_count()). Use None to disable CPU pinning (which is the default).
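As a quick sketch, you could check the number of available CPUs before deciding on the pinning (the printed value is, of course, machine dependent):

import mpire

print(mpire.cpu_count())  # e.g., 8 on an 8-core machine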
Accessing the worker ID¶
Contents¶
- Elaborate example
Each worker in MPIRE is given an integer ID to distinguish them. Worker #1 will have ID 0, #2 will have ID 1, etc. Sometimes it can be useful to have access to this ID.
By default, the worker ID is not passed on. You can enable/disable this by setting the pass_worker_id flag:
def task(worker_id, x):
    pass

with WorkerPool(n_jobs=4, pass_worker_id=True) as pool:
    pool.map(task, range(10))
Instead of passing the flag to the mpire.WorkerPool constructor you can also make use of mpire.WorkerPool.pass_on_worker_id():
with WorkerPool(n_jobs=4) as pool:
pool.pass_on_worker_id()
pool.map(task, range(10))
Elaborate example¶
Here’s a more elaborate example of using the worker ID together with a shared array, where each worker can only access the element corresponding to its worker ID, making the use of locking unnecessary:
from multiprocessing import Array

def square_sum(worker_id, shared_objects, x):
    # Even though the shared objects is a single container, we 'unpack' it anyway
    results_container = shared_objects

    # Square and sum
    results_container[worker_id] += x * x

# Use a shared array of size equal to the number of jobs to store the results
results_container = Array('f', 4, lock=False)

with WorkerPool(n_jobs=4, shared_objects=results_container, pass_worker_id=True) as pool:
    # Square the results and store them in the results container
    pool.map_unordered(square_sum, range(100))
Shared objects¶
Contents¶
- Copy-on-write alternatives
MPIRE allows you to provide shared objects to the workers in a similar way as is possible with the multiprocessing.Process class. For the start method fork these shared objects are treated as copy-on-write, which means they are only copied once changes are made to them. Otherwise they share the same memory address. This is convenient if you want to let workers access a large dataset that wouldn’t fit in memory when copied multiple times.
For threading these shared objects are readable and writable without copies being made. For the start methods spawn and forkserver the shared objects are copied once for each worker, in contrast to once for each task, which is what happens when using a regular multiprocessing.Pool.
def task(dataset, x):
    # Do something with this copy-on-write dataset
    ...

def main():
    dataset = ...  # Load big dataset
    with WorkerPool(n_jobs=4, shared_objects=dataset, start_method='fork') as pool:
        ... = pool.map(task, range(100))
Multiple objects can be provided by placing them, for example, in a tuple container.
Apart from sharing regular Python objects between workers, you can also share multiprocessing synchronization primitives such as multiprocessing.Lock using this method. Objects like these need to be shared through inheritance, which is exactly how shared objects in MPIRE are passed on.
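As an illustrative sketch, sharing a multiprocessing.Lock this way could look as follows (the results.txt file name is only an example):

from multiprocessing import Lock
from mpire import WorkerPool

def append_to_file(lock, x):
    # The lock is passed on as a shared object (through inheritance)
    with lock:
        with open('results.txt', 'a') as f:
            f.write(f"{x}\n")

lock = Lock()
with WorkerPool(n_jobs=4, shared_objects=lock, start_method='fork') as pool:
    pool.map(append_to_file, range(10))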
Instead of passing the shared objects to the mpire.WorkerPool constructor you can also use the mpire.WorkerPool.set_shared_objects() function:
def main():
dataset = ... # Load big dataset
with WorkerPool(n_jobs=4, start_method='fork') as pool:
pool.set_shared_objects(dataset)
... = pool.map(task, range(100))
Shared objects have to be specified before the workers are started. Workers are started once the first map call is executed. When keep_alive=True and the workers are reused, changing the shared objects between two consecutive map calls won’t work.
Copy-on-write alternatives¶
When copy-on-write is not available for you, you can also use shared objects to share a multiprocessing.Array, multiprocessing.Value, or another object with multiprocessing.Manager. You can then store results in the same object from multiple processes. However, you should keep the amount of synchronization to a minimum when the resources are protected with a lock, or disable locking if your situation allows it as is shown here:
from multiprocessing import Array

def square_add_and_modulo_with_index(shared_objects, idx, x):
    # Unpack results containers
    square_results_container, add_results_container = shared_objects

    # Square, add and modulo
    square_results_container[idx] = x * x
    add_results_container[idx] = x + x
    return x % 2

def main():
    # Use a shared array of size 100 and type float to store the results
    square_results_container = Array('f', 100, lock=False)
    add_results_container = Array('f', 100, lock=False)
    shared_objects = square_results_container, add_results_container

    with WorkerPool(n_jobs=4, shared_objects=shared_objects) as pool:
        # Square, add and modulo the results and store them in the results containers
        modulo_results = pool.map(square_add_and_modulo_with_index,
                                  enumerate(range(100)), iterable_len=100)
In the example above we create two results containers, one for squaring and one for adding the given value, and disable locking for both. Additionally, we also return a value, even though we use shared objects for storing results. We can safely disable locking here as each task writes to a different index in the array, so no race conditions can occur. Disabling locking is, of course, a lot faster than having it enabled.
Worker state¶
Contents¶
- Combining worker state with worker_init and worker_exit
- Combining worker state with keep_alive
If you want to let each worker have its own state you can use the use_worker_state flag:
def task(worker_state, x):
    if "local_sum" not in worker_state:
        worker_state["local_sum"] = 0
    worker_state["local_sum"] += x

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    results = pool.map(task, range(100))
Instead of passing the flag to the mpire.WorkerPool constructor you can also make use of mpire.WorkerPool.set_use_worker_state():
with WorkerPool(n_jobs=4) as pool:
pool.set_use_worker_state()
pool.map(task, range(100))
Combining worker state with worker_init and worker_exit¶
The worker state can be combined with the worker_init and worker_exit parameters of each map function, leading to some really useful capabilities:
import numpy as np
import pickle

def load_big_model(worker_state):
    # Load a model which takes up a lot of memory
    with open('./a_really_big_model.p3', 'rb') as f:
        worker_state['model'] = pickle.load(f)

def model_predict(worker_state, x):
    # Predict
    return worker_state['model'].predict(x)

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    # Let the model predict
    data = np.array([[...]])
    results = pool.map(model_predict, data, worker_init=load_big_model)
More information about the worker_init and worker_exit parameters can be found at Worker init and exit.
Combining worker state with keep_alive¶
By default, workers are restarted each time a map function is executed. As described in Keep alive this can be circumvented by using keep_alive=True. This also ensures worker state is kept across consecutive map calls:
with WorkerPool(n_jobs=4, use_worker_state=True, keep_alive=True) as pool:
# Let the model predict
data = np.array([[...]])
results = pool.map(model_predict, data, worker_init=load_big_model)
# Predict some more
more_data = np.array([[...]])
more_results = pool.map(model_predict, more_data)
In this example we don’t need to supply the worker_init function to the second map call, as the workers will be reused. When worker_lifespan is set, though, this rule doesn’t apply.
Keep alive¶
Contents¶
- Caveats
By default, workers are restarted on each map call. This is done to clean up resources as quickly as possible when the work is done.
Workers can be kept alive in between consecutive map calls using the keep_alive flag. This is useful when your workers have a long startup time and you need to call one of the map functions multiple times.
def task(x):
    pass

with WorkerPool(n_jobs=4, keep_alive=True) as pool:
    pool.map(task, range(100))
    pool.map(task, range(100))  # Workers are reused here
Instead of passing the flag to the mpire.WorkerPool constructor you can also make use of mpire.WorkerPool.set_keep_alive():
with WorkerPool(n_jobs=4) as pool:
pool.map(task, range(100))
pool.map(task, range(100)) # Workers are restarted
pool.set_keep_alive()
pool.map(task, range(100)) # Workers are reused here
Caveats¶
Changing some WorkerPool init parameters does require a restart. These include pass_worker_id, shared_objects, and use_worker_state.
Keeping workers alive works even when the function to be called or any other parameter passed on to the map function changes.
However, when you're changing the worker_init and/or worker_exit function while keep_alive is enabled, you need to be aware this can have undesired side-effects. worker_init functions are only executed when a worker is started and worker_exit functions when a worker is terminated. When keep_alive is enabled, workers aren't restarted in between consecutive map calls, so those functions are not called.
def init_func_1(): pass
def exit_func_1(): pass
def init_func_2(): pass
def exit_func_2(): pass

with WorkerPool(n_jobs=4, keep_alive=True) as pool:
    pool.map(task, range(100), worker_init=init_func_1, worker_exit=exit_func_1)
    pool.map(task, range(100), worker_init=init_func_2, worker_exit=exit_func_2)
In the above example init_func_1 is called for each worker when the workers are started. After the first map call exit_func_1 is not called because workers are kept alive. During the second map call init_func_2 isn’t called as well, because the workers are still alive. When exiting the context manager the workers are shut down and exit_func_2 is called.
It gets even trickier when you also enable worker_lifespan. In this scenario during the first map call a worker could’ve reached its maximum lifespan and is forced to restart, while others haven’t. The exit function of the worker to be restarted is called (i.e., exit_func_1). When calling map for the second time and the exit function is changed, the other workers will execute the new exit function when they need to be restarted (i.e., exit_func_2).
Worker insights¶
Worker insights gives you insight into your multiprocessing efficiency by tracking worker start-up time, waiting time, and time spent executing tasks. Tracking is disabled by default, but can be enabled by setting enable_insights:
with WorkerPool(n_jobs=4, enable_insights=True) as pool:
pool.map(task, range(100))
The overhead is very minimal and you shouldn’t really notice it, even on very small tasks. You can view the tracking results using mpire.WorkerPool.get_insights() or use mpire.WorkerPool.print_insights() to directly print the insights to console:
import time

def sleep_and_square(x):
    # For illustration purposes
    time.sleep(x / 1000)
    return x * x

with WorkerPool(n_jobs=4, enable_insights=True) as pool:
    pool.map(sleep_and_square, range(100))
    insights = pool.get_insights()
    print(insights)

# Output:
{'n_completed_tasks': [28, 24, 24, 24],
'total_start_up_time': '0:00:00.038',
'total_init_time': '0:00:00',
'total_waiting_time': '0:00:00.798',
'total_working_time': '0:00:04.980',
'total_exit_time': '0:00:00',
'total_time': '0:00:05.816',
'start_up_time': ['0:00:00.010', '0:00:00.008', '0:00:00.008', '0:00:00.011'],
'start_up_time_mean': '0:00:00.009',
'start_up_time_std': '0:00:00.001',
'start_up_ratio': 0.006610452621805033,
'init_time': ['0:00:00', '0:00:00', '0:00:00', '0:00:00'],
'init_time_mean': '0:00:00',
'init_time_std': '0:00:00',
'init_ratio': 0.0,
'waiting_time': ['0:00:00.309', '0:00:00.311', '0:00:00.165', '0:00:00.012'],
'waiting_time_mean': '0:00:00.199',
'waiting_time_std': '0:00:00.123',
'waiting_ratio': 0.13722942739284952,
'working_time': ['0:00:01.142', '0:00:01.135', '0:00:01.278', '0:00:01.423'],
'working_time_mean': '0:00:01.245',
'working_time_std': '0:00:00.117',
'working_ratio': 0.8561601182661567,
'exit_time': ['0:00:00', '0:00:00', '0:00:00', '0:00:00'],
'exit_time_mean': '0:00:00',
'exit_time_std': '0:00:00',
'exit_ratio': 0.0,
'top_5_max_task_durations': ['0:00:00.099', '0:00:00.098', '0:00:00.097', '0:00:00.096',
'0:00:00.095'],
'top_5_max_task_args': ['Arg 0: 99', 'Arg 0: 98', 'Arg 0: 97', 'Arg 0: 96', 'Arg 0: 95']}
We specified 4 workers, so there are 4 entries in the n_completed_tasks, start_up_time, init_time, waiting_time, working_time, and exit_time containers. They show, per worker, the number of completed tasks, the total start-up time, the total time spent on the worker_init function, the total time waiting for new tasks, the total time spent on the main function, and the total time spent on the worker_exit function, respectively. The insights also contain the mean, standard deviation, and ratio of the tracked time. The ratio is the time for that part divided by the total time. In general, the higher the working ratio the more efficient your multiprocessing setup is. Of course, your setup might still not be optimal because the task itself is inefficient, but timing that is beyond the scope of MPIRE.
Additionally, the insights keep track of the top 5 tasks that took the longest to run. The data is split up into two containers: one for the duration and one for the arguments that were passed on to the task function. Both are sorted based on task duration (descending), so index 0 of the args list corresponds to index 0 of the duration list, etc.
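For example, a small sketch that pairs the two containers back up, continuing from the insights dictionary obtained above:

for duration, args in zip(insights['top_5_max_task_durations'],
                          insights['top_5_max_task_args']):
    print(duration, args)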
When using the MPIRE Dashboard you can track these insights in real-time. See Dashboard for more information.
Dill¶
For some functions or tasks it can be useful to not rely on pickle, but on some more powerful serialization backends like dill. dill isn’t installed by default. See Dill for more information on installing the dependencies.
One specific example where dill shines is when using start method spawn (the default on Windows) in combination with iPython or Jupyter notebooks. dill enables parallelizing more exotic objects like lambdas and functions defined in iPython and Jupyter notebooks. For all benefits of dill, please refer to the dill documentation.
Once the dependencies have been installed, you can enable it using the use_dill flag:
with WorkerPool(n_jobs=4, use_dill=True) as pool:
...
Order tasks¶
In some settings it can be useful to supply the tasks to workers in a round-robin fashion. This means worker 0 will get task 0, worker 1 will get task 1, etc. After each worker got a task, we start with worker 0 again instead of picking the worker that has most recently completed a task.
When the chunk size is larger than 1, the tasks are distributed to the workers in order, but in chunks. I.e., when chunk_size=3 tasks 0, 1, and 2 will be assigned to worker 0, tasks 3, 4, and 5 to worker 1, and so on.
When keep_alive is set to True and the second map call is made, MPIRE resets the worker order and starts at worker 0 again.
You can enable/disable task ordering by setting the order_tasks flag:
def task(x):
    pass

with WorkerPool(n_jobs=4, order_tasks=True) as pool:
    pool.map(task, range(10))
Instead of passing the flag to the mpire.WorkerPool constructor you can also make use of mpire.WorkerPool.set_order_tasks():
with WorkerPool(n_jobs=4) as pool:
pool.set_order_tasks()
pool.map(task, range(10))
Map family¶
This section describes the different ways of interacting with a mpire.WorkerPool instance.
map family of functions¶
Contents¶
- Iterable of arguments
- Circumvent argument unpacking
- Mixing map functions
- Not exhausting a lazy imap function
mpire.WorkerPool implements four types of parallel map functions, being:
- mpire.WorkerPool.map()
- Blocks until results are ready, results are ordered in the same way as the provided arguments.
- mpire.WorkerPool.map_unordered()
- The same as mpire.WorkerPool.map(), but results are ordered by task completion time. Usually faster than mpire.WorkerPool.map().
- mpire.WorkerPool.imap()
- Lazy version of mpire.WorkerPool.map(), returns a generator. The generator will give results back whenever new results are ready. Results are ordered in the same way as the provided arguments.
- mpire.WorkerPool.imap_unordered()
- The same as mpire.WorkerPool.imap(), but results are ordered by task completion time. Usually faster than mpire.WorkerPool.imap().
When using a single worker the unordered versions are equivalent to their ordered counterparts.
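As a minimal illustration of the difference (the work function and its sleep times are made up), imap_unordered yields results as soon as they are ready, so faster tasks can come back first:

import time
from mpire import WorkerPool

def work(x):
    time.sleep(0.1 * (5 - x))  # later tasks finish earlier
    return x

with WorkerPool(n_jobs=5) as pool:
    # Likely prints 4, 3, 2, 1, 0: results arrive in completion order
    for result in pool.imap_unordered(work, range(5)):
        print(result)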
Iterable of arguments¶
Each map function should receive a function and an iterable of arguments, where the elements of the iterable can be single values or iterables that are unpacked as arguments. If an element is a dictionary, the (key, value) pairs will be unpacked with the **-operator.
def square(x):
    return x * x

with WorkerPool(n_jobs=4) as pool:
    # 1. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
    results = pool.map(square, range(100))
The first example should work as expected, the numbers are simply squared. MPIRE knows how many tasks there are because a range object implements the __len__ method (see Task chunking).
with WorkerPool(n_jobs=4) as pool:
# 2. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
# Note: don't execute this, it will take a long time ...
results = pool.map(square, range(int(1e30)), iterable_len=int(1e30), chunk_size=1)
In the second example the 1e30 number is too large for Python: try calling len(range(int(1e30))), this will throw an OverflowError (don’t get me started …). Therefore, we must use the iterable_len parameter to let MPIRE know how large the tasks list is. We also have to specify a chunk size here as the chunk size should be lower than sys.maxsize.
def multiply(x, y):
    return x * y

with WorkerPool(n_jobs=4) as pool:
    # 3. Multiply the numbers, results should be [0, 101, 204, 309, 416, ...]
    for result in pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100):
        ...
The third example shows how to use multiple function arguments. Note that we use imap in this example, which allows us to process the results whenever they become available, without having to wait for all results to be ready.
with WorkerPool(n_jobs=4) as pool:
# 4. Multiply the numbers, results should be [0, 101, ...]
for result in pool.imap(multiply, [{'x': 0, 'y': 100}, {'y': 101, 'x': 1}, ...]):
...
The final example shows the use of an iterable of dictionaries. The (key, value) pairs are unpacked with the **-operator, as you would expect. So it doesn't matter in what order the keys are stored. This should work for collections.OrderedDict objects as well.
Circumvent argument unpacking¶
If you want to avoid unpacking and pass the tuples in example 3 or the dictionaries in example 4 as a whole, you can. We’ll continue on example 4, but the workaround for example 3 is similar.
Suppose we have the following function which expects a dictionary:
def multiply_dict(d):
return d['x'] * d['y']
Then you would have to convert the list of dictionaries to a list of single argument tuples, where each argument is a dictionary:
with WorkerPool(n_jobs=4) as pool:
# Multiply the numbers, results should be [0, 101, ...]
for result in pool.imap(multiply_dict, [({'x': 0, 'y': 100},),
({'y': 101, 'x': 1},),
...]):
...
There is a utility function available that does this transformation for you:
from mpire.utils import make_single_arguments

with WorkerPool(n_jobs=4) as pool:
    # Multiply the numbers, results should be [0, 101, ...]
    for result in pool.imap(multiply_dict, make_single_arguments([{'x': 0, 'y': 100},
                                                                  {'y': 101, 'x': 1}, ...],
                                                                 generator=False)):
        ...
mpire.utils.make_single_arguments() expects an iterable of arguments and converts them to tuples accordingly. The second argument of this function specifies if you want the function to return a generator or a materialized list. If we would like to return a generator we would need to pass on the iterable length as well.
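For instance, a sketch with generator=True, reusing multiply_dict from above (the generator of dictionaries is made up):

from mpire.utils import make_single_arguments

dict_gen = ({'x': x, 'y': x + 100} for x in range(100))

with WorkerPool(n_jobs=4) as pool:
    for result in pool.imap(multiply_dict,
                            make_single_arguments(dict_gen, generator=True),
                            iterable_len=100):
        ...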
Mixing map functions¶
map functions cannot be used while another map function is still running. E.g., the following will raise an exception:
with WorkerPool(n_jobs=4) as pool:
imap_results = pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100)
next(imap_results) # We actually have to start the imap function
# Will raise because the imap function is still running
map_results = pool.map(square, range(100))
Make sure to first finish the imap function before starting a new map function. This holds for all map functions.
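A sketch of the safe ordering, reusing the functions from the examples above:

with WorkerPool(n_jobs=4) as pool:
    # First exhaust the lazy imap results ...
    imap_results = list(pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100))

    # ... then it's safe to start a new map function
    map_results = pool.map(square, range(100))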
Not exhausting a lazy imap function¶
If you don’t exhaust a lazy imap function, but do close the pool, the remaining tasks and results will be lost. E.g., the following will raise an exception:
with WorkerPool(n_jobs=4) as pool:
imap_results = pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100)
first_result = next(imap_results) # We actually have to start the imap function
pool.terminate()
# This will raise
results = list(imap_results)
Similarly, exiting the with block terminates the pool as well:
with WorkerPool(n_jobs=4) as pool:
    imap_results = pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100)
    first_result = next(imap_results)  # We actually have to start the imap function

# This will raise
results = list(imap_results)
Progress bar¶
Contents¶
- Progress bar style
- Changing the default style
- Progress bar options
- Progress bar position
Progress bar support is added through the tqdm package (installed by default when installing MPIRE). The easiest way to include a progress bar is by enabling the progress_bar flag in any of the map functions:
with WorkerPool(n_jobs=4) as pool:
pool.map(task, range(100), progress_bar=True)
This will display a basic tqdm progress bar displaying the time elapsed and remaining, number of tasks completed (including a percentage value) and the speed (i.e., number of tasks completed per time unit).
Progress bar style¶
You can switch to a different progress bar style by changing the progress_bar_style parameter. For example, when you require a notebook widget use 'notebook' as the style:
with WorkerPool(n_jobs=4) as pool:
pool.map(task, range(100), progress_bar=True, progress_bar_style='notebook')
The available styles are:
- None: use the default style (= 'std' , see below)
- 'std': use the standard tqdm progress bar
- 'rich': use the rich progress bar (requires the rich package to be installed, see Rich progress bars)
- 'notebook': use the Jupyter notebook widget
- 'dashboard': use only the progress bar on the dashboard
When in a terminal and using the 'notebook' style, the progress bar will behave weirdly. This is not recommended.
Changing the default style¶
You can change the default style by setting the mpire.tqdm_utils.PROGRESS_BAR_DEFAULT_STYLE variable:
import mpire.tqdm_utils

mpire.tqdm_utils.PROGRESS_BAR_DEFAULT_STYLE = 'notebook'
Progress bar options¶
The tqdm progress bar can be configured using the progress_bar_options parameter. This parameter accepts a dictionary with keyword arguments that will be passed to the tqdm constructor.
Some options in tqdm will be overwritten by MPIRE. These include the iterable, total and leave parameters. The iterable is set to the iterable passed on to the map function. The total parameter is set to the number of tasks to be completed. The leave parameter is always set to True. Some other parameters have a default value assigned to them, but can be overwritten by the user.
Here’s an example where we change the description, the units, and the colour of the progress bar:
with WorkerPool(n_jobs=4) as pool:
pool.map(some_func, some_data, progress_bar=True,
progress_bar_options={'desc': 'Processing', 'unit': 'items', 'colour': 'green'})
For a complete list of available options, check out the tqdm docs.
Progress bar position¶
You can easily print a progress bar on a different position on the terminal using the position parameter of tqdm, which facilitates the use of multiple progress bars. Here’s an example of using multiple progress bars using nested WorkerPools:
def dispatcher(worker_id, X):
    with WorkerPool(n_jobs=4) as nested_pool:
        return nested_pool.map(task, X, progress_bar=True,
                               progress_bar_options={'position': worker_id + 1})

def main():
    with WorkerPool(n_jobs=4, daemon=False, pass_worker_id=True) as pool:
        pool.map(dispatcher, ((range(x, x + 100),) for x in range(100)), iterable_len=100,
                 n_splits=4, progress_bar=True)

main()
We use worker_id + 1 here because the worker IDs start at zero and we reserve position 0 for the progress bar of the main WorkerPool (which is the default).
It goes without saying that you shouldn’t specify the same progress bar position multiple times.
Worker init and exit¶
When you want to initialize a worker you can make use of the worker_init parameter of any map function. This will call the initialization function only once per worker. Similarly, if you need to clean up the worker at the end of its lifecycle you can use the worker_exit parameter. Additionally, the exit function can return anything you like, which can be collected using mpire.WorkerPool.get_exit_results() after the workers are done.
Both init and exit functions receive the worker ID, shared objects, and worker state in the same way as the task function does, given they’re enabled.
For example:
def init_func(worker_state):
    # Initialize a counter for each worker
    worker_state['count_even'] = 0

def square_and_count_even(worker_state, x):
    # Count number of even numbers and return the square
    if x % 2 == 0:
        worker_state['count_even'] += 1
    return x * x

def exit_func(worker_state):
    # Return the counter
    return worker_state['count_even']

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    pool.map(square_and_count_even, range(100), worker_init=init_func, worker_exit=exit_func)
    print(pool.get_exit_results())  # Output, e.g.: [13, 13, 12, 12]
    print(sum(pool.get_exit_results()))  # Output: 50
Task chunking¶
By default, MPIRE chunks the given tasks into 64 * n_jobs chunks. Each worker is given one chunk of tasks at a time before returning its results. This usually makes processing faster when you have rather small tasks (computation wise) and results are pickled/unpickled when they are sent to a worker or the main process. Chunking the tasks and results ensures that each process has to pickle/unpickle less often.
However, to determine the number of tasks in the argument list the iterable should implement the __len__ method, which is available in default containers like list or tuple, but isn’t available in most generator objects (the range object is one of the exceptions). To allow working with generators each map function has the option to pass the iterable length:
with WorkerPool(n_jobs=4) as pool:
# 1. This will issue a warning and sets the chunk size to 1
results = pool.map(square, ((x,) for x in range(1000)))
# 2. This will issue a warning as well and sets the chunk size to 1
results = pool.map(square, ((x,) for x in range(1000)), n_splits=4)
# 3. Square the numbers using a generator using a specific number of splits
results = pool.map(square, ((x,) for x in range(1000)), iterable_len=1000, n_splits=4)
# 4. Square the numbers using a generator using automatic chunking
results = pool.map(square, ((x,) for x in range(1000)), iterable_len=1000)
# 5. Square the numbers using a generator using a fixed chunk size
results = pool.map(square, ((x,) for x in range(1000)), chunk_size=4)
In the first two examples the function call will issue a warning because MPIRE doesn’t know how large the chunks should be as the total number of tasks is unknown, therefore it will fall back to a chunk size of 1. The third example should work as expected where 4 chunks are used. The fourth example uses 256 chunks (the default 64 times the number of workers). The last example uses a fixed chunk size of four, so MPIRE doesn’t need to know the iterable length.
You can also call the chunk function manually:
from mpire.utils import chunk_tasks

# Convert to list because chunk_tasks returns a generator
print(list(chunk_tasks(range(10), n_splits=3)))
print(list(chunk_tasks(range(10), chunk_size=2.5)))
print(list(chunk_tasks((x for x in range(10)), iterable_len=10, n_splits=6)))
will output:
[(0, 1, 2, 3), (4, 5, 6), (7, 8, 9)]
[(0, 1, 2), (3, 4), (5, 6, 7), (8, 9)]
[(0, 1), (2, 3), (4,), (5, 6), (7, 8), (9,)]
Maximum number of active tasks¶
When you have tasks that take up a lot of memory you can do a few things:
- Limit the number of jobs (i.e., the number of worker processes)
- Limit the number of active tasks (i.e., the number of tasks currently available to the workers; tasks that are in the queue ready to be processed)
The first option is the most obvious one to save memory when the processes themselves use up much memory. The second is convenient when the argument list takes up too much memory. For example, suppose you want to kick off an enormous amount of jobs (let’s say a billion) of which the arguments take up 1 KB per task (e.g., large strings), then that task queue would take up ~1 TB of memory!
In such cases, a good rule of thumb would be to have twice the amount of active chunks of tasks than there are jobs. This means that when all workers complete their task at the same time each would directly be able to continue with another task. When workers take on their new tasks the generator of tasks is iterated to the point that again there would be twice the amount of active chunks of tasks.
In MPIRE, the maximum number of active tasks by default is set to n_jobs * chunk_size * 2, so you don't have to tweak it for memory optimization. If, for whatever reason, you want to change this behavior, you can do so by setting the max_tasks_active parameter:
with WorkerPool(n_jobs=4) as pool:
results = pool.map(task, range(int(1e300)), iterable_len=int(1e300),
chunk_size=int(1e5), max_tasks_active=4 * int(1e5))
Worker lifespan¶
Occasionally, workers that process multiple, memory intensive tasks do not release their used up memory properly, which results in memory usage building up. This is not a bug in MPIRE, but a consequence of Python’s poor garbage collection. To avoid this type of problem you can set the worker lifespan: the number of tasks after which a worker should restart.
with WorkerPool(n_jobs=4) as pool:
results = pool.map(task, range(100), worker_lifespan=1, chunk_size=1)
In this example each worker is restarted after finishing a single task.
Timeouts¶
Timeouts can be set separately for the target, worker_init and worker_exit functions. When a timeout has been set and reached, it will throw a TimeoutError:
# Will raise TimeoutError, provided that the target function takes longer
# than half a second to complete
with WorkerPool(n_jobs=5) as pool:
    pool.map(time_consuming_function, range(10), task_timeout=0.5)

# Will raise TimeoutError, provided that the worker_init function takes longer
# than 3 seconds to complete or the worker_exit function takes longer than
# 150.5 seconds to complete
with WorkerPool(n_jobs=5) as pool:
    pool.map(time_consuming_function, range(10), worker_init=init, worker_exit=exit_,
             worker_init_timeout=3.0, worker_exit_timeout=150.5)
Use None (=default) to disable timeouts.
imap and imap_unordered¶
When you’re using one of the lazy map functions (e.g., imap or imap_unordered) then an exception will only be raised when the function is actually running. E.g. when executing:
with WorkerPool(n_jobs=5) as pool:
results = pool.imap(time_consuming_function, range(10), task_timeout=0.5)
this will never raise. This is because imap and imap_unordered return a generator object, which stops executing until it gets the trigger to go beyond the yield statement. When iterating through the results, it will raise as expected:
with WorkerPool(n_jobs=5) as pool:
results = pool.imap(time_consuming_function, range(10), task_timeout=0.5)
for result in results:
...
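If you want to handle the timeout yourself, a minimal sketch is to wrap the iteration in a try/except block (assuming, as described above, that a TimeoutError is raised):

with WorkerPool(n_jobs=5) as pool:
    results = pool.imap(time_consuming_function, range(10), task_timeout=0.5)
    try:
        for result in results:
            ...
    except TimeoutError:
        # A task exceeded the 0.5 second timeout
        ...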
Threading¶
When using threading as start method MPIRE won’t be able to interrupt certain functions, like time.sleep.
Numpy arrays¶
Contents¶
- Chunking
- Return value
Chunking¶
Numpy arrays are treated a little bit differently when passed on to the map functions. Usually MPIRE uses itertools.islice for chunking, which depends on the __iter__ special function of the container object. But applying that to numpy arrays:
import itertools
import numpy as np

# Create random array
arr = np.random.rand(10, 3)

# Chunk the array using default chunking
arr_iter = iter(arr)
chunk_size = 3
while True:
    chunk = list(itertools.islice(arr_iter, chunk_size))
    if chunk:
        yield chunk
    else:
        break
would yield:
[array([0.68438994, 0.9701514 , 0.40083965]),
 array([0.88428556, 0.2083905 , 0.61490443]),
 array([0.89249174, 0.39902235, 0.70762541])]
[array([0.18850964, 0.1022777 , 0.41539432]),
 array([0.07327858, 0.18608165, 0.75862301]),
 array([0.69215651, 0.4211941 , 0.31029439])]
[array([0.82571272, 0.72257819, 0.86079131]),
 array([0.91285817, 0.49398461, 0.27863929]),
 array([0.146981  , 0.84671211, 0.30122806])]
[array([0.11783283, 0.12585031, 0.39864368])]
In other words, each row of the array is now in its own array and each one of them is given to the target function individually. Instead, MPIRE chunks numpy arrays into something more reasonable using numpy slicing:
from mpire.utils import chunk_tasks

for chunk in chunk_tasks(arr, chunk_size=chunk_size):
    print(repr(chunk))
Output:
array([[0.68438994, 0.9701514 , 0.40083965],
       [0.88428556, 0.2083905 , 0.61490443],
       [0.89249174, 0.39902235, 0.70762541]])
array([[0.18850964, 0.1022777 , 0.41539432],
       [0.07327858, 0.18608165, 0.75862301],
       [0.69215651, 0.4211941 , 0.31029439]])
array([[0.82571272, 0.72257819, 0.86079131],
       [0.91285817, 0.49398461, 0.27863929],
       [0.146981  , 0.84671211, 0.30122806]])
array([[0.11783283, 0.12585031, 0.39864368]])
Each chunk is now a single numpy array containing as many rows as the chunk size, except for the last chunk as there aren’t enough rows left.
Return value¶
When the user provided function returns numpy arrays and you’re applying the mpire.WorkerPool.map() function MPIRE will concatenate the resulting numpy arrays to a single array by default. For example:
def add_five(x):
    return x + 5

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(add_five, arr, chunk_size=chunk_size)
will return:
array([[5.68438994, 5.9701514 , 5.40083965],
[5.88428556, 5.2083905 , 5.61490443],
[5.89249174, 5.39902235, 5.70762541],
[5.18850964, 5.1022777 , 5.41539432],
[5.07327858, 5.18608165, 5.75862301],
[5.69215651, 5.4211941 , 5.31029439],
[5.82571272, 5.72257819, 5.86079131],
[5.91285817, 5.49398461, 5.27863929],
[5.146981 , 5.84671211, 5.30122806],
[5.11783283, 5.12585031, 5.39864368]])
This behavior can be cancelled by using the concatenate_numpy_output flag:
with WorkerPool(n_jobs=4) as pool:
results = pool.map(add_five, arr, chunk_size=chunk_size, concatenate_numpy_output=False)
This will return individual arrays:
[array([[5.68438994, 5.9701514 , 5.40083965],
[5.88428556, 5.2083905 , 5.61490443],
[5.89249174, 5.39902235, 5.70762541]]),
array([[5.18850964, 5.1022777 , 5.41539432],
[5.07327858, 5.18608165, 5.75862301],
[5.69215651, 5.4211941 , 5.31029439]]),
array([[5.82571272, 5.72257819, 5.86079131],
[5.91285817, 5.49398461, 5.27863929],
[5.146981 , 5.84671211, 5.30122806]]),
array([[5.11783283, 5.12585031, 5.39864368]])]
Apply family¶
Contents¶
- apply
- apply_async
- AsyncResult
- Callbacks
- Worker init and exit
- Timeouts
mpire.WorkerPool implements two apply functions, which are very similar to the ones in the multiprocessing module:
- mpire.WorkerPool.apply()
- Apply a function to a single task. This is a blocking call.
- mpire.WorkerPool.apply_async()
- A variant of the above, but which is non-blocking. This returns an mpire.async_result.AsyncResult object.
apply¶
The apply function is a blocking call, which means that it will not return until the task is completed. If you want to run multiple different tasks in parallel, you should use the apply_async function instead. If you require to run the same function for many tasks in parallel, use the map functions instead.
The apply function takes a function, positional arguments, and keyword arguments, similar to how multiprocessing does it.
def task(a, b, c, d):
    return a + b + c + d

with WorkerPool(n_jobs=1) as pool:
    result = pool.apply(task, args=(1, 2), kwargs={'d': 4, 'c': 3})
    print(result)
apply_async¶
The apply_async function is a non-blocking call, which means that it will return immediately. It returns an mpire.async_result.AsyncResult object, which can be used to get the result of the task at a later moment in time.
The apply_async function takes the same parameters as the apply function.
def task(a, b):
    return a + b

with WorkerPool(n_jobs=4) as pool:
    async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]
    results = [async_result.get() for async_result in async_results]
Obtaining the results should happen while the pool is still running! E.g., the following will deadlock:
with WorkerPool(n_jobs=4) as pool:
    async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]

# Will wait forever
results = [async_result.get() for async_result in async_results]
You can, however, make use of the mpire.WorkerPool.stop_and_join() function to stop the workers and join the pool. This will make sure that all tasks are completed before the pool exits.
with WorkerPool(n_jobs=4) as pool:
    async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]
    pool.stop_and_join()

    # Will not deadlock
    results = [async_result.get() for async_result in async_results]
AsyncResult¶
The mpire.async_result.AsyncResult object has the following convenient methods:
with WorkerPool(n_jobs=1) as pool:
async_result = pool.apply_async(task, args=(1, 1))
# Check if the task is completed
is_completed = async_result.ready()
# Wait until the task is completed, or until the timeout is reached.
async_result.wait(timeout=10)
# Get the result of the task. This will block until the task is completed,
# or until the timeout is reached.
result = async_result.get(timeout=None)
# Check if the task was successful (i.e., did not raise an exception).
# This will raise an exception if the task is not completed yet.
is_successful = async_result.successful()
Callbacks¶
Each apply function has a callback and error_callback argument. These are functions which are called when the task is finished. The callback function is called with the result of the task when the task was completed successfully, and the error_callback is called with the exception when the task failed.
def task(a):
    return a + 1

def callback(result):
    print("Task completed successfully with result:", result)

def error_callback(exception):
    print("Task failed with exception:", exception)

with WorkerPool(n_jobs=1) as pool:
    pool.apply(task, 42, callback=callback, error_callback=error_callback)
Worker init and exit¶
As with the map family of functions, the apply family of functions also has worker_init and worker_exit arguments. These are functions which are called when a worker is started and stopped, respectively. See Worker init and exit for more information on these functions.
def worker_init():
    print("Worker started")

def worker_exit():
    print("Worker stopped")

with WorkerPool(n_jobs=5) as pool:
    pool.apply(task, 42, worker_init=worker_init, worker_exit=worker_exit)
There's a caveat though. When the first apply or apply_async function is executed, the entire pool of workers is started. This means that in the above example all five workers are started, while only one was needed. It also means that the worker_init function is set for all those workers at once, so you cannot have a different worker_init function for each apply task. A second, different worker_init function will simply be ignored.
Similarly, the worker_exit function can only be set once as well. Additionally, exit functions are only called when a worker exits, which in this case translates to when the pool exits. This means that if you call apply or apply_async multiple times, the worker_exit function is only called once at the end. Use mpire.WorkerPool.stop_and_join() to stop the workers, which will cause the worker_exit function to be triggered for each worker.
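A minimal sketch of this pattern, reusing the task function from above (the exit return value is just an illustration):

def worker_exit():
    # Hypothetical exit function returning a value
    return "done"

with WorkerPool(n_jobs=2) as pool:
    pool.apply(task, 42, worker_exit=worker_exit)
    pool.stop_and_join()
    print(pool.get_exit_results())  # e.g.: ['done', 'done']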
Timeouts¶
The apply family of functions also has task_timeout, worker_init_timeout and worker_exit_timeout arguments. These are timeouts for the task, the worker_init function and the worker_exit function, respectively. They work similarly as those for the map functions.
When a single task times out, only that task is cancelled. The other tasks will continue to run. When a worker init or exit times out, the entire pool is stopped.
See Timeouts for more information.
Dashboard¶
The dashboard allows you to see progress information from a browser. This is convenient when running scripts in a notebook or screen, if you want to share the progress information with others, or if you want to get real-time worker insight information.
The dashboard dependencies are not installed by default. See Dashboard for more information.
Contents¶
- Starting the dashboard
- Connecting to an existing dashboard
- Using the dashboard
- Stack level
Starting the dashboard¶
You can start the dashboard programmatically:
from mpire.dashboard import start_dashboard

# Will return a dictionary with dashboard details
dashboard_details = start_dashboard()
print(dashboard_details)
which will print:
{'dashboard_port_nr': 8080,
'manager_host': 'localhost',
'manager_port_nr': 8081}
This will start a dashboard on your local machine on port 8080. When the port is already in use MPIRE will try the next, until it finds an unused one. In the rare case that no ports are available up to port 8099 the function will raise an OSError. By default, MPIRE tries ports 8080-8099. You can override this range by passing on a custom range object:
dashboard_details = start_dashboard(range(9000, 9100))
The returned dictionary contains the port number that is ultimately chosen. It also contains information on how to connect to this dashboard remotely.
Another way of starting a dashboard is by using the bash script (this doesn’t work on Windows!):
$ mpire-dashboard
This will start a dashboard with the connection details printed on screen. It will say something like:
Starting MPIRE dashboard...

MPIRE dashboard started on http://localhost:8080
Server is listening on localhost:8098
--------------------------------------------------
The server part corresponds to the manager_host and manager_port_nr from the dictionary returned by mpire.dashboard.start_dashboard(). Similarly to earlier, a custom port range can be provided:
$ mpire-dashboard --port-range 9000-9100
The benefit of starting a dashboard this way is that your dashboard keeps running in case of errors in your script. You will be able to see what the error was, when it occurred and where it occurred in your code.
Connecting to an existing dashboard¶
If you have started a dashboard elsewhere, you can connect to it using:
from mpire.dashboard import connect_to_dashboard

connect_to_dashboard(manager_port_nr=8081, manager_host='localhost')
Make sure you use the manager_port_nr, not the dashboard_port_nr in the examples above.
You can connect to an existing dashboard on the same, but also on a remote machine (if the ports are open). If manager_host is omitted it will fall back to using 'localhost'.
Using the dashboard¶
Once connected to a dashboard you don’t need to change anything to your code. When you have enabled the use of a progress bar in your map call the progress bar will automatically register itself to the dashboard server and show up, like here:
from mpire import WorkerPool
from mpire.dashboard import connect_to_dashboard

connect_to_dashboard(8099)

def square(x):
    import time
    time.sleep(0.01)  # To be able to show progress
    return x * x

with WorkerPool(4) as pool:
    pool.map(square, range(10000), progress_bar=True)
The progress bar will then show up as a row in the dashboard overview. You can click on a progress bar row to view details about the function that is called.
The dashboard will let you know when a KeyboardInterrupt signal was sent to the running process and will show the traceback information in case of an exception.
In case you have enabled Worker insights, these insights are shown in real time in the dashboard.
Click on the Insights (click to expand/collapse) to either expand or collapse the insight details.
The dashboard will refresh automatically every 0.5 seconds.
Stack level¶
By default, the dashboard will show information about the function that is called and where it is called from. However, in some cases where you have wrapped the function in another function, you might be less interested in the wrapper function and more interested in the function that is calling this wrapper. In such cases you can use mpire.dashboard.set_stacklevel() to set the stack level. This is the number of levels in the stack to go back in order to find the frame that contains the function that is invoking MPIRE. For example:
from mpire import WorkerPool
from mpire.dashboard import set_stacklevel, start_dashboard

class WorkerPoolWrapper:

    def __init__(self, n_jobs, progress_bar=True):
        self.n_jobs = n_jobs
        self.progress_bar = progress_bar

    def __call__(self, func, data):
        with WorkerPool(self.n_jobs) as pool:
            return pool.map(func, data, progress_bar=self.progress_bar)

def square(x):
    return x * x

if __name__ == '__main__':
    start_dashboard()
    executor = WorkerPoolWrapper(4, progress_bar=True)
    set_stacklevel(1)  # default
    results = executor(square, range(10000))
    set_stacklevel(2)
    results = executor(square, range(10000))
When you run this code you will see that the dashboard will show two progress bars. In both cases, the dashboard will show the square function as the function that is called. However, in the first case, it will show return pool.map(func, data, progress_bar=self.progress_bar) as the line where it is called from. In the second case, it will show the results = executor(square, range(10000)) line.
Troubleshooting¶
This section describes some known problems that can arise when using MPIRE.
Contents¶
- Progress bar issues with Jupyter notebooks
- IProgress not found
- Widget Javascript not detected
- Unit tests
- Shutting down takes a long time on error
- Unpicklable tasks/results
- AttributeError: Can’t get attribute ‘<some_function>’ on <module ‘__main__’ (built-in)>
- Windows
- macOS
Progress bar issues with Jupyter notebooks¶
When using the progress bar in a Jupyter notebook you might encounter some issues. A few of these are described below, together with possible solutions.
IProgress not found¶
When you see something like ImportError: IProgress not found. Please update jupyter and ipywidgets., this means ipywidgets is not installed. You can install it using pip:
pip install ipywidgets
or conda:
conda install -c conda-forge ipywidgets
Have a look at the ipywidgets documentation for more information.
Widget Javascript not detected¶
When you see something like Widget Javascript not detected. It may not be enabled properly., this means the Javascript extension is not enabled. You can enable it using the following command before starting your notebook:
jupyter nbextension enable --py --sys-prefix widgetsnbextension
Note that you have to restart your notebook server after enabling the extension, simply restarting the kernel won’t be enough.
Unit tests¶
When using the 'spawn' or 'forkserver' method you’ll probably run into one or two issues when running unittests in your own package. One problem that might occur is that your unittests will restart whenever the piece of code containing such a start method is called, leading to very funky terminal output. To remedy this problem make sure your setup call in setup.py is surrounded by an if __name__ == '__main__': clause:
from setuptools import setup

if __name__ == '__main__':
    # Call setup and install any dependencies you have inside the if-clause
    setup(...)
See the ‘Safe importing of main module’ section at caveats.
The second problem you might encounter is that the semaphore tracker of multiprocessing will complain when you run individual (or a selection of) unittests using python setup.py test -s tests.some_test. At the end of the tests you will see errors like:
Traceback (most recent call last):
  File ".../site-packages/multiprocess/semaphore_tracker.py", line 132, in main
    cache.remove(name)
KeyError: b'/mp-d3i13qd5'
.../site-packages/multiprocess/semaphore_tracker.py:146: UserWarning: semaphore_tracker: There appear to be 58
  leaked semaphores to clean up at shutdown
  len(cache))
.../site-packages/multiprocess/semaphore_tracker.py:158: UserWarning: semaphore_tracker: '/mp-f45dt4d6': [Errno 2]
  No such file or directory
  warnings.warn('semaphore_tracker: %r: %s' % (name, e))
...
Your unittests will still succeed and run OK. Unfortunately, I’ve not found a remedy to this problem using python setup.py test yet. What you can use instead is something like the following:
python -m unittest tests.some_test
This will work just fine. See the unittest documentation for more information.
Shutting down takes a long time on error¶
When you issue a KeyboardInterrupt or when an error occurs in the function that's run in parallel, there are situations where MPIRE needs a few seconds to gracefully shut down. This has to do with the fact that, in these situations, the task or results queue can still be quite full. MPIRE drains these queues until they're completely empty, in order to properly shut down and clean up every communication channel.
To remedy this issue you can use the max_tasks_active parameter and set it to n_jobs * 2, or similar. Aside from the added benefit that the workers can start more quickly, the queues won’t get that full anymore and shutting down will be much quicker. See Maximum number of active tasks for more information.
When you're using a lazy map function, also be sure to iterate through the results; otherwise the results queue fills up and draining it will take longer.
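A sketch combining both suggestions, assuming a lazy imap call and an arbitrary choice of 4 workers (the process function is a placeholder):

from mpire import WorkerPool

def process(x):
    return x * 2

n_jobs = 4
with WorkerPool(n_jobs=n_jobs) as pool:
    # Keep the queues small and consume every result so they stay drained
    for result in pool.imap(process, range(100_000), max_tasks_active=n_jobs * 2):
        pass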
Unpicklable tasks/results¶
Sometimes you can encounter deadlocks in your code when using MPIRE. When you encounter this, chances are some tasks or results from your script can’t be pickled. MPIRE makes use of multiprocessing queues for inter-process communication and if your function returns unpicklable results the queue will unfortunately deadlock.
The only way to remedy this problem in MPIRE would be to manually pickle objects before sending them to a queue and quit gracefully when encountering a pickle error. However, this would mean objects would always be pickled twice. This would add a heavy performance penalty and is therefore not an acceptable solution.
Instead, the user should make sure their tasks and results are always picklable (which in most cases won’t be a problem), or resort to setting use_dill=True. The latter is capable of pickling a lot more exotic types. See Dill for more information.
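A minimal sketch of the dill route, using a lambda that the standard pickle module can't serialize (requires the optional dill dependency described in Dill):

from mpire import WorkerPool

with WorkerPool(n_jobs=2, use_dill=True) as pool:
    # dill can serialize lambdas, unlike the default pickle backend
    results = pool.map(lambda x: x * x, range(10))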
AttributeError: Can’t get attribute ‘<some_function>’ on <module ‘__main__’ (built-in)>¶
This error can occur when inside an iPython or Jupyter notebook session and the function to parallelize is defined in that session. This is often the result of using spawn as start method (the default on Windows), which starts a new process without copying the function in question.
This error is actually related to the Unpicklable tasks/results problem and can be solved in a similar way. I.e., you can define your function in a file that can be imported by the child process, or you can resort to using dill by setting use_dill=True. See Dill for more information.
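A sketch of the importable-module approach, assuming a hypothetical file my_functions.py located next to your notebook:

# my_functions.py
def square(x):
    return x * x

# In the notebook session
from mpire import WorkerPool
from my_functions import square

with WorkerPool(n_jobs=2, start_method='spawn') as pool:
    results = pool.map(square, range(10))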
Windows¶
- When using dill and an exception occurs, or when the exception occurs in an exit function, it can print additional OSError messages in the terminal, but they can be safely ignored.
- The mpire-dashboard script does not work on Windows.
macOS¶
- When encountering OSError: [Errno 24] Too many open files errors, use ulimit -n <number> to increase the limit of the number of open files. This is required because MPIRE uses file-descriptor based synchronization primitives and macOS has a very low default limit. For example, MPIRE uses about 190 file descriptors when using 10 workers.
- Pinning of processes to CPU cores is not supported on macOS. This is because macOS does not support the sched_setaffinity system call. A warning will be printed when trying to use this feature.
API Reference¶
Contents¶
- WorkerPool
- AsyncResult
- Task chunking
- Converting iterable of arguments
- Dashboard
- Other
WorkerPool¶
- class mpire.WorkerPool(n_jobs=None, daemon=True, cpu_ids=None, shared_objects=None, pass_worker_id=False, use_worker_state=False, start_method='fork', keep_alive=False, use_dill=False, enable_insights=False, order_tasks=False)
- A multiprocessing worker pool which acts like a multiprocessing.Pool, but is faster and has more options.
- __exit__(*_)
- Enable the use of the with statement. Gracefully terminates workers, if there are any
- Return type
- None
- __init__(n_jobs=None, daemon=True, cpu_ids=None, shared_objects=None, pass_worker_id=False, use_worker_state=False, start_method='fork', keep_alive=False, use_dill=False, enable_insights=False, order_tasks=False)
- n_jobs (Optional[int]) – Number of workers to spawn. If None, will use mpire.cpu_count()
- daemon (bool) – Whether to start the child processes as daemon
- cpu_ids (List[Union[int, List[int]]]) – List of CPU IDs to use for pinning child processes to specific CPUs. The list must be as long as the number of jobs used (if n_jobs equals None it must be equal to mpire.cpu_count()), or the list must have exactly one element. In the former case, element i specifies the CPU ID(s) to use for child process i. In the latter case the single element specifies the CPU ID(s) for all child processes to use. A single element can be either a single integer specifying a single CPU ID, or a list of integers specifying that a single child process can make use of multiple CPU IDs. If None, CPU pinning will be disabled
- shared_objects (Any) – Objects to be passed on as shared objects to the workers once. It will be passed on to the target, worker_init, and worker_exit functions. shared_objects is only passed on when it’s not None. Shared objects will be copy-on-write when using fork as start method. When enabled, functions receive the shared objects as second argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args
- pass_worker_id (bool) – Whether to pass on a worker ID to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker ID as first argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args
- use_worker_state (bool) – Whether to let a worker have a worker state. The worker state will be passed on to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker state as third argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args
- start_method (str) – Which process start method to use. Options for multiprocessing: 'fork' (default, if available), 'forkserver' and 'spawn' (default, if 'fork' isn’t available). For multithreading use 'threading'. See https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods for more information and https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods for some caveats when using the 'spawn' or 'forkserver' methods
- keep_alive (bool) – When True it will keep workers alive after completing a map call, allowing to reuse workers
- use_dill (bool) – Whether to use dill as serialization backend. Some exotic types (e.g., lambdas, nested functions) don't work well when using spawn as start method. In such cases, use dill (can be a bit slower sometimes)
- enable_insights (bool) – Whether to enable worker insights. Might come at a small performance penalty (often negligible)
- order_tasks (bool) – Whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.
- __weakref__
- list of weak references to the object
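As an illustrative sketch (not part of the generated reference), the argument order described above for pass_worker_id, shared_objects, and use_worker_state could look like this, assuming the worker state behaves as a plain dictionary:

from mpire import WorkerPool

def task(worker_id, shared, worker_state, x):
    # Order: worker_id, shared_objects, worker_state, then the arguments
    # coming from iterable_of_args
    worker_state['n_calls'] = worker_state.get('n_calls', 0) + 1
    return shared['offset'] + x

with WorkerPool(n_jobs=2, shared_objects={'offset': 10}, pass_worker_id=True,
                use_worker_state=True) as pool:
    results = pool.map(task, range(5))  # [10, 11, 12, 13, 14]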
- apply(func, args=(), kwargs=None, callback=None, error_callback=None, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None)
- Apply a function to a single task. This is a blocking call.
- func (Callable) – Function to apply to the task. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- args (Any) – Arguments to pass to a worker, which passes it to the function func as func(*args)
- kwargs (Dict) – Keyword arguments to pass to a worker, which passes it to the function func as func(**kwargs)
- callback (Optional[Callable]) – Callback function to call when the task is finished. The callback function receives the output of the function func as its argument
- error_callback (Optional[Callable]) – Callback function to call when the task has failed. The callback function receives the exception as its argument
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- Return type
- Any
- Returns
- Result of the function func applied to the task
- apply_async(func, args=(), kwargs=None, callback=None, error_callback=None, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None)
- Apply a function to a single task. This is a non-blocking call.
- func (Callable) – Function to apply to the task. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- args (Any) – Arguments to pass to a worker, which passes it to the function func as func(*args)
- kwargs (Dict) – Keyword arguments to pass to a worker, which passes it to the function func as func(**kwargs)
- callback (Optional[Callable]) – Callback function to call when the task is finished. The callback function receives the output of the function func as its argument
- error_callback (Optional[Callable]) – Callback function to call when the task has failed. The callback function receives the exception as its argument
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- Return type
- AsyncResult
- Returns
- Result of the function func applied to the task
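A brief sketch of non-blocking usage with apply_async, where the square function is a placeholder:

from mpire import WorkerPool

def square(x):
    return x * x

with WorkerPool(n_jobs=4) as pool:
    # Schedule all tasks without blocking, then collect the results
    async_results = [pool.apply_async(square, args=(i,)) for i in range(10)]
    results = [async_result.get() for async_result in async_results]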
- get_exit_results()
- Obtain a list of exit results when an exit function is defined.
- Return type
- List
- Returns
- Exit results list
- get_insights()
- Creates insights from the raw insight data
- Return type
- Dict
- Returns
- Dictionary containing worker insights
- imap(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)
- Same as multiprocessing.imap_unordered(), but ordered. Also allows a user to set the maximum number of tasks available in the queue.
- func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func
- iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks
- max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2
- chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.
- n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.
- worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time
- progress_bar (bool) – When True it will display a progress bar
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.
- progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'
- Return type
- Generator[Any, None, None]
- Returns
- Generator yielding ordered results
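A short sketch of consuming the generator returned by imap lazily (square is a placeholder function):

from mpire import WorkerPool

def square(x):
    return x * x

with WorkerPool(n_jobs=4) as pool:
    # Results are yielded in order, as soon as they become available
    for result in pool.imap(square, range(100)):
        print(result)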
- imap_unordered(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)
- Same as multiprocessing.imap_unordered(). Also allows a user to set the maximum number of tasks available in the queue.
- func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func
- iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks
- max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2
- chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.
- n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.
- worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time
- progress_bar (bool) – When True it will display a progress bar
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.
- progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'
- Return type
- Generator[Any, None, None]
- Returns
- Generator yielding unordered results
- join(keep_alive=False)
- When keep_alive=False: inserts a poison pill, grabs the exit results, waits until the tasks/results queues are done, and waits until all workers are finished. When keep_alive=True: inserts a non-lethal poison pill, and waits until the tasks/results queues are done. join and stop_and_join are aliases.
- Parameters
- keep_alive (bool) – Whether to keep the workers alive
- Return type
- None
- map(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, concatenate_numpy_output=True, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)
- Same as multiprocessing.map(). Also allows a user to set the maximum number of tasks available in the queue. Note that this function can be slower than the unordered version.
- func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func
- iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks
- max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2
- chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.
- n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.
- worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time
- progress_bar (bool) – When True it will display a progress bar
- concatenate_numpy_output (bool) – When True it will concatenate numpy output to a single numpy array
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.
- progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'
- Return type
- Any
- Returns
- List with ordered results
- map_unordered(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)
- Same as multiprocessing.map(), but unordered. Also allows a user to set the maximum number of tasks available in the queue.
- func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func
- iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks
- max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2
- chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.
- n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.
- worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time
- progress_bar (bool) – When True it will display a progress bar
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).
- progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.
- progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'
- Return type
- Any
- Returns
- List with unordered results
- pass_on_worker_id(pass_on=True)
- Set whether to pass on the worker ID to the function to be executed or not (default= False).
- Parameters
- pass_on (bool) – Whether to pass on a worker ID to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker ID depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args
- Return type
- None
- set_keep_alive(keep_alive=True)
- Set whether workers should be kept alive in between consecutive map calls.
- Parameters
- keep_alive (bool) – When True it will keep workers alive after completing a map call, allowing to reuse workers
- Return type
- None
- set_order_tasks(order_tasks=True)
- Set whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.
- Parameters
- order_tasks (bool) – Whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.
- Return type
- None
- set_shared_objects(shared_objects=None)
- Set shared objects to pass to the workers.
- Parameters
- shared_objects (Any) – Objects to be passed on as shared objects to the workers once. It will be passed on to the target, worker_init, and worker_exit functions. shared_objects is only passed on when it's not None. Shared objects will be copy-on-write when using fork as start method. When enabled, functions receive the shared objects depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args
- Return type
- None
- set_use_worker_state(use_worker_state=True)
- Set whether or not each worker should have its own state variable. Each worker has its own state, so it’s not shared between the workers.
- Parameters
- use_worker_state (bool) – Whether to let a worker have a worker state. The worker state will be passed on to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker state depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args
- Return type
- None
- stop_and_join(keep_alive=False)
- When keep_alive=False: inserts a poison pill, grabs the exit results, waits until the tasks/results queues are done, and waits until all workers are finished. When keep_alive=True: inserts a non-lethal poison pill, and waits until the tasks/results queues are done. join and stop_and_join are aliases.
- Parameters
- keep_alive (bool) – Whether to keep the workers alive
- Return type
- None
- terminate()
- Tries to do a graceful shutdown of the workers by interrupting them. In case processes deadlock, it will send a SIGKILL.
- Return type
- None
AsyncResult¶
- class mpire.async_result.AsyncResult(cache, callback, error_callback, job_id=None, delete_from_cache=True, timeout=None)
- Adapted from multiprocessing.pool.ApplyResult.
- __init__(cache, callback, error_callback, job_id=None, delete_from_cache=True, timeout=None)
- cache (Dict) – Cache for storing intermediate results
- callback (Optional[Callable]) – Callback function to call when the task is finished. The callback function receives the output of the function as its argument
- error_callback (Optional[Callable]) – Callback function to call when the task has failed. The callback function receives the exception as its argument
- job_id (Optional[int]) – Job ID of the task. If None, a new job ID is generated
- delete_from_cache (bool) – If True, the result is deleted from the cache when the task is finished
- timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
- __weakref__
- list of weak references to the object
- get(timeout=None)
- Wait until the task is finished and return the output of the function
- Parameters
- timeout (Optional[float]) – Timeout in seconds. If None, wait indefinitely
- Return type
- Any
- Returns
- Output of the function
- Raises
- TimeoutError if the task is not finished within the timeout. When the task has failed, the exception raised by the function is re-raised
- successful()
- Returns whether the task has finished successfully
- Return type
- bool
- Returns
- Returns True if the task has finished successfully
- Raises
- ValueError if the task is not finished yet
- wait(timeout=None)
- Wait until the task is finished
- Parameters
- timeout (Optional[float]) – Timeout in seconds. If None, wait indefinitely
- Return type
- None
Task chunking¶
- mpire.utils.chunk_tasks(iterable_of_args, iterable_len=None, chunk_size=None, n_splits=None)
- Chunks tasks such that individual workers will receive chunks of tasks rather than individual ones, which can speed up processing drastically.
- iterable_of_args (Iterable) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function
- iterable_len (Optional[int]) – Number of tasks available in iterable_of_args. Only needed when iterable_of_args is a generator
- chunk_size (Union[int, float, None]) – Number of simultaneous tasks to give to a worker. If None, will use n_splits to determine the chunk size
- n_splits (Optional[int]) – Number of splits to use when chunk_size is None
- Return type
- Generator[Collection, None, None]
- Returns
- Generator of chunked task arguments
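A small sketch of chunking a range of arguments; the exact chunk boundaries are an implementation detail:

from mpire.utils import chunk_tasks

# Split 10 task arguments into 3 chunks of roughly equal size
for chunk in chunk_tasks(range(10), n_splits=3):
    print(chunk)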
Converting iterable of arguments¶
- mpire.utils.make_single_arguments(iterable_of_args, generator=True)
- Converts an iterable of single arguments to an iterable of single argument tuples
- iterable_of_args (Iterable) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function
- generator (bool) – Whether or not to return a generator, otherwise a materialized list will be returned
- Return type
- Union[List, Generator]
- Returns
- Iterable of single argument tuples
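A short sketch, where generator=False is used to obtain a materialized list:

from mpire.utils import make_single_arguments

# Wrap each single argument in a 1-element tuple so it isn't unpacked
args = make_single_arguments(['hello world', 'foo bar'], generator=False)
# [('hello world',), ('foo bar',)]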
Dashboard¶
- mpire.dashboard.start_dashboard(port_range=range(8080, 8100))
- Starts a new MPIRE dashboard
- Parameters
- port_range (Sequence) – Port range to try.
- Return type
- Dict[str, Union[str, int]]
- Returns
- A dictionary containing the dashboard port number and manager host and port number being used
- mpire.dashboard.connect_to_dashboard(manager_port_nr, manager_host=None)
- Connects to an existing MPIRE dashboard
- manager_port_nr (int) – Port to use when connecting to a manager
- manager_host (Union[bytes, str, None]) – Host to use when connecting to a manager. If None it will use localhost
- Return type
- None
- mpire.dashboard.get_stacklevel()
- Gets the stack level to use when obtaining function details (used for the dashboard)
- Return type
- int
- Returns
- Stack level
- mpire.dashboard.set_stacklevel(stacklevel)
- Sets the stack level to use when obtaining function details (used for the dashboard)
- Parameters
- stacklevel (int) – Stack level
- Return type
- None
Other¶
- mpire.cpu_count()
- Returns the number of CPUs in the system
Contribution guidelines¶
If you want to contribute to MPIRE, great! Please follow the steps below to ensure a smooth process:
1. Clone the project.
2. Create a new branch for your feature or bug fix. Give your branch a meaningful name.
3. Make your feature addition or bug fix.
4. Add tests for it and test it yourself. Make sure it works for both Unix and Windows based systems, or make sure to document why it doesn't work for one of the platforms.
5. Add documentation for it. Don't forget about the changelog:
  - Reference the issue number from GitHub in the changelog, if applicable (see current changelog for examples).
  - Don't mention a date or a version number here, but use Unreleased instead.
6. Commit with a meaningful commit message (e.g. the changelog).
7. Open a pull request.
8. Resolve any issues or comments by the reviewer.
9. Merge PR by squashing all your individual commits.
Making a release¶
A release is only made by the project maintainer. The following steps are required:
1. Update the changelog with the release date and version number. Version numbers follow the Semantic Versioning guidelines.
2. Update the version number in setup.py and docs/conf.py.
3. Commit and push the changes.
4. Make sure the tests pass on GitHub Actions.
5. Create a tag for the release by using git tag -a vX.Y.Z -m "vX.Y.Z".
6. Push the tag to GitHub by using git push origin vX.Y.Z.
Changelog¶
2.10.2¶
(2024-05-07)
- Function details in progress_bar.py are only obtained when the dashboard is running (#128)
- Obtaining the user name is now put in a try-except block to prevent MPIRE from crashing when the user name cannot be obtained, which can happen when running in a container as a non-root user (#128)
2.10.1¶
(2024-03-19)
- Fixed a bug in the timeout handler where the cache dictionary could be changed during iteration (#123)
- Fixed an authentication error when using a progress bar or insights in a spawn or forkserver context when using dill (#124)
2.10.0¶
(2024-02-19)
- Added support for macOS (#27, #79, #91)
- Fixes memory leaks on macOS
- Reduced the amount of semaphores used
- Issues a warning when cpu_ids is used on macOS
- Added mpire.dashboard.set_stacklevel() to set the stack level in the dashboard. This influences what line to display in the ‘Invoked on line’ section. (#118)
- Use function details from the __call__ method on the dashboard in case the callable being executed is a class instance (#117)
- Use (global) average rate for the estimate on the dashboard when smoothing=0 (#117)
- Make it possible to reuse the same progress_bar_options without raising warnings (#117)
- Removed deprecated progress_bar_position parameter from the map functions. Use progress_bar_options[‘position’] instead (added since v2.6.0)
2.9.0¶
(2024-01-08)
- Added support for the rich progress bar style (#96)
- Added the option to only show progress on the dashboard. (#107)
- Progress bars are now supported on Windows when using threading as start method.
- Insights now also work when using the forkserver and spawn start methods. (#104)
- When using insights on Windows the arguments of the top 5 longest tasks are now available as well.
- Fixed deprecated escape import from flask by importing directly from markupsafe. (#106)
- Fixed mpire.dashboard.start_dashboard() freeze when there are no two ports available. (#112)
- Added mpire.dashboard.shutdown_dashboard() to shutdown the dashboard.
- Added py.typed file to prompt mypy for type checking. (#108)
2.8.1¶
(2023-11-08)
- Excluded the tests folder from MPIRE distributions (#89)
- Added a workaround for semaphore leakage on macOS and fixed a bug when working in a fork context while the system default is spawn (#92)
- Fix progressbar percentage on dashboard (#101)
- Fixed a bug where starting multiple apply_async tasks with a task timeout didn’t interrupt all tasks when the timeout was reached (#98)
- Add testing python 3.12 to workflow and drop 3.6 and 3.7 (#102)
2.8.0¶
(2023-08-16)
- Added support for Python 3.11 (#67)
2.7.1¶
(2023-04-14)
- Transferred ownership of the project from Slimmer AI to sybrenjansen
2.7.0¶
(2023-03-17)
- Added the mpire.WorkerPool.apply() and mpire.WorkerPool.apply_async() functions (#63)
- When inside a Jupyter notebook, the progress bar will not automatically switch to a widget anymore. tqdm cannot always determine with certainty that someone is in a notebook or, e.g., a Jupyter console. Another reason is to avoid the many errors people get when having widgets or javascript disabled. See Progress bar style for changing the progress bar to a widget (#71)
- The mpire.dashboard.connect_to_dashboard() function now raises a ConnectionRefused error when the dashboard isn’t running, instead of silently failing and deadlocking the next map call with a progress bar (#68)
- Added support for a progress bar without knowing the size of the iterable. It used to disable the progress bar when the size was unknown
- Changed how max_tasks_active is handled. It now applies to the number of tasks that are currently being processed, instead of the number of chunks of tasks, as you would expect from the name. Previously, when the chunk size was set to anything other than 1, the number of active tasks could be higher than max_tasks_active
- Updated some exception messages and docs (#69)
- Changed how worker results, restarts, timeouts, unexpected deaths, and exceptions are handled. They are now handled by individual threads such that the main thread is more responsive. The API is the same, so no user changes are needed
- Mixing multiple map calls now raises an error (see Mixing map functions)
- Fixed a bug where calling a map function with a progress bar multiple times in a row didn’t display the progress bar correctly
- Fixed a bug where the dashboard didn’t show an error when an exit function raised an exception
2.6.0¶
(2022-08-29)
- Added Python 3.10 support
- The tqdm progress bar can now be customized using the progress_bar_options parameter in the map functions (#57)
- Using progress_bar_position from a map function is now deprecated and will be removed in MPIRE v2.10.0. Use progress_bar_options['position'] instead
- Deprecated enable_insights from a map function, use enable_insights in the WorkerPool constructor instead
- Fixed a bug where a worker could exit before an exception was entirely sent over the queue, causing a deadlock (#56)
- Fixed a bug where exceptions with init arguments weren’t handled correctly (#58)
- Fixed a rare and weird bug in Windows that could cause a deadlock (probably fixes #55)
2.5.0¶
(2022-07-25)
- Added the option to fix the order of tasks given to the workers (#46)
- Fixed a bug where updated WorkerPool parameters aren’t used in subsequent map calls when keep_alive is enabled
2.4.0¶
(2022-05-25)
- A timeout for the target, worker_init, and worker_exit functions can be specified after which a worker is stopped (#36)
- A WorkerPool can now be started within a thread which isn’t the main thread (#44)
2.3.5¶
(2022-04-25)
- MPIRE now handles defunct child processes properly, instead of deadlocking (#34)
- Added benchmark highlights to README (#38)
2.3.4¶
(2022-03-29)
- Platform specific dependencies are now handled using environment markers as defined in PEP-508 (#30)
- Fixes hanging WorkerPool when using worker_lifespan and returning results that exceed the pipe capacity (#32)
- Fixes insights unit tests that could sometimes fail because they were too fast
2.3.3¶
(2021-11-29)
- Changed progress bar handler process to thread, making it more stable (especially in notebooks)
- Changed progress bar tasks completed queue to array, to make it more responsive and faster
- Disabled the tqdm monitor thread which, in combination with MPIRE’s own tqdm lock, could result in deadlocks
2.3.2¶
(2021-11-19)
- Included license file in source distribution (#25)
2.3.1¶
(2021-11-16)
- Made connecting to the tqdm manager more robust (#23)
2.3.0¶
(2021-10-15)
- Fixed progress bar in a particular setting with iPython and django installed (#13)
- keep_alive now works even when the function to be called or any other parameter passed to the map function is changed (#15)
- Moved enable_insights to the WorkerPool constructor. Using enable_insights from a map function is now deprecated and will be removed in MPIRE v2.6.0.
- Restructured docs and updated several sections for Windows users.
2.2.1¶
(2021-08-31)
- Fixed compatibility with newer tqdm versions (>= 4.62.2) (#11)
2.2.0¶
(2021-08-30)
- Added support for Windows (#6, #7). Support has a few caveats:
- When using worker insights the arguments of the top 5 longest tasks are not available
- Progress bar is not supported when using threading as start method
- When using dill and an exception occurs, or when the exception occurs in an exit function, it can print additional OSError messages in the terminal, but these can be safely ignored.
2.1.1¶
(2021-08-26)
- Fixed a bug with newer versions of tqdm. The progress bar would throw an AttributeError when connected to a dashboard.
- README and documentation updated
2.1.0¶
(2021-08-06)
- Workers now have their own task queue, which speeds up tasks with bigger payloads
- Fixed progress bar showing error information when completed without error
- Fixed progress bar and worker insights not displaying properly when using threading
- Progress bar handling improved across several scenarios
- Dashboard can now handle progress bars when using spawn or forkserver as start method
- Added closing of multiprocessing.JoinableQueue objects, to clean up intermediate junk
- Removed numpy dependency
- Made dill optional again. In many cases it slows processing down
2.0.0¶
(2021-07-07)
- Worker insights added, providing users insight in multiprocessing efficiency
- worker_init and worker_exit parameters added to each map function
- max_active_tasks is now set to n_jobs * 2 when max_active_tasks=None, to speed up most jobs
- n_splits is now set to n_jobs * 64 when both chunk_size and n_splits are None
- Dashboard ports can now be configured
- Renamed func_pointer to func in each map function
- Fixed a bug with the threading backend not terminating correctly
- Fixed a bug with the progress bar not showing correctly in notebooks
- Using multiprocess is now the default
- Added some debug logging
- Refactored a lot of code
- Minor bug fixes, which should make things more stable.
- Removed Python 3.5 support
- Removed add_task, get_result, insert_poison_pill, stop_workers, and join functions from mpire.WorkerPool. Made start_workers private. There wasn’t any reason to use these functions.
1.2.2¶
(2021-04-23)
- Updated documentation CSS which fixes bullet lists not showing properly
1.2.1¶
(2021-04-22)
- Updated some unittests and fixed some linting issues
- Minor improvements in documentation
1.2.0¶
(2021-04-22)
- Workers can be kept alive in between consecutive map calls
- Setting CPU affinity is no longer restricted to Linux platforms
- README updated to use RST format for better compatibility with PyPI
- Added classifiers to the setup file
1.1.3¶
(2020-09-03)
- First public release on Github and PyPi
1.1.2¶
(2020-08-27)
- Added missing typing information
- Updated some docstrings
- Added license
1.1.1¶
(2020-02-19)
- Changed collections.Iterable to collections.abc.Iterable due to deprecation of the former
1.1.0¶
(2019-10-31)
- Removed custom progress bar support to fix Jupyter notebook support
- New progress_bar_position parameter is now available to set the position of the progress bar when using nested worker pools
- Screen resizing is now supported when using a progress bar
1.0.0¶
(2019-10-29)
- Added the MPIRE dashboard
- Added threading as a possible backend
- Progress bar handling now occurs in a separate process, instead of a thread, to improve responsiveness
- Refactoring of code and small bug fixes in error handling
- Removed deprecated functionality
0.9.0¶
(2019-03-11)
- Added support for using different start methods (‘spawn’ and ‘forkserver’) instead of only the default method ‘fork’
- Added optional support for using dill in multiprocessing by utilizing the multiprocess library
- The mpire.Worker class is no longer directly available
0.8.1¶
(2019-02-06)
- Fixed bug when process would hang when progress bar was set to True and an empty iterable was provided
0.8.0¶
(2018-11-01)
- Added support for worker state
- Chunking numpy arrays is now done using numpy slicing
- mpire.WorkerPool.map() now supports automatic concatenation of numpy array output
0.7.2¶
(2018-06-14)
- Small bug fix when not passing on a boolean or tqdm object for the progress_bar parameter
0.7.1¶
(2017-12-20)
- You can now pass on a dictionary as an argument which will be unpacked accordingly using the **-operator.
- New function mpire.utils.make_single_arguments() added which allows you to create an iterable of single argument tuples out of an iterable of single arguments
0.7.0¶
(2017-12-11)
- mpire.utils.chunk_tasks() is now available as a public function
- Chunking in above function and map functions now accept a n_splits parameter
- iterable_of_args in map functions can now contain single values instead of only iterables
- tqdm is now available from the MPIRE package which automatically switches to the Jupyter/IPython notebook widget when available
- Small bugfix in cleaning up a worker pool when no map function was called
0.6.2¶
(2017-11-07)
- Fixed a second bug where the main process could get unresponsive when an exception was raised
0.6.1¶
(2017-11-06)
- Fixed bug where sometimes exceptions fail to pickle
- Fixed a bug where the main process could get unresponsive when an exception was raised
- Child processes are now cleaned up in parallel when an exception was raised
0.6.0¶
(2017-11-03)
- restart_workers parameter is now deprecated and will be removed from v1.0.0
- Progress bar functionality added (using tqdm)
- Improved error handling in user provided functions
- Fixed randomly occurring BrokenPipeErrors and deadlocks
0.5.1¶
(2017-10-12)
- Child processes can now also be pinned to a range of CPUs, instead of only a single one. You can also specify a single CPU or range of CPUs that have to be shared between all child processes
0.5.0¶
(2017-10-06)
- Added CPU pinning.
- Default number of processes to spawn when using n_jobs=None is now set to the number of CPUs available, instead of cpu_count() - 1
0.4.0¶
(2017-10-05)
- Workers can now be started as normal child processes (non-daemon) such that nested mpire.WorkerPool instances are possible
0.3.0¶
(2017-09-15)
- The worker ID can now be passed on to the function to be executed by using the mpire.WorkerPool.pass_on_worker_id() function
- Removed the use of has_return_value_with_shared_objects when using mpire.WorkerPool.set_shared_objects(). MPIRE now handles both cases out of the box
0.2.0¶
(2017-06-27)
- Added docs
0.1.0¶
First release
AUTHOR¶
Sybren Jansen
COPYRIGHT¶
2025, Sybren Jansen
April 14, 2025 | 2.10.2 |