Launchers#
The Launcher is the basic abstraction in IPython Parallel for starting and stopping processes.
A Launcher has two primary methods: start() and stop(), which should be async def coroutines.
There are two basic kinds of Launcher: ControllerLauncher and EngineLauncher.
A ControllerLauncher should launch ipcontroller somewhere, and an EngineLauncher should start n engines somewhere.
Shared configuration, principally profile_dir and cluster_id, is typically used to locate the connection files necessary for these two to communicate, though explicit paths can be added to arguments.
Launchers are used through the Cluster API, which manages one ControllerLauncher and zero to many EngineLaunchers, each representing a set of engines.
Launchers are registered via entry points (more below), and can be selected via a short lowercase string naming the kind of launcher, e.g. ‘mpi’ or ‘local’:
import ipyparallel as ipp
c = ipp.Cluster(engines="mpi")
For the most part, Launchers are not interacted with directly, but they can be configured.
If you generate a config file with:
ipython profile create --parallel
you can check out the resulting ipcluster_config.py, which includes configuration options for all available Launcher classes.
You can also check ipcluster start --help-all to see them on the command line.
Debugging launchers#
If a launcher isn’t doing what you want, the first thing to do is probably start your Cluster with log_level=logging.DEBUG.
You can also access the Launcher(s) on the Cluster object and call get_output() to retrieve the output from the process.
Writing your own Launcher(s)#
If you want to write your own launcher, the best place to start is to look at the Launcher classes that ship with IPython Parallel.
There are three key methods to implement:
- start()
- stop()
- from_dict()
Writing start#
A start method on a launcher should do the following:
- request the process(es) to be started
- start monitoring to notice when the process exits, such that notify_stop() will be called when the process exits.
The command to launch should be the self.args list, inherited from the base class.
The default implementation for the LocalProcessLauncher is:
def start(self):
    self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
    if self.state != 'before':
        raise ProcessStateError(
            f'The process was already started and has state: {self.state}'
        )
    self.log.debug(f"Sending output for {self.identifier} to {self.output_file}")

    env = os.environ.copy()
    env.update(self.get_env())
    self.log.debug(f"Setting environment: {','.join(self.get_env())}")

    with open(self.output_file, "ab") as f, open(os.devnull, "rb") as stdin:
        proc = self._popen_process = Popen(
            self.args,
            stdout=f.fileno(),
            stderr=STDOUT,
            stdin=stdin,
            env=env,
            cwd=self.work_dir,
            start_new_session=True,  # don't forward signals
        )
    self.pid = proc.pid
    # use psutil API for self.process
    self.process = psutil.Process(proc.pid)

    self.notify_start(self.process.pid)
    self._start_waiting()
    if 1 <= self.log.getEffectiveLevel() <= logging.DEBUG:
        self._start_streaming()
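The two responsibilities of start (request the process, then monitor it so that notify_stop() fires on exit) can also be sketched with only the standard library. This is a minimal illustration, not the IPython Parallel implementation: SketchLauncher, its attributes, and demo() are hypothetical names, and only the 'before' / 'running' / 'after' states and the notify_stop() role are taken from this page.

```python
import asyncio
import sys


class SketchLauncher:
    """Hypothetical stand-in for a Launcher; not an ipyparallel class."""

    def __init__(self, args):
        self.args = args
        self.state = "before"
        self.stop_data = None
        self._process = None
        self._wait_task = None

    def notify_stop(self, data):
        # Record that the process has exited
        self.state = "after"
        self.stop_data = data

    async def _wait(self):
        # Background monitor: completes when the process exits
        exit_code = await self._process.wait()
        self.notify_stop({"exit_code": exit_code, "pid": self._process.pid})

    async def start(self):
        if self.state != "before":
            raise RuntimeError(f"already started, state: {self.state}")
        # 1. request the process
        self._process = await asyncio.create_subprocess_exec(*self.args)
        self.state = "running"
        # 2. begin monitoring in the background, without blocking start()
        self._wait_task = asyncio.create_task(self._wait())


async def demo():
    launcher = SketchLauncher([sys.executable, "-c", "pass"])
    await launcher.start()
    await launcher._wait_task  # wait for the monitor to observe the exit
    return launcher
```

Running asyncio.run(demo()) returns a launcher whose state has moved from 'before' through 'running' to 'after' once the child process exits.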
ControllerLauncher.start is always called with no arguments, whereas EngineLauncher.start is called with n, which is an integer or None.
If n is an integer, this many engines should be started.
If n is None, a ‘default’ number should be used, e.g. the number of CPUs on a host.
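One way to handle the n argument is a small helper that falls back to the CPU count. The helper name resolve_engine_count is hypothetical and is not part of IPython Parallel; it only illustrates the rule described above.

```python
import os


def resolve_engine_count(n=None):
    """Hypothetical helper: decide how many engines to start."""
    if n is None:
        # os.cpu_count() can return None when the count is undetermined
        return os.cpu_count() or 1
    if n < 0:
        raise ValueError(f"n must be non-negative, got {n}")
    return n
```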
Writing stop#
A stop method should request that the process(es) stop,
and return only after everything is stopped and cleaned up.
Exactly how to collect these resources will depend greatly on how the resources were requested in start.
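For a local process, "return only after everything is stopped" might look like the following standard-library sketch: ask the process to terminate, wait for a grace period, then kill it. The function stop_process and demo() are hypothetical; the parameter name borrows from the stop_seconds_until_kill option listed in the API reference, but this is not the library's code.

```python
import asyncio
import sys


async def stop_process(process, stop_seconds_until_kill=5):
    """Terminate `process`, escalating to kill() after a grace period."""
    if process.returncode is not None:
        return process.returncode  # already exited, nothing to do
    process.terminate()  # SIGTERM on POSIX
    try:
        await asyncio.wait_for(process.wait(), timeout=stop_seconds_until_kill)
    except asyncio.TimeoutError:
        process.kill()  # SIGKILL on POSIX
        await process.wait()
    # Only return once the process has actually exited
    return process.returncode


async def demo():
    process = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)"
    )
    return await stop_process(process, stop_seconds_until_kill=2)
```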
Serializing Launchers#
Launchers are serialized to disk using JSON, via the .to_dict() method.
The default .to_dict() method should rarely need to be overridden.
To declare a property of your launcher as one that should be included in serialization, register it as a traitlet with to_dict=True.
For example:
from traitlets import Integer
from ipyparallel.cluster.launcher import EngineLauncher

class MyLauncher(EngineLauncher):
    pid = Integer(
        help="The pid of the process",
    ).tag(to_dict=True)
This .tag(to_dict=True) ensures that the .pid property will be persisted to disk, and reloaded in the default .from_dict implementation.
Typically, these are populated in .start():
def start(self):
    process = start_process(self.args, ...)
    self.pid = process.pid
Mark whatever properties are required to reconstruct your object from disk with this metadata.
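The idea of tagging only the restorable state can be illustrated without traitlets. The sketch below is an analogy using dataclasses field metadata, not the IPython Parallel mechanism: SketchLauncher and its fields are hypothetical, and the metadata key to_dict is borrowed from the tag name above.

```python
import json
from dataclasses import dataclass, field, fields


@dataclass
class SketchLauncher:
    """Hypothetical class; mimics tagging state for serialization."""

    # Persisted: tagged in the same spirit as .tag(to_dict=True)
    pid: int = field(default=0, metadata={"to_dict": True})
    # Not persisted: live, in-memory state only
    log_lines: list = field(default_factory=list)

    def to_dict(self):
        # Collect only the fields tagged for serialization
        return {
            f.name: getattr(self, f.name)
            for f in fields(self)
            if f.metadata.get("to_dict")
        }

    @classmethod
    def from_dict(cls, d):
        # Restore only the tagged fields; everything else gets defaults
        tagged = {f.name for f in fields(cls) if f.metadata.get("to_dict")}
        return cls(**{k: v for k, v in d.items() if k in tagged})


launcher = SketchLauncher(pid=1234, log_lines=["started"])
serialized = json.dumps(launcher.to_dict())
restored = SketchLauncher.from_dict(json.loads(serialized))
```

Only pid survives the JSON round trip; log_lines is rebuilt as an empty list, which is why anything needed to reconstruct the object must be tagged.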
Writing from_dict#
from_dict() should be a class method which returns an instance of your Launcher class, loaded from dict.
Most from_dict methods will look similar to this:
@classmethod
def from_dict(cls, d, **kwargs):
    self = super().from_dict(d, **kwargs)
    self._reconstruct_process(d)
    return self
where serializable-state is loaded first, then ‘live’ objects are loaded from that. As in the default LocalProcessLauncher:
def _reconstruct_process(self, d):
    """Reconstruct our process"""
    if 'pid' in d and d['pid'] > 0:
        try:
            self.process = psutil.Process(d['pid'])
        except psutil.NoSuchProcess as e:
            # chain the original error so the cause stays visible
            raise NotRunning(f"Process {d['pid']}") from e
        self._start_waiting()
The local process case is the simplest, where the main thing that needs serialization is the PID of the process.
If reconstruction of the object fails because the resource is no longer running (e.g. the PID is checked and it’s not there, or a VM / batch job is gone), the NotRunning exception should be raised.
This tells the Cluster that the object is gone and should be removed (handled the same as if it had stopped while we were watching).
Raising other unhandled errors will be assumed to be a bug in the Launcher, and will not result in removing the resource from cluster state.
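The distinction between "resource is gone" and "something else went wrong" can be sketched with the standard library alone. This example is POSIX-only and hypothetical: it defines its own local NotRunning class and helper names (check_pid, is_running) rather than importing from ipyparallel, and it uses os.kill with signal 0 in place of psutil.

```python
import os


class NotRunning(Exception):
    """Local stand-in for the NotRunning exception described above."""


def check_pid(d):
    """Return the pid stored in `d` if that process exists (POSIX only)."""
    pid = d.get("pid", 0)
    if pid <= 0:
        raise NotRunning("no pid recorded")
    try:
        os.kill(pid, 0)  # signal 0 checks existence; nothing is delivered
    except ProcessLookupError as e:
        # The resource is gone: report it as NotRunning
        raise NotRunning(f"Process {pid}") from e
    except PermissionError:
        pass  # the process exists but belongs to another user
    return pid


def is_running(d):
    """Convenience wrapper turning NotRunning into a boolean."""
    try:
        check_pid(d)
    except NotRunning:
        return False
    return True
```

Any exception other than NotRunning is deliberately left to propagate, matching the rule that unexpected errors indicate a bug rather than a stopped resource.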
Additional methods#
Some useful additional methods to implement, if the base class implementations do not work for you:
TODO: write more docs on these
Registering your Launcher via entrypoints#
Once you have defined your launcher, you can ‘register’ it for discovery via entrypoints. In your setup.py:
setup(
    ...
    entry_points={
        'ipyparallel.controller_launchers': [
            'mine = mypackage:MyControllerLauncher',
        ],
        'ipyparallel.engine_launchers': [
            'mine = mypackage:MyEngineSetLauncher',
        ],
    },
)
This allows clusters to be created with the shortcut:
Cluster(engines="mine")
instead of the full import string
Cluster(engines="mypackage.MyEngineSetLauncher")
though the long form will always still work.
Launcher API reference#
Facilities for launching IPython Parallel processes asynchronously.
- class ipyparallel.cluster.launcher.BaseLauncher(**kwargs: Any)#
An abstraction for starting, stopping and signaling a process.
- property arg_str#
The string form of the program arguments.
- property args#
A list of cmd and args that will be used to start the process.
This is what is passed to spawnProcess() and the first element will be the process name.
- property cluster_args#
Common cluster arguments
- property cluster_env#
Cluster-related env variables
- property connection_files#
Dict of connection file paths
- environment#
Set environment variables for the launched process
New in version 8.0.
- find_args()#
The .args property calls this to find the args list.
Subclasses should implement this to construct the cmd and args.
- classmethod from_dict(d, *, config=None, parent=None, **kwargs)#
Restore a Launcher from a dict
Subclasses should always call launcher = super().from_dict(*args, **kwargs) and finish initialization after that.
After calling from_dict(), the launcher should be in the same state as after .start() (i.e. monitoring for exit, etc.)
- Returns: Launcher
The instantiated and fully configured Launcher.
- Raises: NotRunning
e.g. if the process has stopped and is no longer running.
- get_env()#
Get the full environment for the process
merges different sources for environment variables
- get_output(remove=False)#
Retrieve the output from the Launcher.
If remove: remove the file, if any, where it was being stored.
- identifier#
Used for lookup in e.g. EngineSetLauncher during notify_stop and default log files
- async join(timeout=None)#
Wait for the process to finish
- notify_start(data)#
Call this to trigger startup actions.
This logs the process startup and sets the state to ‘running’. It is a pass-through so it can be used as a callback.
- notify_stop(data)#
Call this to trigger process stop actions.
This logs the process stopping and sets the state to ‘after’. Call this to trigger callbacks registered via on_stop().
- on_stop(f)#
Register a callback to be called with this Launcher’s stop_data when the process actually finishes.
- output_limit#
When a process exits, display up to this many lines of output
- property running#
Am I running.
- signal(sig)#
Signal the process.
- async start()#
Start the process.
Should be an async def coroutine.
When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that notify_stop() will be called when the process finishes.
- async stop()#
Stop the process and notify observers of stopping.
This method should be an async def coroutine, and return only after the process has stopped.
All resources should be cleaned up by the time this returns.
- stop_timeout#
The number of seconds to wait for a process to exit before raising a TimeoutError in stop
- to_dict()#
Serialize a Launcher to a dict, for later restoration
- class ipyparallel.cluster.launcher.BatchControllerLauncher(**kwargs: Any)#
- start()#
Start n copies of the process using a batch system.
- class ipyparallel.cluster.launcher.BatchSystemLauncher(**kwargs: Any)#
Launch an external process using a batch system.
This class is designed to work with UNIX batch systems like PBS, LSF, GridEngine, etc. The overall model is that there are different commands like qsub, qdel, etc. that handle the starting and stopping of the process.
This class also has the notion of a batch script. The batch_template attribute can be set to a string that is a template for the batch script. This template is instantiated using string formatting. Thus the template can use {n} for the number of instances. Subclasses can add additional variables to the template dict.
- batch_file_name#
The filename of the instantiated batch script.
- batch_template#
The string that is the batch script template itself.
- batch_template_file#
The file that contains the batch template.
- delete_command#
The name of the command line program used to delete jobs.
- find_args()#
The .args property calls this to find the args list.
Subclasses should implement this to construct the cmd and args.
- get_output(remove=True)#
Retrieve the output from the Launcher.
If remove: remove the file, if any, where it was being stored.
- job_id_regexp#
A regular expression used to get the job id from the output of the submit_command.
- job_id_regexp_group#
The group we wish to match in job_id_regexp (0 to match all)
- namespace#
Extra variables to pass to the template.
This lets you parameterize additional options, such as wall_time with a custom template.
- output_file#
File in which to store stdout/err of processes
- parse_job_id(output)#
Take the output of the submit command and return the job id.
- poll()#
Poll not implemented
Need to use squeue and friends to check job status
- queue#
The batch queue.
- signal(sig)#
Signal the process.
- signal_command#
The name of the command line program used to send signals to jobs.
- start(n=1)#
Start n copies of the process using a batch system.
- stop()#
Stop the process and notify observers of stopping.
This method should be an async def coroutine, and return only after the process has stopped.
All resources should be cleaned up by the time this returns.
- submit_command#
The name of the command line program used to submit jobs.
- write_batch_script(n=1)#
Instantiate and write the batch script to the work_dir.
- class ipyparallel.cluster.launcher.ControllerLauncher(**kwargs: Any)#
Base class for launching ipcontroller
- controller_args#
command-line args to pass to ipcontroller
- controller_cmd#
Popen command to launch ipcontroller.
- async get_connection_info(timeout=60)#
Retrieve connection info for the controller
Default implementation assumes profile_dir and cluster_id are local.
- class ipyparallel.cluster.launcher.EngineLauncher(**kwargs: Any)#
Base class for launching one engine
- engine_args#
command-line arguments to pass to ipengine
- engine_cmd#
command to launch the Engine.
- class ipyparallel.cluster.launcher.HTCondorControllerLauncher(**kwargs: Any)#
Launch a controller using HTCondor.
- batch_file_name#
batch file name for the controller job.
- class ipyparallel.cluster.launcher.HTCondorEngineSetLauncher(**kwargs: Any)#
Launch Engines using HTCondor
- batch_file_name#
batch file name for the engine(s) job.
- class ipyparallel.cluster.launcher.HTCondorLauncher(**kwargs: Any)#
A BatchSystemLauncher subclass for HTCondor.
HTCondor requires that we launch the ipengine/ipcontroller scripts rather than the python instance, but is otherwise very similar to PBS. This is because HTCondor destroys sys.executable when launching remote processes - a launched python process depends on sys.executable to effectively evaluate its module search paths. Without it, regardless of which python interpreter you launch, you will get the built-in module search paths.
We use the ip{cluster, engine, controller} scripts as our executable to circumvent this - the mechanism of shebanged scripts means that the python binary will be launched with argv[0] set to the location of the ip{cluster, engine, controller} scripts on the remote node. This means you need to take care that:
Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller of the python environment you wish to execute code in having top precedence.
This functionality is untested on Windows.
If you need different behavior, consider making your own template.
- delete_command#
The HTCondor delete command [‘condor_rm’]
- job_id_regexp#
Regular expression for identifying the job ID [r’(\d+)\.$’]
- job_id_regexp_group#
The group we wish to match in job_id_regexp [1]
- submit_command#
The HTCondor submit command [‘condor_submit’]
- class ipyparallel.cluster.launcher.LSFControllerLauncher(**kwargs: Any)#
Launch a controller using LSF.
- batch_file_name#
batch file name for the controller job.
- class ipyparallel.cluster.launcher.LSFEngineSetLauncher(**kwargs: Any)#
Launch Engines using LSF
- batch_file_name#
batch file name for the engine(s) job.
- get_env()#
Get the full environment for the process
merges different sources for environment variables
- class ipyparallel.cluster.launcher.LSFLauncher(**kwargs: Any)#
A BatchSystemLauncher subclass for LSF.
- delete_command#
The LSF delete command [‘bkill’]
- job_id_regexp#
Regular expression for identifying the job ID [r’\d+’]
- signal_command#
The LSF signal command [‘bkill’, ‘-s’]
- start(n=1)#
Start n copies of the process using the LSF batch system. This can’t inherit from the base class because bsub expects to be piped a shell script in order to honor the #BSUB directives: bsub < script
- submit_command#
The LSF submit command [‘bsub’]
- exception ipyparallel.cluster.launcher.LauncherError#
- class ipyparallel.cluster.launcher.LocalControllerLauncher(**kwargs: Any)#
Launch a controller as a regular external process.
- find_args()#
The .args property calls this to find the args list.
Subclasses should implement this to construct the cmd and args.
- start()#
Start the controller by profile_dir.
- class ipyparallel.cluster.launcher.LocalEngineLauncher(**kwargs: Any)#
Launch a single engine as a regular external process.
- find_args()#
The .args property calls this to find the args list.
Subclasses should implement this to construct the cmd and args.
- class ipyparallel.cluster.launcher.LocalEngineSetLauncher(**kwargs: Any)#
Launch a set of engines as regular external processes.
- delay#
delay (in seconds) between starting each engine after the first. This can help force the engines to get their ids in order, or limit process flood when starting many engines.
- find_args()#
The .args property calls this to find the args list.
Subclasses should implement this to construct the cmd and args.
- classmethod from_dict(d, **kwargs)#
Restore a Launcher from a dict
Subclasses should always call launcher = super().from_dict(*args, **kwargs) and finish initialization after that.
After calling from_dict(), the launcher should be in the same state as after .start() (i.e. monitoring for exit, etc.)
- Returns: Launcher
The instantiated and fully configured Launcher.
- Raises: NotRunning
e.g. if the process has stopped and is no longer running.
- get_output(remove=False)#
Get the output of all my child Launchers
- launcher_class#
alias of LocalEngineLauncher
- signal(sig)#
Signal the process.
- start(n)#
Start n engines by profile or profile_dir.
- async stop()#
Stop the process and notify observers of stopping.
This method should be an async def coroutine, and return only after the process has stopped.
All resources should be cleaned up by the time this returns.
- to_dict()#
Serialize a Launcher to a dict, for later restoration
- class ipyparallel.cluster.launcher.LocalProcessLauncher(**kwargs: Any)#
Start and stop an external process in an asynchronous manner.
This will launch the external process with a working directory of self.work_dir.
- find_args()#
The .args property calls this to find the args list.
Subclasses should implement this to construct the cmd and args.
- classmethod from_dict(d, **kwargs)#
Restore a Launcher from a dict
Subclasses should always call launcher = super().from_dict(*args, **kwargs) and finish initialization after that.
After calling from_dict(), the launcher should be in the same state as after .start() (i.e. monitoring for exit, etc.)
- Returns: Launcher
The instantiated and fully configured Launcher.
- Raises: NotRunning
e.g. if the process has stopped and is no longer running.
- get_output(remove=False)#
Retrieve the output from the Launcher.
If remove: remove the file, if any, where it was being stored.
- async join(timeout=None)#
Wait for the process to exit
- poll_seconds#
Interval (in seconds) on which to poll processes.
Note: process exit should be noticed immediately, due to use of Process.wait(), but this interval should ensure we aren’t leaving threads running forever, as other signals/events are checked on this interval
- signal(sig)#
Signal the process.
- start()#
Start the process.
Should be an async def coroutine.
When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that notify_stop() will be called when the process finishes.
- async stop()#
Stop the process and notify observers of stopping.
This method should be an async def coroutine, and return only after the process has stopped.
All resources should be cleaned up by the time this returns.
- stop_seconds_until_kill#
The number of seconds to wait for a process to exit after sending SIGTERM before sending SIGKILL
- class ipyparallel.cluster.launcher.MPIControllerLauncher(**kwargs: Any)#
Launch a controller using mpiexec.
- property program#
The program to start via mpiexec.
- property program_args#
The command line argument to the program.
- class ipyparallel.cluster.launcher.MPIEngineSetLauncher(**kwargs: Any)#
Launch engines using mpiexec
- property program#
The program to start via mpiexec.
- property program_args#
The command line argument to the program.
- start(n)#
Start n engines by profile or profile_dir.
- class ipyparallel.cluster.launcher.MPIExecControllerLauncher(**kwargs: Any)#
Deprecated, use MPIControllerLauncher
- class ipyparallel.cluster.launcher.MPIExecEngineSetLauncher(**kwargs: Any)#
Deprecated, use MPIEngineSetLauncher
- class ipyparallel.cluster.launcher.MPILauncher(**kwargs: Any)#
Launch an external process using mpiexec.
- find_args()#
Build self.args using all the fields.
- mpi_args#
The command line arguments to pass to mpiexec.
- mpi_cmd#
The mpiexec command to use in starting the process.
- program#
The program to start via mpiexec.
- program_args#
The command line argument to the program.
- start(n=1)#
Start n instances of the program using mpiexec.
- exception ipyparallel.cluster.launcher.NotRunning#
Raised when a launcher is no longer running
- class ipyparallel.cluster.launcher.PBSControllerLauncher(**kwargs: Any)#
Launch a controller using PBS.
- batch_file_name#
batch file name for the controller job.
- class ipyparallel.cluster.launcher.PBSEngineSetLauncher(**kwargs: Any)#
Launch Engines using PBS
- batch_file_name#
batch file name for the engine(s) job.
- class ipyparallel.cluster.launcher.PBSLauncher(**kwargs: Any)#
A BatchSystemLauncher subclass for PBS.
- delete_command#
The PBS delete command [‘qdel’]
- job_id_regexp#
Regular expression for identifying the job ID [r’\d+’]
- signal_command#
The PBS signal command [‘qsig’]
- submit_command#
The PBS submit command [‘qsub’]
- exception ipyparallel.cluster.launcher.ProcessStateError#
- class ipyparallel.cluster.launcher.SGEControllerLauncher(**kwargs: Any)#
Launch a controller using SGE.
- batch_file_name#
batch file name for the ipcontroller job.
- class ipyparallel.cluster.launcher.SGEEngineSetLauncher(**kwargs: Any)#
Launch Engines with SGE
- batch_file_name#
batch file name for the engine(s) job.
- class ipyparallel.cluster.launcher.SGELauncher(**kwargs: Any)#
Sun GridEngine is a PBS clone with slightly different syntax
- class ipyparallel.cluster.launcher.SSHControllerLauncher(**kwargs: Any)#
- property program#
Program to launch via ssh
- property program_args#
args to pass to remote program
- class ipyparallel.cluster.launcher.SSHEngineLauncher(**kwargs: Any)#
- property program#
Program to launch via ssh
- property program_args#
args to pass to remote program
- class ipyparallel.cluster.launcher.SSHEngineSetLauncher(**kwargs: Any)#
- engines#
dict of engines to launch. This is a dict by hostname of ints, corresponding to the number of engines to start on that host.
- launcher_class#
alias of SSHEngineLauncher
- start(n)#
Start engines by profile or profile_dir.
n is an upper limit of engines. The engines config property is used to assign slots to hosts.
- class ipyparallel.cluster.launcher.SSHLauncher(**kwargs: Any)#
A minimal launcher for ssh.
To be useful this will probably have to be extended to use the sshx idea for environment variables. There could be other things this needs as well.
- property cluster_env#
Cluster-related env variables
- fetch_files()#
fetch remote files (called after start)
- find_args()#
The .args property calls this to find the args list.
Subclasses should implement this to construct the cmd and args.
- get_output(remove=False)#
Retrieve engine output from the remote file
- hostname#
hostname on which to launch the program
- async join(timeout=None)#
Wait for the process to exit
- location#
user@hostname location for ssh in one setting
- poll()#
Override poll
- program#
Program to launch via ssh
- program_args#
args to pass to remote program
- property remote_connection_files#
Return remote paths for connection files
- remote_output_file#
The remote file to store output
- remote_profile_dir#
The remote profile_dir to use.
If not specified, use calling profile, stripping out possible leading homedir.
- remote_python#
Remote path to Python interpreter, if needed
- scp_args#
args to pass to scp
- scp_cmd#
command for sending files
- send_files()#
send our files (called before start)
- signal(sig)#
Signal the process.
- ssh_args#
args to pass to ssh
- ssh_cmd#
command for starting ssh
- start(hostname=None, user=None, port=None)#
Start the process.
Should be an async def coroutine.
When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that notify_stop() will be called when the process finishes.
- to_fetch#
List of (remote, local) files to fetch after starting
- to_send#
List of (local, remote) files to send before starting
- user#
username for ssh
- class ipyparallel.cluster.launcher.SSHProxyEngineSetLauncher(**kwargs: Any)#
Launcher for calling ipcluster engines on a remote machine.
Requires that remote profile is already configured.
- ipcluster_args#
Extra CLI arguments to pass to ipcluster engines
- property program#
Program to launch via ssh
- property program_args#
args to pass to remote program
- start(n)#
Start the process.
Should be an async def coroutine.
When start completes, the process should be requested (it need not be running yet), and waiting should begin in the background such that notify_stop() will be called when the process finishes.
- class ipyparallel.cluster.launcher.SlurmControllerLauncher(**kwargs: Any)#
Launch a controller using Slurm.
- batch_file_name#
batch file name for the controller job.
- class ipyparallel.cluster.launcher.SlurmEngineSetLauncher(**kwargs: Any)#
Launch Engines using Slurm
- batch_file_name#
batch file name for the engine(s) job.
- class ipyparallel.cluster.launcher.SlurmLauncher(**kwargs: Any)#
A BatchSystemLauncher subclass for slurm.
- account#
Slurm account to be used
- delete_command#
The slurm delete command [‘scancel’]
- job_id_regexp#
Regular expression for identifying the job ID [r’\d+’]
- options#
Extra Slurm options
- qos#
Slurm QoS to be used
- signal_command#
The slurm signal command [‘scancel’, ‘-s’]
- submit_command#
The slurm submit command [‘sbatch’]
- timelimit#
Slurm timelimit to be used
- exception ipyparallel.cluster.launcher.UnknownStatus#
- class ipyparallel.cluster.launcher.WindowsHPCControllerLauncher(**kwargs: Any)#
- controller_args#
extra args to pass to ipcontroller
- job_file_name#
WinHPC xml job file.
- class ipyparallel.cluster.launcher.WindowsHPCEngineSetLauncher(**kwargs: Any)#
- engine_args#
extra args to pass to ipengine
- job_file_name#
jobfile for ipengines job
- start(n)#
Start the controller by profile_dir.
- class ipyparallel.cluster.launcher.WindowsHPCLauncher(**kwargs: Any)#
- find_args()#
The .args property calls this to find the args list.
Subclasses should implement this to construct the cmd and args.
- job_cmd#
The command for submitting jobs.
- job_file_name#
The filename of the instantiated job script.
- job_id_regexp#
A regular expression used to get the job id from the output of the submit_command.
- parse_job_id(output)#
Take the output of the submit command and return the job id.
- scheduler#
The hostname of the scheduler to submit the job to.
- start(n)#
Start n copies of the process using the Win HPC job scheduler.
- stop()#
Stop the process and notify observers of stopping.
This method should be an async def coroutine, and return only after the process has stopped.
All resources should be cleaned up by the time this returns.
- ipyparallel.cluster.launcher.abbreviate_launcher_class(cls)#
Abbreviate a launcher class back to its entrypoint name
- ipyparallel.cluster.launcher.find_launcher_class(name, kind)#
Return a launcher class for a given name and kind.
- ipyparallel.cluster.launcher.ssh_waitpid(pid, timeout=None)#
To be called on a remote host, waiting on a pid
- ipyparallel.cluster.launcher.sshx(ssh_cmd, cmd, env, remote_output_file, log=None)#
Launch a remote process, returning its remote pid
Uses nohup and pipes to put it in the background
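As a closing illustration of the batch_template, job_id_regexp and job_id_regexp_group entries of BatchSystemLauncher in the reference above, the sketch below instantiates a template with string formatting and extracts a job id from submit output. Everything here is illustrative: the template text, the {queue} variable, and the function names write_script and parse_job_id_sketch are assumptions, not the library's defaults or code. Only {n} and the two regular expressions come from this page.

```python
import re

# Hypothetical template; {n} is described above, {queue} is an extra variable
batch_template = """#!/bin/sh
#SBATCH --ntasks={n}
#SBATCH --partition={queue}
srun ipengine
"""


def write_script(n, **namespace):
    """Instantiate the template using plain string formatting."""
    return batch_template.format(n=n, **namespace)


def parse_job_id_sketch(output, job_id_regexp=r"\d+", group=0):
    """Extract a job id from the submit command's output."""
    match = re.search(job_id_regexp, output)
    if match is None:
        raise ValueError(f"Job id could not be determined: {output!r}")
    return match.group(group)


script = write_script(4, queue="debug")
slurm_id = parse_job_id_sketch("Submitted batch job 12345")
condor_id = parse_job_id_sketch(
    "1 job(s) submitted to cluster 42.", job_id_regexp=r"(\d+)\.$", group=1
)
```

The second call shows why a group index is configurable: the HTCondor pattern matches the trailing period as well, so group 1 is needed to return only the digits.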