API Reference#
- ipyparallel.version_info = (8, 7, 0, 'dev')#
The IPython parallel version as a tuple of integers. There will always be 3 integers. Development releases will have 'dev' as a fourth element.
Classes#
- class ipyparallel.Cluster(*, cluster_file: str = '', cluster_id: str = '', config: Instance = {}, controller_args: list = [], controller_ip: str = '', controller: Launcher = <class 'ipyparallel.cluster.launcher.LocalControllerLauncher'>, controller_location: str = '', delay: float = 1.0, engines: Launcher = <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>, engine_timeout: int = 60, load_profile: bool = True, log: Any = None, log_level: int = 20, n: int = None, parent: Instance = None, profile: str = '', profile_dir: str = '', send_engines_connection_env: bool = True, shutdown_atexit: bool = True)#
Class representing an IPP cluster
i.e. one controller and one or more groups of engines
Can start/stop/monitor/poll cluster resources
All async methods can be called synchronously with a _sync suffix, e.g. cluster.start_cluster_sync().
Changed in version 8.0: controller and engine launcher classes can be specified via Cluster(controller='ssh', engines='mpi') without the _launcher_class suffix.
- cluster_file Unicode('')#
The path to the cluster file for saving this cluster to disk
- cluster_id Unicode('')#
The id of the cluster (default: random string)
- async connect_client(**client_kwargs)#
Return a client connected to the cluster
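For illustration, a minimal sketch of starting a local cluster and connecting a client with the synchronous _sync variants (the engine count and local launcher defaults are assumptions):
import ipyparallel as ipp

cluster = ipp.Cluster(n=4)            # 4 local engines (illustrative)
cluster.start_cluster_sync()          # synchronous form of start_cluster()
rc = cluster.connect_client_sync()    # synchronous form of connect_client()
rc.wait_for_engines(4)                # block until the engines have registered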
- controller_args c.Cluster.controller_args = List()#
Additional CLI args to pass to the controller.
- controller_ip c.Cluster.controller_ip = Unicode('')#
Set the IP address of the controller.
- controller_launcher_class c.Cluster.controller_launcher_class = Launcher(<class 'ipyparallel.cluster.launcher.LocalControllerLauncher'>)#
- The class for launching a Controller. Change this value if you want your controller to also be launched by a batch system, such as PBS, SGE, MPI, etc.
Each launcher class has its own set of configuration options, for making sure it will work in your environment.
Note that using a batch launcher for the controller does not put it in the same batch job as the engines, so they will still start separately.
Third-party engine launchers can be registered via the ipyparallel.engine_launchers entry point.
They can be selected via case-insensitive abbreviation, e.g.
c.Cluster.controller_launcher_class = 'SSH'
or:
ipcluster start --controller=MPI
- Currently installed:
batch: ipyparallel.cluster.launcher.BatchControllerLauncher
htcondor: ipyparallel.cluster.launcher.HTCondorControllerLauncher
local: ipyparallel.cluster.launcher.LocalControllerLauncher
lsf: ipyparallel.cluster.launcher.LSFControllerLauncher
mpi: ipyparallel.cluster.launcher.MPIControllerLauncher
pbs: ipyparallel.cluster.launcher.PBSControllerLauncher
sge: ipyparallel.cluster.launcher.SGEControllerLauncher
slurm: ipyparallel.cluster.launcher.SlurmControllerLauncher
ssh: ipyparallel.cluster.launcher.SSHControllerLauncher
winhpc: ipyparallel.cluster.launcher.WindowsHPCControllerLauncher
- controller_location c.Cluster.controller_location = Unicode('')#
Set the location (hostname or ip) of the controller.
This is used by engines and clients to locate the controller when the controller listens on all interfaces
- delay c.Cluster.delay = Float(1.0)#
delay (in s) between starting the controller and the engines
- engine_launcher_class c.Cluster.engine_launcher_class = Launcher(<class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>)#
- The class for launching a set of Engines. Change this value to use various batch systems to launch your engines, such as PBS, SGE, MPI, etc. Each launcher class has its own set of configuration options, for making sure it will work in your environment.
Third-party engine launchers can be registered via the ipyparallel.engine_launchers entry point.
They can be selected via case-insensitive abbreviation, e.g.
c.Cluster.engine_launcher_class = 'ssh'
or:
ipcluster start --engines=mpi
- Currently installed:
batch: ipyparallel.cluster.launcher.BatchEngineSetLauncher
htcondor: ipyparallel.cluster.launcher.HTCondorEngineSetLauncher
local: ipyparallel.cluster.launcher.LocalEngineSetLauncher
lsf: ipyparallel.cluster.launcher.LSFEngineSetLauncher
mpi: ipyparallel.cluster.launcher.MPIEngineSetLauncher
pbs: ipyparallel.cluster.launcher.PBSEngineSetLauncher
sge: ipyparallel.cluster.launcher.SGEEngineSetLauncher
slurm: ipyparallel.cluster.launcher.SlurmEngineSetLauncher
ssh: ipyparallel.cluster.launcher.SSHEngineSetLauncher
sshproxy: ipyparallel.cluster.launcher.SSHProxyEngineSetLauncher
winhpc: ipyparallel.cluster.launcher.WindowsHPCEngineSetLauncher
- property engine_set#
Return the first engine set
Most clusters have only one engine set, which is tedious to get to via the engines dict with random engine set ids.
New in version 8.0.
- engine_timeout c.Cluster.engine_timeout = Int(60)#
Timeout to use when waiting for engines to register before giving up.
- classmethod from_dict(d, **kwargs)#
Construct a Cluster from serialized state
- classmethod from_file(cluster_file=None, *, profile=None, profile_dir=None, cluster_id='', **kwargs)#
Load a Cluster object from a file
Can specify a full path, or combination of profile, profile_dir, and/or cluster_id.
With no arguments given, it will connect to a cluster created with ipcluster start.
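A hedged sketch of reconnecting to a previously started cluster from its cluster file (the profile and cluster_id values are illustrative):
import ipyparallel as ipp

# With no arguments, load the default cluster file for the current profile
cluster = ipp.Cluster.from_file()
# Or select a specific cluster by profile and cluster_id
cluster = ipp.Cluster.from_file(profile="default", cluster_id="mycluster")
rc = cluster.connect_client_sync()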
- load_profile c.Cluster.load_profile = Bool(True)#
If True (default) load ipcluster config from profile directory, if present.
- n c.Cluster.n = Int(None)#
The number of engines to start
- profile Unicode('')#
The profile name, a shortcut for specifying profile_dir within $IPYTHONDIR.
- profile_dir Unicode('')#
The profile directory.
Default priority:
1. specified explicitly
2. current IPython session
3. use profile name (default: 'default')
- remove_cluster_file()#
Remove my cluster file.
- async restart_engine(engine_id)#
Restart one engine
May stop all engines in a set, depending on EngineSet features (e.g. mpiexec)
- async restart_engines(engine_set_id=None)#
Restart an engine set
- send_engines_connection_env c.Cluster.send_engines_connection_env = Bool(True)#
Wait for controller’s connection info before passing to engines via $IPP_CONNECTION_INFO environment variable.
Set to False to start engines immediately without waiting for the controller’s connection info to be available.
When True, no connection file movement is required. False is mainly useful when submitting the controller may take a long time in a job queue, and the engines should enter the queue before the controller is running.
New in version 8.0.
- shutdown_atexit Bool(True)#
Shutdown the cluster at process exit.
Set to False if you want to launch a cluster and leave it running after the launching process exits.
- async signal_engine(signum, engine_id)#
Signal one engine
May signal all engines in a set, depending on EngineSet features (e.g. mpiexec)
- async signal_engines(signum, engine_set_id=None)#
Signal all engines in a set
If no engine set is specified, signal all engine sets.
- async start_and_connect(n=None, activate=False)#
Single call to start a cluster and connect a client
If activate is given, a blocking DirectView on all engines will be created and activated, registering %px magics for use in IPython.
Example:
rc = await Cluster(engines="mpi").start_and_connect(n=8, activate=True)
%px print("hello, world!")
Equivalent to:
await self.start_cluster(n)
client = await self.connect_client()
await client.wait_for_engines(n, block=False)
New in version 7.1.
New in version 8.1: activate argument.
- async start_cluster(n=None)#
Start a cluster
starts one controller and n engines (default: self.n)
Changed in version 7.1: return self, to allow method chaining
- async start_controller(**kwargs)#
Start the controller
Keyword arguments are passed to the controller launcher constructor
- async start_engines(n=None, engine_set_id=None, **kwargs)#
Start an engine set
Returns an engine set id which can be used in stop_engines
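A brief sketch of adding and later removing an engine set by its id, assuming a running cluster and an async context (the _sync variants behave equivalently):
engine_set_id = await cluster.start_engines(n=2)   # add two more engines
# ... do some work ...
await cluster.stop_engines(engine_set_id)          # stop just that engine set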
- async stop_cluster()#
Stop the controller and all engines
- async stop_controller()#
Stop the controller
- async stop_engine(engine_id)#
Stop one engine
May stop all engines in a set, depending on EngineSet features (e.g. mpiexec)
- async stop_engines(engine_set_id=None)#
Stop an engine set
If engine_set_id is not given, all engines are stopped.
- to_dict()#
Serialize a Cluster object for later reconstruction
- update_cluster_file()#
Update my cluster file
If cluster_file is disabled, do nothing. If the cluster is fully stopped, remove the file.
- write_cluster_file()#
Write cluster info to disk for later loading
- class ipyparallel.Client(*args, **kw)#
A semi-synchronous client to an IPython parallel cluster
- Parameters
connection_info (str or dict) – The path to ipcontroller-client.json, or a dict containing the same information. This JSON file should contain all the information needed to connect to a cluster, and is usually the only argument needed. [Default: use profile]
profile (str) – The name of the Cluster profile to be used to find connector information. If run from an IPython application, the default profile will be the same as the running application, otherwise it will be ‘default’.
cluster_id (str) – String id to be added to runtime files, to prevent name collisions when using multiple clusters with a single profile simultaneously. When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'. Since this is text inserted into filenames, typical recommendations apply: simple character strings are ideal, and spaces are not recommended (but should generally work).
context (zmq.Context) – Pass an existing zmq.Context instance, otherwise the client will create its own.
debug (bool) – flag for lots of message printing for debug purposes
timeout (float) – time (in seconds) to wait for connection replies from the Hub [Default: 10]
sshserver (str) – A string of the form passed to ssh, i.e. ‘server.tld’ or ‘user@server.tld:port’ If keyfile or password is specified, and this is not, it will default to the ip given in addr.
sshkey (str; path to ssh private key file) – This specifies a key to be used in ssh login, default None. Regular default ssh keys will be used without specifying this argument.
password (str) – Your ssh password to sshserver. Note that if this is left None, you will be prompted for it if passwordless key based login is unavailable.
paramiko (bool) – flag for whether to use paramiko instead of shell ssh for tunneling. [default: True on win32, False else]
- ids#
Requesting the ids attribute always synchronizes the registration state. To request ids without synchronization, use the semi-private _ids attribute.
- Type
list of int engine IDs
- history#
a list of msg_ids, keeping track of all the execution messages you have submitted in order.
- Type
list of msg_ids
- outstanding#
a set of msg_ids that have been submitted, but whose results have not yet been received.
- Type
set of msg_ids
- abort(jobs=None, targets=None, block=None)#
Abort specific jobs from the execution queues of target(s).
This is a mechanism to prevent jobs that have already been submitted from executing. To halt a running job, you must interrupt the engine(s) by sending a signal. This can be done via os.kill for local engines, or Cluster.signal_engines() for multiple engines.
- Parameters
jobs (msg_id, list of msg_ids, or AsyncResult) –
The jobs to be aborted
If unspecified/None: abort all outstanding jobs.
- activate(targets='all', suffix='')#
Create a DirectView and register it with IPython magics
Defines the magics %px, %autopx, %pxresult, %%px
- Parameters
targets (int, list of ints, or 'all') – The engines on which the view’s magics will run
suffix (str [default: '']) –
The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time.
e.g. rc.activate(targets=0, suffix='0') will give you the magics %px0, %pxresult0, etc. for running magics just on engine 0.
- become_dask(targets='all', port=0, nanny=False, scheduler_args=None, **worker_args)#
Turn the IPython cluster into a dask.distributed cluster
- Parameters
targets (target spec (default: all)) – Which engines to turn into dask workers.
port (int (default: random)) – Which port
nanny (bool (default: False)) – Whether to start workers as subprocesses instead of in the engine process. Using a nanny allows restarting the worker processes via executor.restart.
scheduler_args (dict) – Keyword arguments (e.g. ip) to pass to the distributed.Scheduler constructor.
**worker_args – Any additional keyword arguments (e.g. nthreads) are passed to the distributed.Worker constructor.
- Returns
A dask.distributed.Client connected to the dask cluster.
- Return type
client = distributed.Client
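For example, a minimal sketch, assuming dask.distributed is installed and rc is a connected Client with running engines:
executor = rc.become_dask()                   # returns a distributed.Client
future = executor.submit(lambda x: x + 1, 10)
print(future.result())                        # 11
rc.stop_dask()                                # tear the dask scheduler/workers down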
- become_distributed(targets='all', port=0, nanny=False, scheduler_args=None, **worker_args)#
Turn the IPython cluster into a dask.distributed cluster
- Parameters
targets (target spec (default: all)) – Which engines to turn into dask workers.
port (int (default: random)) – Which port
nanny (bool (default: False)) – Whether to start workers as subprocesses instead of in the engine process. Using a nanny allows restarting the worker processes via executor.restart.
scheduler_args (dict) – Keyword arguments (e.g. ip) to pass to the distributed.Scheduler constructor.
**worker_args – Any additional keyword arguments (e.g. nthreads) are passed to the distributed.Worker constructor.
- Returns
A dask.distributed.Client connected to the dask cluster.
- Return type
client = distributed.Client
- broadcast_view(targets='all', is_coalescing=False, **kwargs)#
construct a BroadcastView object. If no arguments are specified, create a BroadcastView using all engines.
- clear(targets=None, block=None)#
Clear the namespace in target(s).
- close(linger=None)#
Close my zmq Sockets
If linger, set the zmq LINGER socket option, which allows discarding of messages.
- db_query(query, keys=None)#
Query the Hub’s TaskRecord database
This will return a list of task record dicts that match query.
- Parameters
query (mongodb query dict) – The search dict. See mongodb query docs for details.
keys (list of strs [optional]) – The subset of keys to be returned. The default is to fetch everything but buffers. ‘msg_id’ will always be included.
- direct_view(targets='all', **kwargs)#
construct a DirectView object.
If no targets are specified, create a DirectView using all engines.
rc.direct_view('all') is distinguished from rc[:] in that 'all' will evaluate the target engines at each execution, whereas rc[:] will connect to all current engines, and that list will not change.
That is, ‘all’ will always use all engines, whereas rc[:] will not use engines added after the DirectView is constructed.
- executor(targets=None)#
Construct a PEP-3148 Executor with a LoadBalancedView
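A short sketch of the resulting PEP-3148 interface (rc is assumed to be a connected Client):
ex = rc.executor()                          # load-balanced executor over all engines
future = ex.submit(pow, 2, 10)
print(future.result())                      # 1024
squares = list(ex.map(lambda x: x * x, range(8)))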
- get_result(indices_or_msg_ids=None, block=None, owner=True)#
Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
If the client already has the results, no request to the Hub will be made.
This is a convenient way to construct AsyncResult objects, which are wrappers that include metadata about execution, and allow for awaiting results that were not submitted by this Client.
It can also be a convenient way to retrieve the metadata associated with blocking execution, since it always retrieves the metadata.
Examples
In [10]: r = client.apply()
- Parameters
indices_or_msg_ids (integer history index, str msg_id, AsyncResult, or a list of same) – The indices or msg_ids of the results to be retrieved
block (bool) – Whether to wait for the result to be done
owner (bool [default: True]) – Whether this AsyncResult should own the result. If so, calling ar.get() will remove data from the client's result and metadata cache. There should only be one owner of any given msg_id.
- Returns
AsyncResult – A single AsyncResult object will always be returned.
AsyncHubResult – A subclass of AsyncResult that retrieves results from the Hub
- hub_history()#
Get the Hub’s history
Just like the Client, the Hub has a history, which is a list of msg_ids. This will contain the history of all clients, and, depending on configuration, may contain history across multiple cluster sessions.
Any msg_id returned here is a valid argument to get_result.
- Returns
msg_ids – list of all msg_ids, ordered by task submission time.
- Return type
list of strs
- load_balanced_view(targets=None, **kwargs)#
construct a LoadBalancedView object.
If no arguments are specified, create a LoadBalancedView using all engines.
- purge_everything()#
Clears all content from previous Tasks from both the hub and the local client
In addition to calling purge_results("all"), it also deletes the history and other bookkeeping lists.
- purge_hub_results(jobs=[], targets=[])#
Tell the Hub to forget results.
Individual results can be purged by msg_id, or the entire history of specific targets can be purged.
Use purge_results('all') to scrub everything from the Hub's db.
- purge_local_results(jobs=[], targets=[])#
Clears the client caches of results and their metadata.
Individual results can be purged by msg_id, or the entire history of specific targets can be purged.
Use purge_local_results('all') to scrub everything from the Client's results and metadata caches.
After this call, all AsyncResults are invalid and should be discarded.
If you must "reget" the results, you can still do so by using client.get_result(msg_id) or client.get_result(asyncresult). This will redownload the results from the hub if they are still available (i.e. client.purge_hub_results(...) has not been called).
- Raises
RuntimeError – if any of the tasks to be purged are still outstanding.
- purge_results(jobs=[], targets=[])#
Clears the cached results from both the hub and the local client
Individual results can be purged by msg_id, or the entire history of specific targets can be purged.
Use purge_results('all') to scrub every cached result from both the Hub's and the Client's db.
Equivalent to calling both purge_hub_results() and purge_local_results() with the same arguments.
- queue_status(targets='all', verbose=False)#
Fetch the status of engine queues.
- Parameters
targets (int/str/list of ints/strs) – the engines whose states are to be queried. default : all
verbose (bool) – Whether to return lengths only, or lists of ids for each element
- resubmit(indices_or_msg_ids=None, metadata=None, block=None)#
Resubmit one or more tasks.
in-flight tasks may not be resubmitted.
- result_status(msg_ids, status_only=True)#
Check on the status of the result(s) of the apply request with msg_ids.
If status_only is False, then the actual results will be retrieved, else only the status of the results will be checked.
- Parameters
- Returns
results – There will always be the keys 'pending' and 'completed', which will be lists of msg_ids that are incomplete or complete. If status_only is False, then completed results will be keyed by their msg_id.
- Return type
- send_apply_request(socket, f, args=None, kwargs=None, metadata=None, track=False, ident=None, message_future_hook=None)#
construct and send an apply message via a socket.
This is the principal method with which all engine execution is performed by views.
- send_execute_request(socket, code, silent=True, metadata=None, ident=None, message_future_hook=None)#
construct and send an execute request via a socket.
- send_signal(sig, targets=None, block=None)#
Send a signal to target(s).
- shutdown(targets='all', restart=False, hub=False, block=None)#
Terminates one or more engine processes, optionally including the hub.
- Parameters
targets (list of ints or 'all' [default: all]) – Which engines to shutdown.
hub (bool [default: False]) – Whether to include the Hub. hub=True implies targets=’all’.
block (bool [default: self.block]) – Whether to wait for clean shutdown replies or not.
restart (bool [default: False]) – NOT IMPLEMENTED whether to restart engines after shutting them down.
- spin()#
DEPRECATED, DOES NOTHING
- spin_thread(interval=1)#
DEPRECATED, DOES NOTHING
- stop_dask(targets='all')#
Stop the distributed Scheduler and Workers started by become_dask.
- Parameters
targets (target spec (default: all)) – Which engines to stop dask workers on.
- stop_distributed(targets='all')#
Stop the distributed Scheduler and Workers started by become_dask.
- Parameters
targets (target spec (default: all)) – Which engines to stop dask workers on.
- stop_spin_thread()#
DEPRECATED, DOES NOTHING
- wait(jobs=None, timeout=-1)#
waits on one or more jobs, for up to timeout seconds.
- Parameters
- Returns
True (when all msg_ids are done)
False (timeout reached, some msg_ids still outstanding)
- wait_for_engines(n=None, *, timeout=-1, block=True, interactive=None, widget=None)#
Wait for n engines to become available.
Returns when n engines are available, or raises TimeoutError if timeout is reached before n engines are ready.
- Parameters
n (int) – Number of engines to wait for.
timeout (float) – Time (in seconds) to wait before raising a TimeoutError
block (bool) – if False, return Future instead of waiting
interactive (bool) – default: True if in IPython, False otherwise. if True, show a progress bar while waiting for engines
widget (bool) – default: True if in an IPython kernel (notebook), False otherwise. Only has an effect if interactive is True. If True, forces use of widget progress bar. If False, forces use of terminal tqdm.
- Returns
f – Future object to wait on if block is False, None if block is True.
- Return type
concurrent.futures.Future or None
- Raises
TimeoutError – if timeout is reached.
- wait_interactive(jobs=None, interval=1.0, timeout=-1.0)#
Wait interactively for jobs
If no job is specified, will wait for all outstanding jobs to complete.
- class ipyparallel.DirectView(**kwargs: Any)#
Direct Multiplexer View of one or more engines.
These are created via indexed access to a client:
>>> dv_1 = client[1]
>>> dv_all = client[:]
>>> dv_even = client[::2]
>>> dv_some = client[1:3]
This object provides dictionary access to engine namespaces:
# push a=5:
>>> dv['a'] = 5
# pull 'foo':
>>> dv['foo']
- abort(jobs=None, targets=None, block=None)#
Abort jobs on my engines.
Note: only jobs that have not started yet can be aborted. To halt a running job, you must interrupt the engine(s) via the Cluster API.
- activate(suffix='')#
Activate IPython magics associated with this View
Defines the magics %px, %autopx, %pxresult, %%px, %pxconfig
- Parameters
suffix (str [default: '']) –
The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time.
e.g. rc[::2].activate(suffix='_even') will give you the magics %px_even, %pxresult_even, etc. for running magics on the even engines.
- apply(_View__ipp_f, *args, **kwargs)#
calls f(*args, **kwargs) on remote engines, returning the result.
This method sets all apply flags via this View's attributes.
Returns AsyncResult instance if self.block is False, otherwise the return value of f(*args, **kwargs).
- apply_async(_View__ipp_f, *args, **kwargs)#
calls f(*args, **kwargs) on remote engines in a nonblocking manner.
Returns AsyncResult instance.
- apply_sync(_View__ipp_f, *args, **kwargs)#
calls f(*args, **kwargs) on remote engines in a blocking manner, returning the result.
- clear(targets=None, block=None)#
Clear the remote namespaces on my engines.
- execute(code, silent=True, targets=None, block=None)#
Executes code on targets in blocking or nonblocking manner.
execute is always bound (affects engine namespace)
- gather(key, dist='b', targets=None, block=None)#
Gather a partitioned sequence on a set of engines as a single local seq.
- get(key_s)#
get object(s) by key_s from remote namespace
see pull for details.
- get_result(indices_or_msg_ids=None, block=None, owner=False)#
return one or more results, specified by history index or msg_id.
See ipyparallel.client.client.Client.get_result() for details.
- imap(f, *sequences, **kwargs)#
Parallel version of itertools.imap().
See self.map for details.
- property importer#
sync_imports(local=True) as a property.
See sync_imports for details.
- map(f, *sequences, block=None, track=False, return_exceptions=False)#
Parallel version of builtin map, using this View's targets.
There will be one task per target, so work will be chunked if the sequences are longer than targets.
Results can be iterated as they are ready, but will become available in chunks.
New in version 7.0: return_exceptions
- Parameters
f (callable) – function to be mapped
*sequences (one or more sequences of matching length) – the sequences to be distributed and passed to f
block (bool [default self.block]) – whether to wait for the result or not
track (bool [default False]) – Track underlying zmq send to indicate when it is safe to modify memory. Only for zero-copy sends such as numpy arrays that are going to be modified in-place.
return_exceptions (bool [default False]) – Return remote Exceptions in the result sequence instead of raising them.
- Returns
If block=False – An AsyncMapResult instance. An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete.
else – A list, the result of map(f, *sequences)
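For instance, a small sketch assuming dv is a DirectView over several engines:
ar = dv.map(lambda x: x * 2, range(10), block=False)   # one task per engine
for item in ar:                                        # results become available in chunks
    print(item)
results = dv.map_sync(lambda x: x * 2, range(10))      # blocking equivalent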
- map_async(f, *sequences, **kwargs)#
Parallel version of builtin map(), using this view's engines.
This is equivalent to map(..., block=False).
See self.map for details.
- map_sync(f, *sequences, **kwargs)#
Parallel version of builtin map(), using this view's engines.
This is equivalent to map(..., block=True).
See self.map for details.
- parallel(dist='b', block=None, **flags)#
Decorator for making a ParallelFunction
- pull(names, targets=None, block=None)#
get object(s) by name from remote namespace
will return one object if it is a key. can also take a list of keys, in which case it will return a list of objects.
- purge_results(jobs=[], targets=[])#
Instruct the controller to forget specific results.
- push(ns, targets=None, block=None, track=None)#
update remote namespace with dict ns
- queue_status(targets=None, verbose=False)#
Fetch the Queue status of my engines
- remote(block=None, **flags)#
Decorator for making a RemoteFunction
- run(filename, targets=None, block=None)#
Execute contents of filename on my engine(s).
This simply reads the contents of the file and calls execute.
- scatter(key, seq, dist='b', flatten=False, targets=None, block=None, track=None)#
Partition a Python sequence and send the partitions to a set of engines.
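A short sketch combining scatter, execute, and gather (dv is assumed to be a DirectView):
dv.scatter('a', list(range(16)), block=True)      # each engine receives a slice as 'a'
dv.execute('b = [x * 2 for x in a]', block=True)  # compute on each partition remotely
b = dv.gather('b', block=True)                    # reassembled local list of length 16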
- set_flags(**kwargs)#
set my attribute flags by keyword.
Views determine behavior with a few attributes (block, track, etc.). These attributes can be set all at once by name with this method.
- shutdown(targets=None, restart=False, hub=False, block=None)#
Terminates one or more engine processes, optionally including the hub.
- spin()#
spin the client, and sync
- sync_imports(local=True, quiet=False)#
Context Manager for performing simultaneous local and remote imports.
‘import x as y’ will not work. The ‘as y’ part will simply be ignored.
If local=True, then the package will also be imported locally.
If quiet=True, no output will be produced when attempting remote imports.
Note that remote-only (local=False) imports have not been implemented.
>>> with view.sync_imports():
...     from numpy import recarray
importing recarray from numpy on engine(s)
- temp_flags(**kwargs)#
temporarily set flags, for use in with statements.
See set_flags for permanent setting of flags
Examples
>>> view.track = False
>>> with view.temp_flags(track=True):
...     ar = view.apply(dostuff, my_big_array)
...     ar.tracker.wait()  # wait for send to finish
>>> view.track
False
- update(ns)#
update remote namespace with dict ns
See push for details.
- use_cloudpickle()#
Expand serialization support with cloudpickle.
This calls ipyparallel.serialize.use_cloudpickle() here and on each engine.
- use_dill()#
Expand serialization support with dill
adds support for closures, etc.
This calls ipyparallel.serialize.use_dill() here and on each engine.
- use_pickle()#
Restore default serialization (stdlib pickle).
This reverts changes to serialization caused by use_dill or use_cloudpickle.
- wait(jobs=None, timeout=-1)#
waits on one or more jobs, for up to timeout seconds.
- Parameters
- Returns
True (when all msg_ids are done)
False (timeout reached, some msg_ids still outstanding)
- class ipyparallel.LoadBalancedView(**kwargs: Any)#
A load-balancing View that only executes via the Task scheduler.
Load-balanced views can be created with the client's load_balanced_view method:
>>> v = client.load_balanced_view()
or targets can be specified, to restrict the potential destinations:
>>> v = client.load_balanced_view([1,3])
which would restrict load-balancing to engines 1 and 3.
- abort(jobs=None, targets=None, block=None)#
Abort jobs on my engines.
Note: only jobs that have not started yet can be aborted. To halt a running job, you must interrupt the engine(s) via the Cluster API.
- apply(_View__ipp_f, *args, **kwargs)#
calls f(*args, **kwargs) on remote engines, returning the result.
This method sets all apply flags via this View's attributes.
Returns AsyncResult instance if self.block is False, otherwise the return value of f(*args, **kwargs).
- apply_async(_View__ipp_f, *args, **kwargs)#
calls f(*args, **kwargs) on remote engines in a nonblocking manner.
Returns AsyncResult instance.
- apply_sync(_View__ipp_f, *args, **kwargs)#
calls f(*args, **kwargs) on remote engines in a blocking manner, returning the result.
- get_result(indices_or_msg_ids=None, block=None, owner=False)#
return one or more results, specified by history index or msg_id.
See ipyparallel.client.client.Client.get_result() for details.
- imap(f, *sequences, ordered=True, max_outstanding='auto', return_exceptions=False)#
Parallel version of lazily-evaluated imap, load-balanced by this View.
ordered and max_outstanding can be specified by keyword only.
Unlike other map functions in IPython Parallel, this one does not consume the full iterable before submitting work, returning a single 'AsyncMapResult' representing the full computation.
Instead, it consumes iterables as they come, submitting up to max_outstanding tasks to the cluster before waiting on results (default: one task per engine). This allows it to work with infinite generators, and avoid potentially expensive read-ahead for large streams of inputs that may not fit in memory all at once.
New in version 7.0.
- Parameters
f (callable) – function to be mapped
*sequences (one or more sequences of matching length) – the sequences to be distributed and passed to f
ordered (bool [default True]) – Whether the results should be yielded on a first-come-first-yield basis, or preserve the order of submission.
max_outstanding (int [default len(engines)]) –
The maximum number of tasks to be outstanding.
max_outstanding=0 will greedily consume the whole generator (map_async may be more efficient).
A limit of 1 should be strictly worse than running a local map, as there will be no parallelism.
Use this to tune how greedily the input generator should be consumed.
return_exceptions (bool [default False]) – Return Exceptions instead of raising them.
- Returns
lazily-evaluated generator, yielding results of f on each item of sequences.
Yield-order depends on the ordered argument.
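A sketch of lazily mapping over a generator, where lv is assumed to be a LoadBalancedView:
def inputs():
    # a potentially long (or infinite) stream of work items
    for i in range(1000):
        yield i

# at most max_outstanding tasks are in flight at any time
for result in lv.imap(lambda x: x ** 2, inputs(), ordered=False, max_outstanding=8):
    print(result)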
- map(f, *sequences, block=None, chunksize=1, ordered=True, return_exceptions=False)#
Parallel version of builtin map, load-balanced by this View.
Each chunksize elements will be a separate task, and will be load-balanced. This lets individual elements be available for iteration as soon as they arrive.
return_exceptions
- Parameters
f (callable) – function to be mapped
*sequences (one or more sequences of matching length) – the sequences to be distributed and passed to f
block (bool [default self.block]) – whether to wait for the result or not
chunksize (int [default 1]) – how many elements should be in each task.
ordered (bool [default True]) –
Whether the results should be gathered as they arrive, or enforce the order of submission.
Only applies when iterating through AsyncMapResult as results arrive. Has no effect when block=True.
return_exceptions (bool [default False]) – Return Exceptions instead of raising on the first exception.
- Returns
if block=False – An AsyncMapResult instance. An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete.
else – A list, the result of map(f, *sequences)
- map_async(f, *sequences, **kwargs)#
Parallel version of builtin map(), using this view's engines.
This is equivalent to map(..., block=False).
See self.map for details.
- map_sync(f, *sequences, **kwargs)#
Parallel version of builtin map(), using this view's engines.
This is equivalent to map(..., block=True).
See self.map for details.
- parallel(dist='b', block=None, **flags)#
Decorator for making a ParallelFunction
- purge_results(jobs=[], targets=[])#
Instruct the controller to forget specific results.
- queue_status(targets=None, verbose=False)#
Fetch the Queue status of my engines
- register_joblib_backend(name='ipyparallel', make_default=False)#
Register this View as a joblib parallel backend
To make this the default backend, set make_default=True.
Use with:
p = Parallel(backend='ipyparallel') ...
See joblib docs for details
Requires joblib >= 0.10
New in version 5.1.
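A fuller sketch of the joblib integration, assuming joblib is installed and view is any View (e.g. a LoadBalancedView):
from joblib import Parallel, delayed

view.register_joblib_backend()        # registers the 'ipyparallel' backend name
results = Parallel(backend='ipyparallel')(
    delayed(pow)(i, 2) for i in range(10)
)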
- remote(block=None, **flags)#
Decorator for making a RemoteFunction
- set_flags(**kwargs)#
set my attribute flags by keyword.
A View is a wrapper for the Client's apply method, but with attributes that specify keyword arguments; those attributes can be set by keyword argument with this method.
- Parameters
block (bool) – whether to wait for results
track (bool) – whether to create a MessageTracker so the user knows when it is safe to edit arrays and buffers after non-copying sends.
after (Dependency or collection of msg_ids) – Only for load-balanced execution (targets=None) Specify a list of msg_ids as a time-based dependency. This job will only be run after the dependencies have been met.
follow (Dependency or collection of msg_ids) – Only for load-balanced execution (targets=None) Specify a list of msg_ids as a location-based dependency. This job will only be run on an engine where this dependency is met.
timeout (float/int or None) – Only for load-balanced execution (targets=None) Specify an amount of time (in seconds) for the scheduler to wait for dependencies to be met before failing with a DependencyTimeout.
retries (int) – Number of times a task will be retried on failure.
- shutdown(targets=None, restart=False, hub=False, block=None)#
Terminates one or more engine processes, optionally including the hub.
- spin()#
spin the client, and sync
- temp_flags(**kwargs)#
temporarily set flags, for use in with statements.
See set_flags for permanent setting of flags
Examples
>>> view.track = False
>>> with view.temp_flags(track=True):
...     ar = view.apply(dostuff, my_big_array)
...     ar.tracker.wait()  # wait for send to finish
>>> view.track
False
- wait(jobs=None, timeout=-1)#
waits on one or more jobs, for up to timeout seconds.
- Parameters
- Returns
True (when all msg_ids are done)
False (timeout reached, some msg_ids still outstanding)
- class ipyparallel.BroadcastView(**kwargs: Any)#
- abort(jobs=None, targets=None, block=None)#
Abort jobs on my engines.
Note: only jobs that have not started yet can be aborted. To halt a running job, you must interrupt the engine(s) via the Cluster API.
- activate(suffix='')#
Activate IPython magics associated with this View
Defines the magics %px, %autopx, %pxresult, %%px, %pxconfig
- Parameters
suffix (str [default: '']) –
The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time.
e.g. rc[::2].activate(suffix='_even') will give you the magics %px_even, %pxresult_even, etc. for running magics on the even engines.
- apply(_View__ipp_f, *args, **kwargs)#
calls f(*args, **kwargs) on remote engines, returning the result.
This method sets all apply flags via this View's attributes.
Returns AsyncResult instance if self.block is False, otherwise the return value of f(*args, **kwargs).
- apply_async(_View__ipp_f, *args, **kwargs)#
calls f(*args, **kwargs) on remote engines in a nonblocking manner.
Returns AsyncResult instance.
- apply_sync(_View__ipp_f, *args, **kwargs)#
calls f(*args, **kwargs) on remote engines in a blocking manner, returning the result.
- clear(targets=None, block=None)#
Clear the remote namespaces on my engines.
- execute(code, silent=True, targets=None, block=None)#
Executes code on targets in blocking or nonblocking manner.
execute is always bound (affects engine namespace)
- gather(key, dist='b', targets=None, block=None)#
Gather a partitioned sequence on a set of engines as a single local seq.
- get(key_s)#
get object(s) by key_s from remote namespace
see pull for details.
- get_result(indices_or_msg_ids=None, block=None, owner=False)#
return one or more results, specified by history index or msg_id.
See ipyparallel.client.client.Client.get_result() for details.
- imap(f, *sequences, **kwargs)#
Parallel version of itertools.imap().
See self.map for details.
- property importer#
sync_imports(local=True) as a property.
See sync_imports for details.
- map(f, *sequences, **kwargs)#
Parallel version of builtin map, using this View's targets.
There will be one task per target, so work will be chunked if the sequences are longer than targets.
Results can be iterated as they are ready, but will become available in chunks.
New in version 7.0: return_exceptions
- Parameters
f (callable) – function to be mapped
*sequences (one or more sequences of matching length) – the sequences to be distributed and passed to f
block (bool [default self.block]) – whether to wait for the result or not
track (bool [default False]) – Track underlying zmq send to indicate when it is safe to modify memory. Only for zero-copy sends such as numpy arrays that are going to be modified in-place.
return_exceptions (bool [default False]) – Return remote Exceptions in the result sequence instead of raising them.
- Returns
If block=False – An AsyncMapResult instance. An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete.
else – A list, the result of map(f, *sequences)
- map_async(f, *sequences, **kwargs)#
Parallel version of builtin map(), using this view's engines.
This is equivalent to map(..., block=False).
See self.map for details.
- map_sync(f, *sequences, **kwargs)#
Parallel version of builtin map(), using this view's engines.
This is equivalent to map(..., block=True).
See self.map for details.
- parallel(dist='b', block=None, **flags)#
Decorator for making a ParallelFunction
- pull(names, targets=None, block=None)#
get object(s) by name from remote namespace
will return one object if it is a key. can also take a list of keys, in which case it will return a list of objects.
- purge_results(jobs=[], targets=[])#
Instruct the controller to forget specific results.
- push(ns, targets=None, block=None, track=None)#
update remote namespace with dict ns
- queue_status(targets=None, verbose=False)#
Fetch the Queue status of my engines
- remote(block=None, **flags)#
Decorator for making a RemoteFunction
- run(filename, targets=None, block=None)#
Execute contents of filename on my engine(s).
This simply reads the contents of the file and calls execute.
- scatter(key, seq, dist='b', flatten=False, targets=None, block=None, track=None)#
Partition a Python sequence and send the partitions to a set of engines.
- set_flags(**kwargs)#
set my attribute flags by keyword.
Views determine behavior with a few attributes (block, track, etc.). These attributes can be set all at once by name with this method.
- shutdown(targets=None, restart=False, hub=False, block=None)#
Terminates one or more engine processes, optionally including the hub.
- spin()#
spin the client, and sync
- sync_imports(local=True, quiet=False)#
Context Manager for performing simultaneous local and remote imports.
‘import x as y’ will not work. The ‘as y’ part will simply be ignored.
If local=True, then the package will also be imported locally.
If quiet=True, no output will be produced when attempting remote imports.
Note that remote-only (local=False) imports have not been implemented.
>>> with view.sync_imports():
...     from numpy import recarray
importing recarray from numpy on engine(s)
- temp_flags(**kwargs)#
temporarily set flags, for use in with statements.
See set_flags for permanent setting of flags
Examples
>>> view.track = False
>>> with view.temp_flags(track=True):
...     ar = view.apply(dostuff, my_big_array)
...     ar.tracker.wait()  # wait for send to finish
>>> view.track
False
- update(ns)#
update remote namespace with dict ns
See push for details.
- use_cloudpickle()#
Expand serialization support with cloudpickle.
This calls ipyparallel.serialize.use_cloudpickle() here and on each engine.
- use_dill()#
Expand serialization support with dill
adds support for closures, etc.
This calls ipyparallel.serialize.use_dill() here and on each engine.
- use_pickle()#
Restore default serialization (stdlib pickle).
This reverts changes to serialization caused by use_dill or use_cloudpickle.
- wait(jobs=None, timeout=-1)#
waits on one or more jobs, for up to timeout seconds.
- Parameters
- Returns
True (when all msg_ids are done)
False (timeout reached, some msg_ids still outstanding)
- class ipyparallel.AsyncResult(client, children, fname='unknown', targets=None, owner=False, return_exceptions=False, chunk_sizes=None)#
Class for representing results of non-blocking calls.
Extends the interfaces of multiprocessing.pool.AsyncResult and concurrent.futures.Future.
- abort()#
Abort my tasks, if possible.
Only tasks that have not started yet can be aborted.
Raises RuntimeError if already done.
- add_done_callback(fn)#
Attaches a callable that will be called when the future finishes.
- Parameters
fn – A callable that will be called with this future as its only argument when the future completes or is cancelled. The callable will always be called by a thread in the same process in which it was added. If the future has already completed or been cancelled then the callable will be called immediately. These callables are called in the order that they were added.
- cancel()#
Cancel the future if possible.
Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.
- cancelled()#
Return True if the future was cancelled.
- display_outputs(groupby='type', result_only=False)#
republish the outputs of the computation
- Parameters
groupby (str [default: type]) –
- if ‘type’:
Group outputs by type (show all stdout, then all stderr, etc.):
[stdout:1] foo
[stdout:2] foo
[stderr:1] bar
[stderr:2] bar
- if ‘engine’:
Display outputs for each engine before moving on to the next:
[stdout:1] foo
[stderr:1] bar
[stdout:2] foo
[stderr:2] bar
- if ‘order’:
Like ‘type’, but further collate individual displaypub outputs. This is meant for cases of each command producing several plots, and you would like to see all of the first plots together, then all of the second plots, and so on.
result_only (boolean [default: False]) – Only display the execution result and skip stdout, stderr and display-outputs. Usually used when using streaming output since these outputs would have already been displayed.
- done()#
Return True if the future was cancelled or finished executing.
- property elapsed#
elapsed time since initial submission
- exception(timeout=None)#
Return the exception raised by the call that the future represents.
- Parameters
timeout – The number of seconds to wait for the exception if the future isn’t done. If None, then there is no limit on the wait time.
- Returns
The exception raised by the call that the future represents or None if the call completed without raising.
- Raises
CancelledError – If the future was cancelled.
TimeoutError – If the future didn’t finish executing before the given timeout.
- get(timeout=None, return_exceptions=None, return_when=None)#
Return the result when it arrives.
Arguments:
- timeout : int [default None]
If timeout is not None and the result does not arrive within timeout seconds then TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get() inside a RemoteError.
- return_exceptions : bool [default False]
If True, return Exceptions instead of raising them.
- return_when : None, ALL_COMPLETED, or FIRST_EXCEPTION
FIRST_COMPLETED is not supported, and treated the same as ALL_COMPLETED. See concurrent.futures.wait() for documentation.
When return_when=FIRST_EXCEPTION, will raise immediately on the first exception, rather than waiting for all results to finish before reporting errors.
Changed in version 8.0: Added return_when argument.
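For example, a brief sketch of error handling with get() (ar is an AsyncResult from a previous submission):
from concurrent.futures import FIRST_EXCEPTION

results = ar.get(return_exceptions=True)        # exceptions are returned in place
results = ar.get(return_when=FIRST_EXCEPTION)   # raise as soon as any task fails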
- get_dict(timeout=-1)#
Get the results as a dict, keyed by engine_id.
timeout behavior is described in get().
- classmethod join(*async_results)#
Join multiple AsyncResults into one
Inverse of .split(), used for rejoining split results in wait.
New in version 8.0.
- property metadata#
property for accessing execution metadata.
- property progress#
the number of tasks which have been completed at this point.
Fractional progress would be given by 1.0 * ar.progress / len(ar)
- property r#
result property wrapper for get(timeout=-1).
- ready()#
Return whether the call has completed.
- result(timeout=None)#
Return the result of the call that the future represents.
- Parameters
timeout – The number of seconds to wait for the result if the future isn’t done. If None, then there is no limit on the wait time.
- Returns
The result of the call that the future represents.
- Raises
CancelledError – If the future was cancelled.
TimeoutError – If the future didn’t finish executing before the given timeout.
Exception – If the call raised then that exception will be raised.
- property result_dict#
result property as a dict.
- running()#
Return True if the future is currently executing.
- property sent#
check whether my messages have been sent.
- property serial_time#
serial computation time of a parallel calculation
Computed as the sum of (completed-started) of each task
- set_exception(exception)#
Sets the result of the future as being the given exception.
Should only be used by Executor implementations and unit tests.
- set_result(result)#
Sets the return value of work associated with the future.
Should only be used by Executor implementations and unit tests.
- set_running_or_notify_cancel()#
Mark the future as running or process any cancel notifications.
Should only be used by Executor implementations and unit tests.
If the future has been cancelled (cancel() was called and returned True) then any threads waiting on the future completing (though calls to as_completed() or wait()) are notified and False is returned.
If the future was not cancelled then it is put in the running state (future calls to running() will return True) and True is returned.
This method should be called by Executor implementations before executing the work associated with this future. If this method returns False then the work should not be executed.
- Returns
False if the Future was cancelled, True otherwise.
- Raises
RuntimeError – if this method was already called or if set_result() or set_exception() was called.
- split()#
Split an AsyncResult
An AsyncResult object that represents multiple messages can be split to wait for individual results. These can be passed to concurrent.futures.wait and friends to get partial results.
New in version 8.0.
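A sketch of waiting on partial results with the standard library (ar is assumed to span multiple tasks):
from concurrent.futures import wait, FIRST_COMPLETED

done, pending = wait(ar.split(), return_when=FIRST_COMPLETED)
for part in done:
    print(part.get())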
- stream_output()#
Stream output for this result as it arrives.
Returns a context manager, during which output is streamed.
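For example, a minimal sketch with a non-blocking execution (dv is assumed to be a DirectView):
ar = dv.execute("print('working...')", block=False)
with ar.stream_output():    # engine stdout/stderr are displayed as they arrive
    ar.wait()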
- successful()#
Return whether the call completed without raising an exception.
Will raise RuntimeError if the result is not ready.
- timedelta(start, end, start_key=<built-in function min>, end_key=<built-in function max>)#
compute the difference between two sets of timestamps
The default behavior is to use the earliest of the first and the latest of the second list, but this can be changed by passing different start_key and end_key functions.
- Parameters
start (one or more datetime objects (e.g. ar.submitted)) –
end (one or more datetime objects (e.g. ar.received)) –
start_key (callable) – Function to call on start to extract the relevant entry [default: min]
end_key (callable) – Function to call on end to extract the relevant entry [default: max]
- Returns
dt – The time elapsed (in seconds) between the two selected timestamps.
- Return type
- wait(timeout=-1, return_when=None)#
Wait until the result is available or until timeout seconds pass.
Arguments:
- timeout (int):
The timeout in seconds. -1 or None indicate an infinite timeout.
- return_when (enum):
None, ALL_COMPLETED, FIRST_COMPLETED, or FIRST_EXCEPTION. Passed to concurrent.futures.wait(). If specified and not None, returns the (done, pending) sets described below.
- Returns
- For backward-compatibility:
If return_when is None or unspecified, returns True if all tasks are done, False otherwise.
- (done, pending):
If return_when is any of the constants for concurrent.futures.wait(), will return two sets of AsyncResult objects representing the completed and still-pending subsets of results, matching the return value of wait itself.
- Return type
ready (bool)
Changed in version 8.0: Added return_when.
- wait_for_output(timeout=-1)#
Wait for our output to be complete.
AsyncResult.wait only waits for the result, which may arrive before output is complete.
- wait_for_send(timeout=-1)#
wait for pyzmq send to complete.
This is necessary when sending arrays that you intend to edit in-place.
timeout is in seconds, and will raise TimeoutError if it is reached before the send completes.
- wait_interactive(interval=0.1, timeout=-1, widget=None, return_when='ALL_COMPLETED')#
interactive wait, printing progress at regular intervals.
- Parameters
interval (float) – Interval on which to update progress display.
timeout (float) – Time (in seconds) to wait before raising a TimeoutError. -1 (default) means no timeout.
widget (bool) – default: True if in an IPython kernel (notebook), False otherwise. Override default context-detection behavior for whether a widget-based progress bar should be used.
return_when (concurrent.futures.ALL_COMPLETED | FIRST_EXCEPTION | FIRST_COMPLETED) –
- property wall_time#
actual computation time of a parallel calculation
Computed as the time between the latest received stamp and the earliest submitted.
For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
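These timing properties can be combined into a rough parallel speedup estimate, e.g.:
speedup = ar.serial_time / ar.wall_time   # > 1 when work actually ran in parallel
print(f"{speedup:.1f}x speedup, {ar.elapsed:.2f}s since submission")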
- class ipyparallel.ViewExecutor(view)#
A PEP-3148 Executor API for Views
Access as view.executor
- map(func, *iterables, **kwargs)#
Return generator for View.map_async
- shutdown(wait=True)#
ViewExecutor does not shutdown engines
results are awaited if wait=True, but engines are not shutdown.
- submit(fn, *args, **kwargs)#
Same as View.apply_async
Decorators#
IPython parallel provides some decorators to assist in using your functions as tasks.
- @ipyparallel.interactive#
decorator for making functions appear as interactively defined. This results in the function being linked to the user_ns as globals() instead of the module globals().
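A brief sketch of when this matters: a function that closes over a name defined in the interactive namespace and pushed to the engines (names are illustrative):
import ipyparallel as ipp

a = 5                       # defined interactively, lives in the user namespace

@ipp.interactive
def use_a():
    return a                # resolves against the engine's user namespace

rc[:].push({'a': a}, block=True)
print(rc[:].apply_sync(use_a))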
- @ipyparallel.require(*objects, **mapping)#
Simple decorator for requiring local objects and modules to be available when the decorated function is called on the engine.
Modules specified by name or passed directly will be imported prior to calling the decorated function.
Objects other than modules will be pushed as a part of the task. Functions can be passed positionally, and will be pushed to the engine with their __name__. Other objects can be passed by keyword arg.
Examples
In [1]: @ipp.require('numpy')
   ...: def norm(a):
   ...:     return numpy.linalg.norm(a, 2)
In [2]: foo = lambda x: x*x
In [3]: @ipp.require(foo)
   ...: def bar(a):
   ...:     return foo(1-a)
- @ipyparallel.depend(_wrapped_f, *args, **kwargs)#
Dependency decorator, for use with tasks.
@depend lets you define a function for engine dependencies just like you use apply for tasks.
Examples
@depend(df, a, b, c=5)
def f(m, n, p):
    ...

view.apply(f, 1, 2, 3)
will call df(a,b,c=5) on the engine, and if it returns False or raises an UnmetDependency error, then the task will not be run and another engine will be tried.
- @ipyparallel.remote(view, block=None, **flags)#
Turn a function into a remote function.
This method can be used for map:
In [1]: @remote(view, block=True)
   ...: def func(a):
   ...:     pass
- @ipyparallel.parallel(view, dist='b', block=None, ordered=True, **flags)#
Turn a function into a parallel remote function.
This method can be used for map:
In [1]: @parallel(view, block=True)
   ...: def func(a):
   ...:     pass
Exceptions#
- exception ipyparallel.RemoteError(ename, evalue, traceback, engine_info=None)#
Error raised elsewhere
- exception ipyparallel.CompositeError(message, elist)#
Error for representing possibly multiple errors on engines
- exception ipyparallel.NoEnginesRegistered#
Exception for operations that require some engines, but none exist
- exception ipyparallel.ImpossibleDependency#
- exception ipyparallel.InvalidDependency#