# Cluster API
IPython Parallel 7 adds a Cluster API for starting/stopping clusters. This is the new implementation of `ipcluster`, which can be more easily re-used in Python programs. The `ipcluster` script is now built on this API.
Controllers and Engines are started with “Launchers”, which are objects representing a running process.
Each Cluster has:

- a cluster id
- a profile directory
- one controller
- zero or more engine sets
  - each of which has one or more engines
The combination of `profile_dir` and `cluster_id` uniquely identifies a cluster. You can have many clusters in one profile, but each must have a distinct cluster id.
To create a cluster, instantiate a `Cluster` object:

```python
import ipyparallel as ipp

cluster = ipp.Cluster()
cluster
```

```
<Cluster(cluster_id='touchy-1624880089-y1md', profile='default')>
```
To start the cluster:

```python
await cluster.start_controller()
cluster
```

```
<Cluster(cluster_id='touchy-1623757384-cpbt', profile='default', controller=<running>)>
```

```python
engine_set_id = await cluster.start_engines(n=4)
cluster
```

```
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
<Cluster(cluster_id='touchy-1623757384-cpbt', profile='default', controller=<running>, engine_sets=['1623757384-b3pm'])>
```
As you can see, all methods on the Cluster object are async by default. Every async method also has a `_sync` variant, if you don’t want to / can’t use asyncio.
```python
engine_set_2 = cluster.start_engines_sync(n=2)
engine_set_2
```

```
Starting 2 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
'1623757385-pe8h'
```
At this point, we have a cluster with a controller and six engines in two groups.
There is also a `start_cluster` method that starts the controller and one engine set, for convenience:

```python
engine_set_id = await cluster.start_cluster(n=4)
```
We can get a client object connected to the cluster with `connect_client()`:

```python
rc = await cluster.connect_client()
rc.wait_for_engines(6)
rc.ids
```

```
[0, 1, 2, 3, 4, 5]
```
And we can use our classic `apply_async(...).get_dict()` pattern to get a dict, keyed by engine id, of the hostname and pid of each engine:

```python
def identify():
    import os
    import socket

    return {"host": socket.gethostname(), "pid": os.getpid()}

rc[:].apply_async(identify).get_dict()
```

```
{0: {'host': 'touchy', 'pid': 24774},
 1: {'host': 'touchy', 'pid': 24775},
 2: {'host': 'touchy', 'pid': 24776},
 3: {'host': 'touchy', 'pid': 24762},
 4: {'host': 'touchy', 'pid': 24769},
 5: {'host': 'touchy', 'pid': 24773}}
```
We can send signals to engine sets by id (sending signals to just one engine is still a work in progress):

```python
import signal
import time

ar = rc[:].apply_async(time.sleep, 100)
# oops! I meant 1!
await cluster.signal_engines(signal.SIGINT)
ar.get()
```

```
Sending signal 2 to engine(s) 1623757384-b3pm
Sending signal 2 to engine(s) 1623757385-pe8h

[0:apply]:
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<string> in <module>
KeyboardInterrupt:

[1:apply]:
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<string> in <module>
KeyboardInterrupt:

[2:apply]:
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<string> in <module>
KeyboardInterrupt:

[3:apply]:
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<string> in <module>
KeyboardInterrupt:

... 2 more exceptions ...
```
Now it’s time to clean up. Every `start_` method has a corresponding `stop_` method. We can stop one engine set at a time with `stop_engines`:

```python
await cluster.stop_engines(engine_set_2)
```

```
Stopping engine(s): 1623757385-pe8h
```
Or stop the whole cluster:

```python
await cluster.stop_cluster()
```

```
Stopping engine(s): 1623757384-b3pm
Stopping controller
Controller stopped: {'exit_code': 0, 'pid': 24758}
```
## Cluster as a context manager
`Cluster` can also be used as a context manager, in which case:

- entering the context manager starts the cluster
- the `as` returns a connected client
- the context is only entered when all the engines are fully registered and available
- when the context exits, the cluster is torn down
This makes it a lot easier to scope an IPython cluster for the duration of a computation and ensure that it is cleaned up when you are done.
```python
import os

with ipp.Cluster(n=4) as rc:
    engine_pids = rc[:].apply_async(os.getpid).get_dict()
engine_pids
```

```
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Stopping engine(s): 1623757397-ng0s
Stopping controller
{0: 24989, 1: 24991, 2: 24990, 3: 24992}
```
It can also be async:

```python
async with ipp.Cluster(n=2) as rc:
    engine_pids = rc[:].apply_async(os.getpid).get_dict()
engine_pids
```

```
Starting 2 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Controller stopped: {'exit_code': 0, 'pid': 24988}
Stopping engine(s): 1623757400-5fq1
Stopping controller
{0: 25058, 1: 25059}
```
## Launcher classes
IPython’s mechanism for launching controllers and engines is called Launchers. These live in `ipyparallel.cluster.launcher`. There are two kinds of Launcher:

- ControllerLauncher, which starts a controller
- EngineSetLauncher, which starts `n` engines
You can use abbreviations to access the launchers that ship with IPython parallel, such as ‘MPI’, ‘Local’, or ‘SGE’, or you can pass classes themselves (or their import strings, such as ‘mymodule.MyEngineSetLauncher’).
I’m going to start a cluster with engines using MPI:

