Launchers#

The Launcher is the basic abstraction in IPython Parallel for starting and stopping processes.

A Launcher has two primary methods: start() and stop(), which should be async def coroutines.

There are two basic kinds of Launcher: ControllerLauncher and EngineLauncher. A ControllerLauncher should launch ipcontroller somewhere, and an EngineLauncher should start n engines somewhere. Shared configuration, principally profile_dir and cluster_id, is typically used to locate the connection files these two need to communicate, though explicit paths can be added to arguments.

Launchers are used through the Cluster API, which manages one ControllerLauncher and zero to many EngineLaunchers, each representing a set of engines.

Launchers are registered via entry points (more below), and can be selected via a short lowercase string naming the kind of launcher, e.g. ‘mpi’ or ‘local’:

import ipyparallel as ipp
c = ipp.Cluster(engines="mpi")

For the most part, Launchers are not interacted with directly, but they can be configured.

If you generate a config file with:

ipython profile create --parallel

you can check out the resulting ipcluster_config.py, which includes configuration options for all available Launcher classes.

You can also check ipcluster start --help-all to see them on the command-line.
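
For example, a sketch of what launcher configuration in ipcluster_config.py might look like (the option names used here, such as Cluster.engine_launcher_class and MPILauncher.mpi_cmd, should be verified against your generated config file):

# ipcluster_config.py -- a sketch; verify option names against your generated file
c = get_config()  # noqa

# select launchers by their registered (entrypoint) names
c.Cluster.controller_launcher_class = "local"
c.Cluster.engine_launcher_class = "mpi"

# launcher-specific options, e.g. the command used by the MPI launcher
c.MPILauncher.mpi_cmd = ["mpiexec"]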

Debugging launchers#

If a launcher isn’t doing what you want, the first thing to do is probably start your Cluster with log_level=logging.DEBUG.

You can also access the Launcher(s) on the Cluster object and call get_output() to retrieve the output from the process.
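
Putting the two together, a sketch (the cluster.controller attribute holding the ControllerLauncher is an assumption; inspect your Cluster object for the exact attribute names):

import logging

import ipyparallel as ipp

cluster = ipp.Cluster(engines="local", n=2, log_level=logging.DEBUG)
cluster.start_cluster_sync()

# retrieve the captured output of the controller process
# (cluster.controller as the launcher attribute is an assumption)
print(cluster.controller.get_output())
cluster.stop_cluster_sync()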

Writing your own Launcher(s)#

If you want to write your own launcher, the best place to start is to look at the Launcher classes that ship with IPython Parallel.

There are three key methods to implement:

  • start()

  • stop()

  • from_dict()

Writing start#

A start method on a launcher should do the following:

  1. request the process(es) to be started

  2. begin monitoring, such that notify_stop() will be called when the process exits.

The command to launch should be the self.args list, inherited from the base class.

The default implementation, from LocalProcessLauncher:

    def start(self):
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state != 'before':
            raise ProcessStateError(
                f'The process was already started and has state: {self.state}'
            )
        self.log.debug(f"Sending output for {self.identifier} to {self.output_file}")

        env = os.environ.copy()
        env.update(self.get_env())
        self.log.debug(f"Setting environment: {','.join(self.get_env())}")

        with open(self.output_file, "ab") as f:
            proc = Popen(
                self.args,
                stdout=f.fileno(),
                stderr=STDOUT,
                stdin=PIPE,
                env=env,
                cwd=self.work_dir,
                start_new_session=True,  # don't forward signals
            )
        self.pid = proc.pid
        # use psutil API for self.process
        self.process = psutil.Process(proc.pid)

        self.notify_start(self.process.pid)
        self._start_waiting()
        if 1 <= self.log.getEffectiveLevel() <= logging.DEBUG:
            self._start_streaming()

ControllerLauncher.start is always called with no arguments, whereas EngineLauncher.start is called with n, which is an integer or None. If n is an integer, this many engines should be started. If n is None, a ‘default’ number should be used, e.g. the number of CPUs on a host.
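
For instance, a minimal sketch of handling n in a custom engine launcher (MyEngineSetLauncher and its internals are hypothetical):

import os

from ipyparallel.cluster.launcher import EngineLauncher

class MyEngineSetLauncher(EngineLauncher):  # hypothetical
    async def start(self, n):
        if n is None:
            # 'default' size: one engine per CPU on this host
            n = os.cpu_count() or 1
        for _ in range(n):
            self._launch_one_engine()  # hypothetical helper
        self.notify_start({"n": n})  # start data contents are illustrative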

Writing stop#

A stop method should request that the process(es) stop, and return only after everything is stopped and cleaned up. Exactly how to collect these resources will depend greatly on how the resources were requested in start.
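
A minimal sketch for a process-based launcher, assuming the base class signal() and join() helpers (documented in the API reference below); whether join() raises TimeoutError on timeout is an assumption:

async def stop(self):
    # politely request termination first
    self.signal("TERM")
    try:
        await self.join(timeout=30)
    except TimeoutError:
        # escalate if the process lingers
        self.signal("KILL")
        await self.join()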

Serializing Launchers#

Launchers are serialized to disk using JSON, via the .to_dict() method. The default .to_dict() method should rarely need to be overridden.

To declare a property of your launcher as one that should be included in serialization, register it as a traitlet with to_dict=True. For example:

from traitlets import Integer
from ipyparallel.cluster.launcher import EngineLauncher
class MyLauncher(EngineLauncher):
    pid = Integer(
        help="The pid of the process",
    ).tag(to_dict=True)

This .tag(to_dict=True) ensures that the .pid property will be persisted to disk, and reloaded in the default .from_dict implementation. Typically, these are populated in .start():

def start(self):
    process = start_process(self.args, ...)
    self.pid = process.pid

Mark whatever properties are required to reconstruct your object from disk with this metadata.

Writing from_dict#

from_dict() should be a classmethod that returns an instance of your Launcher class, loaded from a dict.

Most from_dict methods will look similar to this:

    @classmethod
    def from_dict(cls, d, **kwargs):
        self = super().from_dict(d, **kwargs)
        self._reconstruct_process(d)
        return self

where the serializable state is loaded first, and then ‘live’ objects are reconstructed from it, as in the default LocalProcessLauncher:

    def _reconstruct_process(self, d):
        """Reconstruct our process"""
        if 'pid' in d and d['pid'] > 0:
            try:
                self.process = psutil.Process(d['pid'])
            except psutil.NoSuchProcess as e:
                raise NotRunning(f"Process {d['pid']}") from e
            self._start_waiting()

The local process case is the simplest, where the main thing that needs serialization is the PID of the process.

If reconstruction of the object fails because the resource is no longer running (e.g. the PID no longer exists, or a VM or batch job is gone), the NotRunning exception should be raised. This tells the Cluster that the object is gone and should be removed (handled the same as if it had stopped while we were watching). Raising other unhandled errors will be treated as a bug in the Launcher, and will not result in removing the resource from cluster state.
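
For example, a batch-oriented launcher might reconstruct from a serialized job id like this (the job_id trait and _job_exists helper are hypothetical):

def _reconstruct_job(self, d):
    """Hypothetical helper for a batch launcher's from_dict"""
    job_id = d.get("job_id")
    if not job_id or not self._job_exists(job_id):  # _job_exists: hypothetical, e.g. via squeue
        # the resource is gone; tell the Cluster to drop it from state
        raise NotRunning(f"Job {job_id}")
    self.job_id = job_id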

Additional methods#

Some useful additional methods to implement, if the base class implementations do not work for you:

TODO: write more docs on these
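
As one example, get_env() (see the API reference below) merges environment variables for the process; a subclass might extend it like this (the extra variable is purely illustrative):

def get_env(self):
    # extend the merged environment from the base class
    env = super().get_env()
    env["MY_RESOURCE_HINT"] = "gpu"  # illustrative extra variable
    return env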

Registering your Launcher via entrypoints#

Once you have defined your launcher, you can ‘register’ it for discovery via entrypoints. In your setup.py:

setup(
    ...
    entry_points={
        'ipyparallel.controller_launchers': [
            'mine = mypackage:MyControllerLauncher',
        ],
        'ipyparallel.engine_launchers': [
            'mine = mypackage:MyEngineSetLauncher',
        ],
    },
)

This allows clusters to be created with the shortcut:

Cluster(engines="mine")

instead of the full import string:

Cluster(engines="mypackage.MyEngineSetLauncher")

though the long form will always still work.
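
To verify the registration, you can list the entry points with importlib.metadata (the group= keyword requires Python 3.10+):

from importlib.metadata import entry_points

# list registered engine launchers, including your own
for ep in entry_points(group="ipyparallel.engine_launchers"):
    print(ep.name, "->", ep.value)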

Launcher API reference#

Facilities for launching IPython Parallel processes asynchronously.

class ipyparallel.cluster.launcher.BaseLauncher(**kwargs)#

An abstraction for starting, stopping and signaling a process.

property arg_str#

The string form of the program arguments.

property args#

A list of cmd and args that will be used to start the process.

This is what is passed to spawnProcess() and the first element will be the process name.

property cluster_args#

Common cluster arguments

property cluster_env#

Cluster-related env variables

property connection_files#

Dict of connection file paths

find_args()#

The .args property calls this to find the args list.

Subclasses should implement this to construct the cmd and args.

classmethod from_dict(d, *, config=None, parent=None, **kwargs)#

Restore a Launcher from a dict

Subclasses should always call launcher = super().from_dict(*args, **kwargs) and finish initialization after that.

After calling from_dict(), the launcher should be in the same state as after .start() (i.e. monitoring for exit, etc.)

Returns: Launcher

The instantiated and fully configured Launcher.

Raises: NotRunning

e.g. if the process has stopped and is no longer running.

get_env()#

Get the full environment for the process

merges different sources for environment variables

get_output(remove=False)#

Retrieve the output from the Launcher.

If remove: remove the file, if any, where it was being stored.

async join(timeout=None)#

Wait for the process to finish

notify_start(data)#

Call this to trigger startup actions.

This logs the process startup and sets the state to ‘running’. It is a pass-through so it can be used as a callback.

notify_stop(data)#

Call this to trigger process stop actions.

This logs the process stopping and sets the state to ‘after’. Call this to trigger callbacks registered via on_stop().

on_stop(f)#

Register a callback to be called with this Launcher’s stop_data when the process actually finishes.

property running#

Am I running?

signal(sig)#

Signal the process.

Parameters

sig (str or int) – ‘KILL’, ‘INT’, etc., or any signal number

async start()#

Start the process.

Should be an async def coroutine.

When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that notify_stop() will be called when the process finishes.

async stop()#

Stop the process and notify observers of stopping.

This method should be an async def coroutine, and return only after the process has stopped.

All resources should be cleaned up by the time this returns.

to_dict()#

Serialize a Launcher to a dict, for later restoration

class ipyparallel.cluster.launcher.BatchControllerLauncher(**kwargs)#
start()#

Start n copies of the process using a batch system.

class ipyparallel.cluster.launcher.BatchEngineSetLauncher(**kwargs)#
class ipyparallel.cluster.launcher.BatchSystemLauncher(**kwargs)#

Launch an external process using a batch system.

This class is designed to work with UNIX batch systems like PBS, LSF, GridEngine, etc. The overall model is that there are different commands like qsub, qdel, etc. that handle the starting and stopping of the process.

This class also has the notion of a batch script. The batch_template attribute can be set to a string that is a template for the batch script. This template is instantiated using string formatting. Thus the template can use {n} for the number of instances. Subclasses can add additional variables to the template dict.
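
For example, a sketch of a custom Slurm template in ipcluster_config.py (that {profile_dir} and {cluster_id} are in the template namespace is an assumption; check the default templates shipped with your version):

c.SlurmEngineSetLauncher.batch_template = """#!/bin/sh
#SBATCH --ntasks={n}
#SBATCH --job-name=ipengine
srun ipengine --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""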

find_args()#

The .args property calls this to find the args list.

Subclasses should implement this to construct the cmd and args.

get_output(remove=True)#

Retrieve the output from the Launcher.

If remove: remove the file, if any, where it was being stored.

parse_job_id(output)#

Take the output of the submit command and return the job id.

poll()#

Poll not implemented

Need to use squeue and friends to check job status

signal(sig)#

Signal the process.

Parameters

sig (str or int) – ‘KILL’, ‘INT’, etc., or any signal number

start(n=1)#

Start n copies of the process using a batch system.

stop()#

Stop the process and notify observers of stopping.

This method should be an async def coroutine, and return only after the process has stopped.

All resources should be cleaned up by the time this returns.

write_batch_script(n=1)#

Instantiate and write the batch script to the work_dir.

class ipyparallel.cluster.launcher.ControllerLauncher(**kwargs)#

Base class for launching ipcontroller

async get_connection_info(timeout=60)#

Retrieve connection info for the controller

Default implementation assumes profile_dir and cluster_id are local.

class ipyparallel.cluster.launcher.EngineLauncher(**kwargs)#

Base class for launching one engine

class ipyparallel.cluster.launcher.HTCondorControllerLauncher(**kwargs)#

Launch a controller using HTCondor.

class ipyparallel.cluster.launcher.HTCondorEngineSetLauncher(**kwargs)#

Launch Engines using HTCondor

class ipyparallel.cluster.launcher.HTCondorLauncher(**kwargs)#

A BatchSystemLauncher subclass for HTCondor.

HTCondor requires that we launch the ipengine/ipcontroller scripts rather than the python instance, but otherwise is very similar to PBS. This is because HTCondor destroys sys.executable when launching remote processes - a launched python process depends on sys.executable to effectively evaluate its module search paths. Without it, regardless of which python interpreter you launch, you will get only the built-in module search paths.

We use the ip{cluster, engine, controller} scripts as our executable to circumvent this - the mechanism of shebanged scripts means that the python binary will be launched with argv[0] set to the location of the ip{cluster, engine, controller} scripts on the remote node. This means you need to take care that:

  1. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller of the python environment you wish to execute code in having top precedence.

  2. This functionality is untested on Windows.

If you need different behavior, consider making your own template.

class ipyparallel.cluster.launcher.LSFControllerLauncher(**kwargs)#

Launch a controller using LSF.

class ipyparallel.cluster.launcher.LSFEngineSetLauncher(**kwargs)#

Launch Engines using LSF

get_env()#

Get the full environment for the process

merges different sources for environment variables

class ipyparallel.cluster.launcher.LSFLauncher(**kwargs)#

A BatchSystemLauncher subclass for LSF.

start(n=1)#

Start n copies of the process using the LSF batch system. This can't inherit from the base class implementation because bsub expects to be piped a shell script in order to honor the #BSUB directives: bsub < script

exception ipyparallel.cluster.launcher.LauncherError#
class ipyparallel.cluster.launcher.LocalControllerLauncher(**kwargs)#

Launch a controller as a regular external process.

find_args()#

The .args property calls this to find the args list.

Subclasses should implement this to construct the cmd and args.

start()#

Start the controller by profile_dir.

class ipyparallel.cluster.launcher.LocalEngineLauncher(**kwargs)#

Launch a single engine as a regular external process.

find_args()#

The .args property calls this to find the args list.

Subclasses should implement this to construct the cmd and args.

class ipyparallel.cluster.launcher.LocalEngineSetLauncher(**kwargs)#

Launch a set of engines as regular external processes.

find_args()#

The .args property calls this to find the args list.

Subclasses should implement this to construct the cmd and args.

classmethod from_dict(d, **kwargs)#

Restore a Launcher from a dict

Subclasses should always call launcher = super().from_dict(*args, **kwargs) and finish initialization after that.

After calling from_dict(), the launcher should be in the same state as after .start() (i.e. monitoring for exit, etc.)

Returns: Launcher

The instantiated and fully configured Launcher.

Raises: NotRunning

e.g. if the process has stopped and is no longer running.

get_output(remove=False)#

Get the output of all my child Launchers

launcher_class#

alias of ipyparallel.cluster.launcher.LocalEngineLauncher

signal(sig)#

Signal the process.

Parameters

sig (str or int) – ‘KILL’, ‘INT’, etc., or any signal number

start(n)#

Start n engines by profile or profile_dir.

async stop()#

Stop the process and notify observers of stopping.

This method should be an async def coroutine, and return only after the process has stopped.

All resources should be cleaned up by the time this returns.

to_dict()#

Serialize a Launcher to a dict, for later restoration

class ipyparallel.cluster.launcher.LocalProcessLauncher(**kwargs)#

Start and stop an external process in an asynchronous manner.

This will launch the external process with a working directory of self.work_dir.

find_args()#

The .args property calls this to find the args list.

Subclasses should implement this to construct the cmd and args.

classmethod from_dict(d, **kwargs)#

Restore a Launcher from a dict

Subclasses should always call launcher = super().from_dict(*args, **kwargs) and finish initialization after that.

After calling from_dict(), the launcher should be in the same state as after .start() (i.e. monitoring for exit, etc.)

Returns: Launcher

The instantiated and fully configured Launcher.

Raises: NotRunning

e.g. if the process has stopped and is no longer running.

get_output(remove=False)#

Retrieve the output from the Launcher.

If remove: remove the file, if any, where it was being stored.

async join(timeout=None)#

Wait for the process to exit

signal(sig)#

Signal the process.

Parameters

sig (str or int) – ‘KILL’, ‘INT’, etc., or any signal number

start()#

Start the process.

Should be an async def coroutine.

When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that notify_stop() will be called when the process finishes.

async stop()#

Stop the process and notify observers of stopping.

This method should be an async def coroutine, and return only after the process has stopped.

All resources should be cleaned up by the time this returns.

class ipyparallel.cluster.launcher.MPIControllerLauncher(**kwargs)#

Launch a controller using mpiexec.

property program#

An instance of a Python list.

property program_args#

An instance of a Python list.

class ipyparallel.cluster.launcher.MPIEngineSetLauncher(**kwargs)#

Launch engines using mpiexec

property program#

An instance of a Python list.

property program_args#

An instance of a Python list.

start(n)#

Start n engines by profile or profile_dir.

class ipyparallel.cluster.launcher.MPIExecControllerLauncher(**kwargs)#

Deprecated, use MPIControllerLauncher

class ipyparallel.cluster.launcher.MPIExecEngineSetLauncher(**kwargs)#

Deprecated, use MPIEngineSetLauncher

class ipyparallel.cluster.launcher.MPIExecLauncher(**kwargs)#

Deprecated, use MPILauncher

class ipyparallel.cluster.launcher.MPILauncher(**kwargs)#

Launch an external process using mpiexec.

find_args()#

Build self.args using all the fields.

start(n=1)#

Start n instances of the program using mpiexec.

exception ipyparallel.cluster.launcher.NotRunning#

Raised when a launcher is no longer running

class ipyparallel.cluster.launcher.PBSControllerLauncher(**kwargs)#

Launch a controller using PBS.

class ipyparallel.cluster.launcher.PBSEngineSetLauncher(**kwargs)#

Launch Engines using PBS

class ipyparallel.cluster.launcher.PBSLauncher(**kwargs)#

A BatchSystemLauncher subclass for PBS.

exception ipyparallel.cluster.launcher.ProcessStateError#
class ipyparallel.cluster.launcher.SGEControllerLauncher(**kwargs)#

Launch a controller using SGE.

class ipyparallel.cluster.launcher.SGEEngineSetLauncher(**kwargs)#

Launch Engines with SGE

class ipyparallel.cluster.launcher.SGELauncher(**kwargs)#

Sun GridEngine is a PBS clone with slightly different syntax

class ipyparallel.cluster.launcher.SSHControllerLauncher(**kwargs)#
property program#

An instance of a Python list.

property program_args#

An instance of a Python list.

class ipyparallel.cluster.launcher.SSHEngineLauncher(**kwargs)#
property program#

An instance of a Python list.

property program_args#

An instance of a Python list.

class ipyparallel.cluster.launcher.SSHEngineSetLauncher(**kwargs)#
launcher_class#

alias of ipyparallel.cluster.launcher.SSHEngineLauncher

start(n)#

Start engines by profile or profile_dir. n is an upper limit on the number of engines. The engines config property is used to assign slots to hosts.
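
For example, a sketch of assigning engine slots to hosts in ipcluster_config.py (the hostnames are placeholders):

# map hostname -> number of engines to run there
c.SSHEngineSetLauncher.engines = {
    "host1": 4,
    "host2": 2,
}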

class ipyparallel.cluster.launcher.SSHLauncher(**kwargs)#

A minimal launcher for ssh.

To be useful this will probably have to be extended to use the sshx idea for environment variables. There could be other things this needs as well.

property cluster_env#

Cluster-related env variables

fetch_files()#

fetch remote files (called after start)

find_args()#

The .args property calls this to find the args list.

Subclasses should implement this to construct the cmd and args.

get_output(remove=False)#

Retrieve engine output from the remote file

async join(timeout=None)#

Wait for the process to exit

poll()#

Override poll

property remote_connection_files#

Return remote paths for connection files

send_files()#

send our files (called before start)

signal(sig)#

Signal the process.

Parameters

sig (str or int) – ‘KILL’, ‘INT’, etc., or any signal number

start(hostname=None, user=None, port=None)#

Start the process.

Should be an async def coroutine.

When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that notify_stop() will be called when the process finishes.

class ipyparallel.cluster.launcher.SSHProxyEngineSetLauncher(**kwargs)#

Launcher for calling ipcluster engines on a remote machine.

Requires that remote profile is already configured.

property program#

An instance of a Python list.

property program_args#

An instance of a Python list.

start(n)#

Start the process.

Should be an async def coroutine.

When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that notify_stop() will be called when the process finishes.

class ipyparallel.cluster.launcher.SlurmControllerLauncher(**kwargs)#

Launch a controller using Slurm.

class ipyparallel.cluster.launcher.SlurmEngineSetLauncher(**kwargs)#

Launch Engines using Slurm

class ipyparallel.cluster.launcher.SlurmLauncher(**kwargs)#

A BatchSystemLauncher subclass for slurm.

exception ipyparallel.cluster.launcher.UnknownStatus#
class ipyparallel.cluster.launcher.WindowsHPCControllerLauncher(**kwargs)#
class ipyparallel.cluster.launcher.WindowsHPCEngineSetLauncher(**kwargs)#
start(n)#

Start n engines by profile_dir.

class ipyparallel.cluster.launcher.WindowsHPCLauncher(**kwargs)#
find_args()#

The .args property calls this to find the args list.

Subclasses should implement this to construct the cmd and args.

parse_job_id(output)#

Take the output of the submit command and return the job id.

start(n)#

Start n copies of the process using the Win HPC job scheduler.

stop()#

Stop the process and notify observers of stopping.

This method should be an async def coroutine, and return only after the process has stopped.

All resources should be cleaned up by the time this returns.

ipyparallel.cluster.launcher.abbreviate_launcher_class(cls)#

Abbreviate a launcher class back to its entrypoint name

ipyparallel.cluster.launcher.find_launcher_class(name, kind)#

Return a launcher class for a given name and kind.

Parameters
  • name (str) – The full name of the launcher class, either with or without the module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, HTCondor, Slurm, WindowsHPC).

  • kind (str) – Either ‘EngineSet’ or ‘Controller’.

ipyparallel.cluster.launcher.ssh_waitpid(pid, timeout=None)#

To be called on a remote host, waiting on a pid

ipyparallel.cluster.launcher.sshx(ssh_cmd, cmd, env, remote_output_file, log=None)#

Launch a remote process, returning its remote pid

Uses nohup and pipes to put it in the background