Cluster API

IPython Parallel 7 adds a Cluster API for starting/stopping clusters.

This is the new implementation of ipcluster, which can be more easily re-used in Python programs. The ipcluster script is

Controllers and Engines are started with “Launchers”, which are objects representing a running process.

Each Cluster has:

  • a cluster id

  • a profile directory

  • one controller

  • zero or more engine sets

  • each of which has one or more engines

The combination of profile_dir and cluster_id uniquely identifies a cluster. You can have many clusters in one profile, but each must have a distinct cluster id.

To create a cluster, instantiate a Cluster object:

[5]:
import ipyparallel as ipp

cluster = ipp.Cluster()
cluster
[5]:
<Cluster(cluster_id='touchy-1624880089-y1md', profile='default')>

To start the cluster:

[2]:
await cluster.start_controller()
cluster
[2]:
<Cluster(cluster_id='touchy-1623757384-cpbt', profile='default', controller=<running>)>
[3]:
engine_set_id = await cluster.start_engines(n=4)
cluster
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
[3]:
<Cluster(cluster_id='touchy-1623757384-cpbt', profile='default', controller=<running>, engine_sets=['1623757384-b3pm'])>

As you can see, all methods on the Cluster object are async by default. Every async method also has a _sync variant, if you don’t want to / can’t use asyncio.

[4]:
engine_set_2 = cluster.start_engines_sync(n=2)
engine_set_2
Starting 2 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
[4]:
'1623757385-pe8h'

At this point, we have a cluster with a controller and six engines in two groups.

There is also a start_cluster method that starts the controller and one engine set, for convenience:

engine_set_id = await cluster.start_cluster(n=4)

We can get a client object connected to the cluster with connect_client()

[5]:
rc = await cluster.connect_client()
rc.wait_for_engines(6)
rc.ids
[5]:
[0, 1, 2, 3, 4, 5]

And we can use our classic apply_async(...).get_dict() pattern to get a dict by engine id of hostname, pid for each engine:

[6]:
def identify():
    import os
    import socket

    return {"host": socket.gethostname(), "pid": os.getpid()}


rc[:].apply_async(identify).get_dict()
[6]:
{0: {'host': 'touchy', 'pid': 24774},
 1: {'host': 'touchy', 'pid': 24775},
 2: {'host': 'touchy', 'pid': 24776},
 3: {'host': 'touchy', 'pid': 24762},
 4: {'host': 'touchy', 'pid': 24769},
 5: {'host': 'touchy', 'pid': 24773}}

We can send signals to engine sets by id

(sending signals to just one engine is still a work in progress)

[7]:
import signal
import time

ar = rc[:].apply_async(time.sleep, 100)
[8]:
# oops! I meant 1!

await cluster.signal_engines(signal.SIGINT)
ar.get()
Sending signal 2 to engine(s) 1623757384-b3pm
Sending signal 2 to engine(s) 1623757385-pe8h
[0:apply]:
---------------------------------------------------------------------------KeyboardInterrupt                         Traceback (most recent call last)<string> in <module>
KeyboardInterrupt:

[1:apply]:
---------------------------------------------------------------------------KeyboardInterrupt                         Traceback (most recent call last)<string> in <module>
KeyboardInterrupt:

[2:apply]:
---------------------------------------------------------------------------KeyboardInterrupt                         Traceback (most recent call last)<string> in <module>
KeyboardInterrupt:

[3:apply]:
---------------------------------------------------------------------------KeyboardInterrupt                         Traceback (most recent call last)<string> in <module>
KeyboardInterrupt:

... 2 more exceptions ...

Now it’s time to cleanup. Every start_ method has a correspinding stop_method.

We can stop one engine set at a time with stop_engines:

[9]:
await cluster.stop_engines(engine_set_2)
Stopping engine(s): 1623757385-pe8h

Or stop the whole cluster

[10]:
await cluster.stop_cluster()
Stopping engine(s): 1623757384-b3pm
Stopping controller
Controller stopped: {'exit_code': 0, 'pid': 24758}

Cluster as a context manager

Cluster can also be used as a Context manager, in which case:

  1. entering the context manager starts the cluster

  2. the as returns a connected client

  3. the context is only entered when all the engines are fully registered and available

  4. when the context exits, the cluster is torn down

This makes it a lot easier to scope an IPython cluster for the duration of a computation and ensure that it is cleaned up when you are done.

[11]:
import os

with Cluster(n=4) as rc:
    engine_pids = rc[:].apply_async(os.getpid).get_dict()
engine_pids
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Stopping engine(s): 1623757397-ng0s
Stopping controller
[11]:
{0: 24989, 1: 24991, 2: 24990, 3: 24992}

It can also be async

[12]:
async with Cluster(n=2) as rc:
    engine_pids = rc[:].apply_async(os.getpid).get_dict()
engine_pids
Starting 2 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Controller stopped: {'exit_code': 0, 'pid': 24988}
Stopping engine(s): 1623757400-5fq1
Stopping controller
[12]:
{0: 25058, 1: 25059}

Launcher classes

IPython’s mechanism for launching controllers and engines is called Launchers. These are in ipyparallel.cluster.launcher.