```python
import os

os.environ["OMPI_MCA_rmaps_base_oversubscribe"] = "1"
cluster = ipp.Cluster(n=4, engines='MPI')
await cluster.start_cluster()
rc = await cluster.connect_client()
```

```
Controller stopped: {'exit_code': 0, 'pid': 25057}
Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
```

```python
rc.wait_for_engines(4)
rc.ids
```

```
[0, 1, 2, 3]
```
Now I’m going to run a test with another new feature:

```python
def uhoh():
    import time
    from mpi4py import MPI

    rank = MPI.COMM_WORLD.rank
    if rank == 0:
        print("rank 0: oh no.")
        1 / 0
    print(f"rank {rank}: barrier")
    MPI.COMM_WORLD.barrier()

ar = rc[:].apply_async(uhoh)
ar.get(timeout=2)
```

```
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_24747/824703262.py in <module>
     12
     13 ar = rc[:].apply_async(uhoh)
---> 14 ar.get(timeout=2)

~/dev/ip/parallel/ipyparallel/client/asyncresult.py in get(self, timeout)
    227             raise self.exception()
    228         else:
--> 229             raise error.TimeoutError("Result not ready.")
    230
    231     def _check_ready(self):

TimeoutError: Result not ready.
```
Uh oh! We are stuck in the barrier because engine 0 failed. Let’s try interrupting and getting the errors:

```python
import signal

await cluster.signal_engines(signal.SIGINT)
ar.get(timeout=2)
```
It didn’t work! This is because `MPI.barrier` isn’t actually interruptible 😢. We are going to have to resort to more drastic measures, and restart the engines:

```python
await cluster.restart_engines()
```

```
Stopping engine(s): 1623757404-oexv
Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
```
```python
rc.wait_for_engines(4)
rc.ids
```

```
[0, 1, 2, 3]
engine set stopped 1623757404-oexv: {'exit_code': -9, 'pid': 25078}
```
We are now back to having 4 responsive engines. Their IPP engine ids may have changed, but I can get back to using them:

```python
def get_rank():
    from mpi4py import MPI

    return MPI.COMM_WORLD.rank

rank_map = rc[:].apply_async(get_rank).get_dict()
rank_map
```

```
{4: 0, 5: 2, 6: 3, 7: 1}
```
Finally, clean everything up:

```python
await cluster.stop_cluster()
```

```
Stopping engine(s): 1623757404-oexv
Stopping controller
Controller stopped: {'exit_code': 0, 'pid': 25076}
engine set stopped 1623757404-oexv: {'exit_code': -9, 'pid': 25154}
```
## Connecting to existing clusters
A `Cluster` object writes its state to disk, in a file accessible as `cluster.cluster_file`. By default, this will be `$PROFILE_DIR/security/ipcluster-$cluster-id.json`.
`Cluster` objects can load state from a dictionary with `Cluster.from_dict(d)`, or from a JSON file containing that information with `Cluster.from_file()`. The default arguments for `from_file` are to use the current IPython profile (default: 'default') and an empty cluster id, so if you start a cluster with `ipcluster start`, you can connect to it immediately with `ipp.Cluster.from_file()`:
```python
import ipyparallel as ipp

cluster = ipp.Cluster.from_file()
cluster
```

```
<Cluster(cluster_id='', profile='default', controller=<running>, engine_sets=['1624884556-z9qr'])>
```
`ipp.ClusterManager` provides an API for collecting/discovering/loading all the clusters on your system. By default, it finds and loads clusters in all your IPython profiles, but it can be confined to one profile or to explicit profile directories.
```python
clusters = ipp.ClusterManager().load_clusters()
clusters
```

```
{'mpi:abc-123': <Cluster(cluster_id='abc-123', profile='mpi', controller=<running>, engine_sets=['1624884663-euj7'])>,
 'default:': <Cluster(cluster_id='', profile='default', controller=<running>, engine_sets=['1624884556-z9qr'])>}
```
This is the class that powers the new `ipcluster list`:
```
!ipcluster list
```

```
PROFILE       CLUSTER ID     RUNNING  ENGINES  LAUNCHER
default       ''             True     4        Local
mpi           abc-123        True     4        MPI
```
```
!ipcluster stop --profile mpi --cluster-id abc-123
```

```
2021-06-28 14:53:00.591 [IPClusterStop] Stopping engine(s): 1624884663-euj7
2021-06-28 14:53:00.592 [IPClusterStop] Stopping controller
```
```
!ipcluster list
```

```
PROFILE       CLUSTER ID     RUNNING  ENGINES  LAUNCHER
default       ''             True     4        Local
```
The same operation can be done from the Python API:
```python
cluster = ipp.Cluster.from_file(profile="default", cluster_id="")
await cluster.stop_cluster()
```

```
Stopping engine(s): 1624884556-z9qr
Stopping controller
```
```
!ipcluster list
```

```
PROFILE       CLUSTER ID     RUNNING  ENGINES  LAUNCHER
```