Launchers

The `Launcher` is the basic abstraction in IPython Parallel for starting and stopping processes. A Launcher has two primary methods, `start()` and `stop()`, which should be `async def` coroutines.
There are two basic kinds of Launcher: `ControllerLauncher` and `EngineLauncher`. A `ControllerLauncher` should launch `ipcontroller` somewhere, and an `EngineLauncher` should start `n` engines somewhere.

Shared configuration, principally `profile_dir` and `cluster_id`, is typically used to locate the connection files these two need in order to communicate, though explicit paths can also be passed as arguments.

Launchers are used through the `Cluster` API, which manages one `ControllerLauncher` and zero or more `EngineLauncher`s, each representing a set of engines.
Launchers are registered via entry points (more below) and can be selected with a short, lowercase string naming the kind of launcher, e.g. `'mpi'` or `'local'`:

```python
import ipyparallel as ipp

c = ipp.Cluster(engines="mpi")
```
For the most part, Launchers are not interacted with directly, but they can be configured. If you generate a config file with:

```bash
ipython profile create --parallel
```

you can check out the resulting `ipcluster_config.py`, which includes configuration options for all available Launcher classes. You can also run `ipcluster start --help-all` to see them on the command line.
Debugging launchers

If a launcher isn't doing what you want, the first thing to do is probably to start your `Cluster` with `log_level=logging.DEBUG`. You can also access the Launcher(s) on the `Cluster` object and call `get_output()` to retrieve the output from the process.
Writing your own Launcher(s)

If you want to write your own launcher, the best place to start is the Launcher classes that ship with IPython Parallel. There are three key methods to implement:

- `start()`
- `stop()`
- `from_dict()`
Writing start

A `start` method on a launcher should do the following:

- request the process(es) to be started;
- start monitoring to notice when the process exits, such that `notify_stop()` will be called when it does.

The command to launch should be the `self.args` list, inherited from the base class.
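The two steps above can be sketched with the standard library alone. `MiniLauncher` below is a hypothetical stand-in, not an IPython Parallel class, and it assumes asyncio subprocesses: `start()` requests the process and then schedules a background task that awaits its exit and calls `notify_stop()`, which runs the callbacks registered with `on_stop()`. The real `BaseLauncher` does much more (logging, configuration, serialization).

```python
import asyncio
import sys


class MiniLauncher:
    """Hypothetical minimal launcher illustrating the start()/notify_stop() contract."""

    def __init__(self, args):
        self.args = args  # command to launch, like BaseLauncher.args
        self.state = "before"  # 'before' -> 'running' -> 'after'
        self.stop_data = None
        self._stop_callbacks = []
        self._wait_task = None

    def on_stop(self, f):
        # register a callback to be called with stop_data once the process exits
        self._stop_callbacks.append(f)

    def notify_start(self, data):
        self.state = "running"
        return data  # pass-through, so it can be used as a callback

    def notify_stop(self, data):
        self.state = "after"
        self.stop_data = data
        for f in self._stop_callbacks:
            f(data)
        return data

    async def start(self):
        # 1. request the process
        proc = await asyncio.create_subprocess_exec(*self.args)
        self.notify_start(proc.pid)
        # 2. watch for exit in the background, so start() returns right away
        self._wait_task = asyncio.create_task(self._wait(proc))

    async def _wait(self, proc):
        exit_code = await proc.wait()
        self.notify_stop({"pid": proc.pid, "exit_code": exit_code})


async def main():
    launcher = MiniLauncher([sys.executable, "-c", "import sys; sys.exit(3)"])
    stopped = []
    launcher.on_stop(stopped.append)
    await launcher.start()
    await launcher._wait_task  # demo only: wait for the background watcher to finish
    return launcher, stopped


launcher, stopped = asyncio.run(main())
print(launcher.state, stopped[0]["exit_code"])  # after 3
```

Keeping the wait in a separate task is what lets `start()` return as soon as the process has been requested, as the contract above requires.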
The default implementation in `LocalProcessLauncher`:

```python
import logging
import os
from subprocess import PIPE, STDOUT, Popen

import psutil

from ipyparallel.cluster.launcher import ProcessStateError


def start(self):
    self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
    if self.state != 'before':
        raise ProcessStateError(
            f'The process was already started and has state: {self.state}'
        )
    self.log.debug(f"Sending output for {self.identifier} to {self.output_file}")

    env = os.environ.copy()
    env.update(self.get_env())
    self.log.debug(f"Setting environment: {','.join(self.get_env())}")

    # append the process's stdout and stderr to self.output_file
    with open(self.output_file, "ab") as f:
        proc = Popen(
            self.args,
            stdout=f.fileno(),
            stderr=STDOUT,
            stdin=PIPE,
            env=env,
            cwd=self.work_dir,
            start_new_session=True,  # don't forward signals
        )
    self.pid = proc.pid
    # use psutil API for self.process
    self.process = psutil.Process(proc.pid)
    self.notify_start(self.process.pid)
    # watch for exit in the background, so that notify_stop() is called
    self._start_waiting()
    # at DEBUG log level, also start streaming the output
    if 1 <= self.log.getEffectiveLevel() <= logging.DEBUG:
        self._start_streaming()
```
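In the method above, the process's stdout and stderr are appended to `self.output_file`, and that file is what `get_output()` reads back when you are debugging. A stdlib-only sketch of that round trip; `run_to_file` and `read_output` are hypothetical helpers, and `read_output(remove=...)` only mimics the documented behaviour of `get_output(remove=...)`:

```python
import os
import subprocess
import sys
import tempfile


def run_to_file(args, output_file):
    """Hypothetical helper: run a command, appending stdout and stderr to output_file."""
    with open(output_file, "ab") as f:
        # the child keeps its own copy of the file descriptor after we close ours
        proc = subprocess.Popen(args, stdout=f.fileno(), stderr=subprocess.STDOUT)
    return proc


def read_output(output_file, remove=False):
    """Hypothetical get_output(): read the captured output, optionally deleting the file."""
    try:
        with open(output_file, encoding="utf8", errors="replace") as f:
            output = f.read()
    except FileNotFoundError:
        return ""
    if remove:
        os.remove(output_file)
    return output


out_path = os.path.join(tempfile.mkdtemp(), "engine.log")
proc = run_to_file(
    [sys.executable, "-c", "import sys; print('hello'); print('oops', file=sys.stderr)"],
    out_path,
)
proc.wait()
output = read_output(out_path, remove=True)
print(output)
print(os.path.exists(out_path))  # False: the file was removed
```

Because stdout and stderr share one file but are buffered differently, the relative order of the two lines in the output is not guaranteed.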
`ControllerLauncher.start()` is always called with no arguments, whereas `EngineLauncher.start()` is called with `n`, which is an integer or `None`. If `n` is an integer, this many engines should be started. If `n` is `None`, a 'default' number should be used, e.g. the number of CPUs on a host.
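A minimal sketch of that rule, assuming the CPU count of the local host is the default you want; `resolve_engine_count` is a hypothetical helper, not an IPython Parallel function:

```python
import os


def resolve_engine_count(n=None):
    """Hypothetical helper: how many engines EngineLauncher.start(n) should start."""
    if n is None:
        # no explicit count: fall back to the number of CPUs on this host
        # (os.cpu_count() may return None, so use 1 in that case)
        return os.cpu_count() or 1
    return n  # an explicit integer is used as-is


print(resolve_engine_count(4))  # 4
print(resolve_engine_count())   # depends on the host
```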
Writing stop

A `stop` method should request that the process(es) stop, and return only after everything is stopped and cleaned up. Exactly how to collect these resources will depend greatly on how the resources were requested in `start()`.
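For a launcher that owns a single local child process, one common approach (an assumption here, not necessarily what the bundled launchers do) is to ask politely with SIGTERM, wait with a timeout, and escalate to SIGKILL, returning only once the process has actually exited. A stdlib-only sketch using asyncio subprocesses and a hypothetical `stop_process` helper:

```python
import asyncio
import sys


async def stop_process(proc, timeout=5.0):
    """Hypothetical helper: stop an asyncio subprocess, returning only once it has exited."""
    if proc.returncode is not None:
        return proc.returncode  # already exited: nothing left to stop
    try:
        proc.terminate()  # SIGTERM on POSIX, TerminateProcess() on Windows
    except ProcessLookupError:
        pass  # it exited between the check above and now
    try:
        return await asyncio.wait_for(proc.wait(), timeout)
    except asyncio.TimeoutError:
        proc.kill()  # escalate: SIGKILL on POSIX
        return await proc.wait()


async def main():
    # a child that would otherwise run for a minute
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)"
    )
    code = await stop_process(proc, timeout=2)
    return proc, code


proc, code = asyncio.run(main())
print(code)
```

On POSIX, a child terminated by a signal reports a negative return code (`-N` for signal `N`), so a process stopped by SIGTERM reports `-15`.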
Serializing Launchers

Launchers are serialized to disk using JSON, via the `.to_dict()` method. The default `.to_dict()` method should rarely need to be overridden. To declare a property of your launcher as one that should be included in serialization, register it as a traitlet tagged with `to_dict=True`.
For example:

```python
from traitlets import Integer

from ipyparallel.cluster.launcher import EngineLauncher


class MyLauncher(EngineLauncher):
    pid = Integer(
        help="The pid of the process",
    ).tag(to_dict=True)
```
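This `.tag(to_dict=True)` ensures that the `.pid` property will be persisted to disk, and reloaded in the default `.from_dict()` implementation. Typically, these properties are populated in `.start()`:

```python
def start(self):
    # start_process() stands in for however your launcher actually starts its process
    process = start_process(self.args, ...)
    self.pid = process.pid  # tagged with to_dict=True, so it is persisted
```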
Mark whatever properties are required to reconstruct your object from disk with this metadata.
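Outside of traitlets, the same idea (per-field metadata decides what is persisted) can be sketched with a standard-library dataclass. This is only an analogy: the `to_dict` metadata key mirrors the `.tag(to_dict=True)` convention, but `SketchLauncher` and its fields are hypothetical, and the real `to_dict()`/`from_dict()` operate on traitlets, not dataclasses.

```python
import json
from dataclasses import dataclass, field, fields


@dataclass
class SketchLauncher:
    """Hypothetical launcher state: only fields tagged to_dict=True are persisted."""

    pid: int = field(default=0, metadata={"to_dict": True})
    work_dir: str = field(default=".", metadata={"to_dict": True})
    process: object = None  # a 'live' handle: not serialized, rebuilt after loading

    def to_dict(self):
        return {
            f.name: getattr(self, f.name)
            for f in fields(self)
            if f.metadata.get("to_dict")
        }

    @classmethod
    def from_dict(cls, d):
        # restore tagged fields only; live objects are reconstructed afterwards
        tagged = {f.name for f in fields(cls) if f.metadata.get("to_dict")}
        return cls(**{k: v for k, v in d.items() if k in tagged})


launcher = SketchLauncher(pid=12345, work_dir="/tmp/cluster", process=object())
saved = json.dumps(launcher.to_dict())
print(saved)  # {"pid": 12345, "work_dir": "/tmp/cluster"}
restored = SketchLauncher.from_dict(json.loads(saved))
print(restored.pid, restored.process)  # 12345 None
```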
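Writing from_dict

`from_dict()` should be a class method which returns an instance of your Launcher class, loaded from a dict. Most `from_dict` methods will look similar to this:

```python
@classmethod
def from_dict(cls, d, **kwargs):
    # restore the serializable state (traits tagged with to_dict=True)
    self = super().from_dict(d, **kwargs)
    # then reconstruct 'live' objects (e.g. the process handle) from that state
    self._reconstruct_process(d)
    return self
```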
Here, serializable state is loaded first, then 'live' objects are reconstructed from it, as in the default `LocalProcessLauncher`:

```python
import psutil

from ipyparallel.cluster.launcher import NotRunning


def _reconstruct_process(self, d):
    """Reconstruct our process"""
    if 'pid' in d and d['pid'] > 0:
        try:
            self.process = psutil.Process(d['pid'])
        except psutil.NoSuchProcess as e:
            raise NotRunning(f"Process {d['pid']}") from e
        self._start_waiting()
```
The local process case is the simplest: the main thing that needs to be serialized is the PID of the process. If reconstruction of the object fails because the resource is no longer running (e.g. the PID no longer exists, or a VM or batch job is gone), raise the `NotRunning` exception. This tells the `Cluster` that the object is gone and should be removed (handled the same as if it had stopped while being watched). Any other unhandled error is assumed to be a bug in the Launcher and does not result in removing the resource from the cluster state.
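The same reconstruction pattern can be sketched without psutil, POSIX only. `NotRunning` here is a local stand-in for `ipyparallel.cluster.launcher.NotRunning`, and `pid_exists`/`reconstruct_pid` are hypothetical helpers. Signal 0 performs the error checking of `kill()` without delivering anything, which makes it a cheap liveness probe. Do not use this on Windows, where `os.kill` with any signal other than the console control events terminates the target process.

```python
import os


class NotRunning(Exception):
    """Local stand-in for ipyparallel.cluster.launcher.NotRunning."""


def pid_exists(pid):
    """POSIX only: True if a process with this pid exists."""
    try:
        os.kill(pid, 0)  # signal 0: check the pid, deliver nothing
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # it exists, but belongs to another user
    return True


def reconstruct_pid(d):
    """Hypothetical reconstruction step: raise NotRunning if the saved pid is gone."""
    pid = d.get("pid", 0)
    if pid > 0 and not pid_exists(pid):
        raise NotRunning(f"Process {pid}")
    return pid


print(reconstruct_pid({"pid": os.getpid()}))  # our own pid, so it is still running
try:
    reconstruct_pid({"pid": 2**30})  # assumed to be an unused pid
except NotRunning as e:
    print("gone:", e)
```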
Additional methods
Some useful additional methods to implement, if the base class implementations do not work for you:
TODO: write more docs on these
Registering your Launcher via entrypoints

Once you have defined your launcher, you can 'register' it for discovery via entry points. In your `setup.py`:

```python
from setuptools import setup

setup(
    # ... your other setup() arguments ...
    entry_points={
        'ipyparallel.controller_launchers': [
            'mine = mypackage:MyControllerLauncher',
        ],
        'ipyparallel.engine_launchers': [
            'mine = mypackage:MyEngineSetLauncher',
        ],
    },
)
```
This allows clusters to be created with the shortcut:

```python
Cluster(engines="mine")
```

instead of the full import string:

```python
Cluster(engines="mypackage.MyEngineSetLauncher")
```

though the long form will always still work.
Launcher API reference
Facilities for launching IPython Parallel processes asynchronously.
- class ipyparallel.cluster.launcher.BaseLauncher(**kwargs)
  An abstraction for starting, stopping and signaling a process.
  - property arg_str
    The string form of the program arguments.
  - property args
    A list of cmd and args that will be used to start the process.
    This is what is passed to `spawnProcess()` and the first element will be the process name.
  - property cluster_args
    Common cluster arguments.
  - property cluster_env
    Cluster-related environment variables.
  - property connection_files
    Dict of connection file paths.
  - find_args()
    The `.args` property calls this to find the args list. Subclasses should implement this to construct the cmd and args.
  - classmethod from_dict(d, *, config=None, parent=None, **kwargs)
    Restore a Launcher from a dict.
    Subclasses should always call `launcher = super().from_dict(*args, **kwargs)` and finish initialization after that. After calling `from_dict()`, the launcher should be in the same state as after `.start()` (i.e. monitoring for exit, etc.).
    - Returns: Launcher. The instantiated and fully configured Launcher.
    - Raises: NotRunning, e.g. if the process has stopped and is no longer running.
  - get_env()
    Get the full environment for the process, merging the different sources of environment variables.
  - get_output(remove=False)
    Retrieve the output from the Launcher.
    If `remove` is true, remove the file, if any, where it was being stored.
  - async join(timeout=None)
    Wait for the process to finish.
  - notify_start(data)
    Call this to trigger startup actions.
    This logs the process startup and sets the state to 'running'. It is a pass-through so it can be used as a callback.
  - notify_stop(data)
    Call this to trigger process stop actions.
    This logs the process stopping and sets the state to 'after'. Call this to trigger callbacks registered via `on_stop()`.
  - on_stop(f)
    Register a callback to be called with this Launcher's stop_data when the process actually finishes.
  - property running
    Whether the process is running.
  - signal(sig)
    Signal the process.
  - async start()
    Start the process.
    Should be an `async def` coroutine. When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that `notify_stop()` will be called when the process finishes.
  - async stop()
    Stop the process and notify observers of stopping.
    This method should be an `async def` coroutine, and return only after the process has stopped. All resources should be cleaned up by the time this returns.
  - to_dict()
    Serialize a Launcher to a dict, for later restoration.
- class ipyparallel.cluster.launcher.BatchControllerLauncher(**kwargs)
  - start()
    Start n copies of the process using a batch system.
- class ipyparallel.cluster.launcher.BatchEngineSetLauncher(**kwargs)
- class ipyparallel.cluster.launcher.BatchSystemLauncher(**kwargs)
  Launch an external process using a batch system.
  This class is designed to work with UNIX batch systems like PBS, LSF, GridEngine, etc. The overall model is that there are different commands like qsub, qdel, etc. that handle the starting and stopping of the process.
  This class also has the notion of a batch script. The `batch_template` attribute can be set to a string that is a template for the batch script. This template is instantiated using string formatting. Thus the template can use {n} for the number of instances. Subclasses can add additional variables to the template dict.
  - find_args()
    The `.args` property calls this to find the args list. Subclasses should implement this to construct the cmd and args.
  - get_output(remove=True)
    Retrieve the output from the Launcher.
    If `remove` is true, remove the file, if any, where it was being stored.
  - parse_job_id(output)
    Take the output of the submit command and return the job id.
  - poll()
    Poll is not implemented.
    Need to use `squeue` and friends to check job status.
  - signal(sig)
    Signal the process.
  - start(n=1)
    Start n copies of the process using a batch system.
  - stop()
    Stop the process and notify observers of stopping.
    This method should be an `async def` coroutine, and return only after the process has stopped. All resources should be cleaned up by the time this returns.
  - write_batch_script(n=1)
    Instantiate and write the batch script to the work_dir.
- class ipyparallel.cluster.launcher.ControllerLauncher(**kwargs)
  Base class for launching `ipcontroller`.
  - async get_connection_info(timeout=60)
    Retrieve connection info for the controller.
    The default implementation assumes `profile_dir` and `cluster_id` are local.
- class ipyparallel.cluster.launcher.EngineLauncher(**kwargs)
  Base class for launching one engine.
- class ipyparallel.cluster.launcher.HTCondorControllerLauncher(**kwargs)
  Launch a controller using HTCondor.
- class ipyparallel.cluster.launcher.HTCondorEngineSetLauncher(**kwargs)
  Launch engines using HTCondor.
- class ipyparallel.cluster.launcher.HTCondorLauncher(**kwargs)
  A BatchSystemLauncher subclass for HTCondor.
  HTCondor requires that we launch the ipengine/ipcontroller scripts rather than the python instance, but otherwise it is very similar to PBS. This is because HTCondor destroys sys.executable when launching remote processes: a launched python process depends on sys.executable to effectively evaluate its module search paths. Without it, regardless of which python interpreter you launch, you will get the built-in module search paths.
  We use the ip{cluster, engine, controller} scripts as our executable to circumvent this: the mechanism of shebanged scripts means that the python binary will be launched with argv[0] set to the location of the ip{cluster, engine, controller} scripts on the remote node. This means you need to take care that:
    a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller of the python environment you wish to execute code in having top precedence.
    b. This functionality is untested on Windows.
  If you need different behavior, consider making your own template.
- class ipyparallel.cluster.launcher.LSFControllerLauncher(**kwargs)
  Launch a controller using LSF.
- class ipyparallel.cluster.launcher.LSFEngineSetLauncher(**kwargs)
  Launch engines using LSF.
  - get_env()
    Get the full environment for the process, merging the different sources of environment variables.
- class ipyparallel.cluster.launcher.LSFLauncher(**kwargs)
  A BatchSystemLauncher subclass for LSF.
  - start(n=1)
    Start n copies of the process using the LSF batch system. This can't inherit from the base class because bsub expects to be piped a shell script in order to honor the #BSUB directives: `bsub < script`.
- exception ipyparallel.cluster.launcher.LauncherError
- class ipyparallel.cluster.launcher.LocalControllerLauncher(**kwargs)
  Launch a controller as a regular external process.
  - find_args()
    The `.args` property calls this to find the args list. Subclasses should implement this to construct the cmd and args.
  - start()
    Start the controller by profile_dir.
- class ipyparallel.cluster.launcher.LocalEngineLauncher(**kwargs)
  Launch a single engine as a regular external process.
  - find_args()
    The `.args` property calls this to find the args list. Subclasses should implement this to construct the cmd and args.
- class ipyparallel.cluster.launcher.LocalEngineSetLauncher(**kwargs)
  Launch a set of engines as regular external processes.
  - find_args()
    The `.args` property calls this to find the args list. Subclasses should implement this to construct the cmd and args.
  - classmethod from_dict(d, **kwargs)
    Restore a Launcher from a dict.
    Subclasses should always call `launcher = super().from_dict(*args, **kwargs)` and finish initialization after that. After calling `from_dict()`, the launcher should be in the same state as after `.start()` (i.e. monitoring for exit, etc.).
    - Returns: Launcher. The instantiated and fully configured Launcher.
    - Raises: NotRunning, e.g. if the process has stopped and is no longer running.
  - get_output(remove=False)
    Get the output of all my child Launchers.
  - launcher_class
  - signal(sig)
    Signal the process.
  - start(n)
    Start n engines by profile or profile_dir.
  - async stop()
    Stop the process and notify observers of stopping.
    This method should be an `async def` coroutine, and return only after the process has stopped. All resources should be cleaned up by the time this returns.
  - to_dict()
    Serialize a Launcher to a dict, for later restoration.
- class ipyparallel.cluster.launcher.LocalProcessLauncher(**kwargs)
  Start and stop an external process in an asynchronous manner.
  This will launch the external process with a working directory of `self.work_dir`.
  - find_args()
    The `.args` property calls this to find the args list. Subclasses should implement this to construct the cmd and args.
  - classmethod from_dict(d, **kwargs)
    Restore a Launcher from a dict.
    Subclasses should always call `launcher = super().from_dict(*args, **kwargs)` and finish initialization after that. After calling `from_dict()`, the launcher should be in the same state as after `.start()` (i.e. monitoring for exit, etc.).
    - Returns: Launcher. The instantiated and fully configured Launcher.
    - Raises: NotRunning, e.g. if the process has stopped and is no longer running.
  - get_output(remove=False)
    Retrieve the output from the Launcher.
    If `remove` is true, remove the file, if any, where it was being stored.
  - async join(timeout=None)
    Wait for the process to exit.
  - signal(sig)
    Signal the process.
  - start()
    Start the process.
    Should be an `async def` coroutine. When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that `notify_stop()` will be called when the process finishes.
  - async stop()
    Stop the process and notify observers of stopping.
    This method should be an `async def` coroutine, and return only after the process has stopped. All resources should be cleaned up by the time this returns.
- class ipyparallel.cluster.launcher.MPIControllerLauncher(**kwargs)
  Launch a controller using mpiexec.
  - property program
    An instance of a Python list.
  - property program_args
    An instance of a Python list.
- class ipyparallel.cluster.launcher.MPIEngineSetLauncher(**kwargs)
  Launch engines using mpiexec.
  - property program
    An instance of a Python list.
  - property program_args
    An instance of a Python list.
  - start(n)
    Start n engines by profile or profile_dir.
- class ipyparallel.cluster.launcher.MPIExecControllerLauncher(**kwargs)
  Deprecated, use MPIControllerLauncher.
- class ipyparallel.cluster.launcher.MPIExecEngineSetLauncher(**kwargs)
  Deprecated, use MPIEngineSetLauncher.
- class ipyparallel.cluster.launcher.MPIExecLauncher(**kwargs)
  Deprecated, use MPILauncher.
- class ipyparallel.cluster.launcher.MPILauncher(**kwargs)
  Launch an external process using mpiexec.
  - find_args()
    Build self.args using all the fields.
  - start(n=1)
    Start n instances of the program using mpiexec.
- exception ipyparallel.cluster.launcher.NotRunning
  Raised when a launcher is no longer running.
- class ipyparallel.cluster.launcher.PBSControllerLauncher(**kwargs)
  Launch a controller using PBS.
- class ipyparallel.cluster.launcher.PBSEngineSetLauncher(**kwargs)
  Launch engines using PBS.
- class ipyparallel.cluster.launcher.PBSLauncher(**kwargs)
  A BatchSystemLauncher subclass for PBS.
- exception ipyparallel.cluster.launcher.ProcessStateError
- class ipyparallel.cluster.launcher.SGEControllerLauncher(**kwargs)
  Launch a controller using SGE.
- class ipyparallel.cluster.launcher.SGEEngineSetLauncher(**kwargs)
  Launch engines with SGE.
- class ipyparallel.cluster.launcher.SGELauncher(**kwargs)
  Sun GridEngine is a PBS clone with slightly different syntax.
- class ipyparallel.cluster.launcher.SSHControllerLauncher(**kwargs)
  - property program
    An instance of a Python list.
  - property program_args
    An instance of a Python list.
- class ipyparallel.cluster.launcher.SSHEngineLauncher(**kwargs)
  - property program
    An instance of a Python list.
  - property program_args
    An instance of a Python list.
- class ipyparallel.cluster.launcher.SSHEngineSetLauncher(**kwargs)
  - launcher_class
  - start(n)
    Start engines by profile or profile_dir.
    `n` is an upper limit of engines. The `engines` config property is used to assign slots to hosts.
- class ipyparallel.cluster.launcher.SSHLauncher(**kwargs)
  A minimal launcher for ssh.
  To be useful this will probably have to be extended to use the `sshx` idea for environment variables. There could be other things this needs as well.
  - property cluster_env
    Cluster-related environment variables.
  - fetch_files()
    Fetch remote files (called after start).
  - find_args()
    The `.args` property calls this to find the args list. Subclasses should implement this to construct the cmd and args.
  - get_output(remove=False)
    Retrieve engine output from the remote file.
  - async join(timeout=None)
    Wait for the process to exit.
  - poll()
    Override poll.
  - property remote_connection_files
    Return remote paths for connection files.
  - send_files()
    Send our files (called before start).
  - signal(sig)
    Signal the process.
  - start(hostname=None, user=None, port=None)
    Start the process.
    Should be an `async def` coroutine. When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that `notify_stop()` will be called when the process finishes.
- class ipyparallel.cluster.launcher.SSHProxyEngineSetLauncher(**kwargs)
  Launcher for calling `ipcluster engines` on a remote machine. Requires that the remote profile is already configured.
  - property program
    An instance of a Python list.
  - property program_args
    An instance of a Python list.
  - start(n)
    Start the process.
    Should be an `async def` coroutine. When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that `notify_stop()` will be called when the process finishes.
- class ipyparallel.cluster.launcher.SlurmControllerLauncher(**kwargs)
  Launch a controller using Slurm.
- class ipyparallel.cluster.launcher.SlurmEngineSetLauncher(**kwargs)
  Launch engines using Slurm.
- class ipyparallel.cluster.launcher.SlurmLauncher(**kwargs)
  A BatchSystemLauncher subclass for Slurm.
- exception ipyparallel.cluster.launcher.UnknownStatus
- class ipyparallel.cluster.launcher.WindowsHPCControllerLauncher(**kwargs)
- class ipyparallel.cluster.launcher.WindowsHPCEngineSetLauncher(**kwargs)
  - start(n)
    Start the controller by profile_dir.
- class ipyparallel.cluster.launcher.WindowsHPCLauncher(**kwargs)
  - find_args()
    The `.args` property calls this to find the args list. Subclasses should implement this to construct the cmd and args.
  - parse_job_id(output)
    Take the output of the submit command and return the job id.
  - start(n)
    Start n copies of the process using the Win HPC job scheduler.
  - stop()
    Stop the process and notify observers of stopping.
    This method should be an `async def` coroutine, and return only after the process has stopped. All resources should be cleaned up by the time this returns.
- ipyparallel.cluster.launcher.abbreviate_launcher_class(cls)
  Abbreviate a launcher class back to its entrypoint name.
- ipyparallel.cluster.launcher.find_launcher_class(name, kind)
  Return a launcher class for a given name and kind.
- ipyparallel.cluster.launcher.ssh_waitpid(pid, timeout=None)
  To be called on a remote host, waiting on a pid.
- ipyparallel.cluster.launcher.sshx(ssh_cmd, cmd, env, remote_output_file, log=None)
  Launch a remote process, returning its remote pid.
  Uses nohup and pipes to put it in the background.
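The `BatchSystemLauncher` entry above describes two string-processing steps: rendering `batch_template` with string formatting (so `{n}` becomes the number of instances, and subclasses may add more variables) and recovering a job id from the submit command's output with `parse_job_id()`. A self-contained sketch of both, assuming a PBS-style template and a submit command that prints an id such as `12345.pbs-server`; the template text, the regex and both helpers are illustrative, not the ones shipped with IPython Parallel.

```python
import re

# hypothetical PBS-style template: {n}, {queue} and {profile_dir} are filled in by str.format
BATCH_TEMPLATE = """#!/bin/sh
#PBS -N ipengine
#PBS -q {queue}
mpiexec -n {n} ipengine --profile-dir="{profile_dir}"
"""


def render_batch_script(template, n=1, **context):
    """Instantiate the template; extra keyword arguments extend the template dict."""
    return template.format(n=n, **context)


def parse_job_id(output):
    """Return the first run of digits in the submit command's output (assumed format)."""
    m = re.search(r"\d+", output)
    if m is None:
        raise ValueError(f"Job id couldn't be determined: {output!r}")
    return m.group()


script = render_batch_script(
    BATCH_TEMPLATE, n=4, queue="short", profile_dir="/home/me/.ipython/profile_default"
)
print(script)
print(parse_job_id("12345.pbs-server\n"))  # 12345
```

Because the template goes through `str.format`, any literal braces in the script (for example shell `${VAR}` expansions) would need to be doubled as `{{` and `}}`.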