There are two kinds of Launcher:

  • ControllerLauncher, which starts a controller

  • EngineSetLauncher, which starts n engines

You can use abbreviations to access the launchers that ship with IPython parallel, such as ‘MPI’, ‘Local’, or ‘SGE’, or you can pass classes themselves (or their import strings, such as ‘mymodule.MyEngineSetLauncher’).

I’m going to start a cluster with engines using MPI:

[13]:
import os
os.environ["OMPI_MCA_rmaps_base_oversubscribe"] = "1"

cluster = Cluster(n=4, engine_launcher_class='MPI')
await cluster.start_cluster()
rc = await cluster.connect_client()
Controller stopped: {'exit_code': 0, 'pid': 25057}
Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
[14]:
rc.wait_for_engines(4)
rc.ids
[14]:
[0, 1, 2, 3]

Now I’m going to run a test with another new feature

[15]:
def uhoh():
    import time
    from mpi4py import MPI

    rank = MPI.COMM_WORLD.rank
    if rank == 0:
        print("rank 0: oh no.")
        1 / 0
    print(f"rank {rank}: barrier")
    MPI.COMM_WORLD.barrier()


ar = rc[:].apply_async(uhoh)
ar.get(timeout=2)
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_24747/824703262.py in <module>
     12
     13 ar = rc[:].apply_async(uhoh)
---> 14 ar.get(timeout=2)

~/dev/ip/parallel/ipyparallel/client/asyncresult.py in get(self, timeout)
    227                 raise self.exception()
    228         else:
--> 229             raise error.TimeoutError("Result not ready.")
    230
    231     def _check_ready(self):

TimeoutError: Result not ready.

Uh oh! We are stuck in barrier because engine 0 failed.

Let’s try interrupting and getting the errors:

[ ]:
import signal
await cluster.signal_engines(signal.SIGINT)
ar.get(timeout=2)

It didn’t work! This is because MPI.barrier isn’t actually interruptible 😢.

We are going to have to resort to more drastic measures, and restart the engines:

[16]:
await cluster.restart_engines()
Stopping engine(s): 1623757404-oexv
Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
[17]:
rc.wait_for_engines(4)
rc.ids
[17]:
[0, 1, 2, 3]
engine set stopped 1623757404-oexv: {'exit_code': -9, 'pid': 25078}

We are now back to having 4 responsive engines. Their IPP engine id may have changed, but I can get back to using them.

[19]:
def get_rank():
    from mpi4py import MPI

    return MPI.COMM_WORLD.rank

rank_map = rc[:].apply_async(get_rank).get_dict()
rank_map
[19]:
{4: 0, 5: 2, 6: 3, 7: 1}

Finally, clean everything up

[20]:
await cluster.stop_cluster()
Stopping engine(s): 1623757404-oexv
Stopping controller
Controller stopped: {'exit_code': 0, 'pid': 25076}
engine set stopped 1623757404-oexv: {'exit_code': -9, 'pid': 25154}

Connecting to existing clusters

a Cluster object writes its state to disk, in a file accessible as cluster.cluster_file. By default, this willb e $PROFILE_DIR/security/ipcluster-$cluster-id.json.

Cluster objects can load state from a dictionary with Cluster.from_dict(d) or from a JSON file containing that information with Cluster.from_file().

The default arguments for from_file are to use the current IPython profile (default: ‘default’) and empty cluster id, so if you start a cluster with ipcluster start, you can connect to it immediately with

cluster = ipp.Cluster.from_file()
[2]:
import ipyparallel as ipp
cluster = ipp.Cluster.from_file()
[3]:
cluster
[3]:
<Cluster(cluster_id='', profile='default', controller=<running>, engine_sets=['1624884556-z9qr'])>

ipp.ClusterManager provides an API for collecting/discovering/loading all the clusters on your system.

By default, it finds loads clusters in all your IPython profiles, but can be confined to one profile or use explicit profile directories.

[6]:
clusters = ipp.ClusterManager().load_clusters()
[8]:
clusters
[8]:
{'mpi:abc-123': <Cluster(cluster_id='abc-123', profile='mpi', controller=<running>, engine_sets=['1624884663-euj7'])>,
 'default:': <Cluster(cluster_id='', profile='default', controller=<running>, engine_sets=['1624884556-z9qr'])>}

This is the class that powers the new ipcluster list

[10]:
!ipcluster list
PROFILE          CLUSTER ID                       RUNNING ENGINES LAUNCHER
default          ''                               True          4 Local
mpi              abc-123                          True          4 MPI
[11]:
!ipcluster stop --profile mpi --cluster-id abc-123
2021-06-28 14:53:00.591 [IPClusterStop] Stopping engine(s): 1624884663-euj7
2021-06-28 14:53:00.592 [IPClusterStop] Stopping controller
[12]:
!ipcluster list
PROFILE          CLUSTER ID                       RUNNING ENGINES LAUNCHER
default          ''                               True          4 Local

The same operation can be done from the Python API:

[13]:
cluster = ipp.Cluster.from_file(profile="default", cluster_id="")
await cluster.stop_cluster()
Stopping engine(s): 1624884556-z9qr
Stopping controller
[14]:
!ipcluster list
PROFILE          CLUSTER ID                       RUNNING ENGINES LAUNCHER