API Reference#

ipyparallel.version_info = (8, 7, 0)#

The IPython parallel version as a tuple of integers. There will always be 3 integers. Development releases will have ‘dev’ as a fourth element.
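Because the first three elements are always integers, a version check can compare just that prefix and ignore an optional 'dev' marker. A minimal sketch, where at_least is a hypothetical helper (not part of ipyparallel) and literal tuples stand in for ipyparallel.version_info:

```python
def at_least(version_info, minimum):
    """Return True if version_info is at least `minimum`.

    Only the first three (integer) elements are compared, so a trailing
    'dev' marker on development releases is ignored.
    """
    return tuple(version_info[:3]) >= tuple(minimum)


# Literal tuples stand in for ipyparallel.version_info here.
print(at_least((8, 7, 0), (8, 0, 0)))         # True
print(at_least((8, 7, 0, "dev"), (8, 7, 0)))  # True
print(at_least((7, 1, 0), (8, 0, 0)))         # False
```

In real code the first argument would be ipyparallel.version_info.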

Classes#

class ipyparallel.Cluster(*, cluster_file: str = '', cluster_id: str = '', config: Instance = {}, controller_args: list = [], controller_ip: str = '', controller: Launcher = <class 'ipyparallel.cluster.launcher.LocalControllerLauncher'>, controller_location: str = '', delay: float = 1.0, engines: Launcher = <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>, engine_timeout: int = 60, load_profile: bool = True, log: Any = None, log_level: int = 20, n: int = None, parent: Instance = None, profile: str = '', profile_dir: str = '', send_engines_connection_env: bool = True, shutdown_atexit: bool = True)#

Class representing an IPP cluster

i.e. one controller and one or more groups of engines

Can start/stop/monitor/poll cluster resources

All async methods can be called synchronously with a _sync suffix, e.g. cluster.start_cluster_sync()

Changed in version 8.0: controller and engine launcher classes can be specified via Cluster(controller='ssh', engines='mpi') without the _launcher_class suffix.

cluster_file Unicode('')#

The path to the cluster file for saving this cluster to disk

cluster_id Unicode('')#

The id of the cluster (default: random string)

async connect_client(**client_kwargs)#

Return a client connected to the cluster

controller_args c.Cluster.controller_args = List()#

Additional CLI args to pass to the controller.

controller_ip c.Cluster.controller_ip = Unicode('')#

Set the IP address of the controller.

controller_launcher_class c.Cluster.controller_launcher_class = Launcher(<class 'ipyparallel.cluster.launcher.LocalControllerLauncher'>)#
The class for launching a Controller. Change this value if you want your controller to also be launched by a batch system, such as PBS, SGE, MPI, etc.

Each launcher class has its own set of configuration options, for making sure it will work in your environment.

Note that using a batch launcher for the controller does not put it in the same batch job as the engines, so they will still start separately.

Third-party engine launchers can be registered via ipyparallel.engine_launchers entry point.

They can be selected via case-insensitive abbreviation, e.g.

c.Cluster.controller_launcher_class = 'SSH'

or:

ipcluster start --controller=MPI

Currently installed:
  • batch: ipyparallel.cluster.launcher.BatchControllerLauncher

  • htcondor: ipyparallel.cluster.launcher.HTCondorControllerLauncher

  • local: ipyparallel.cluster.launcher.LocalControllerLauncher

  • lsf: ipyparallel.cluster.launcher.LSFControllerLauncher

  • mpi: ipyparallel.cluster.launcher.MPIControllerLauncher

  • pbs: ipyparallel.cluster.launcher.PBSControllerLauncher

  • sge: ipyparallel.cluster.launcher.SGEControllerLauncher

  • slurm: ipyparallel.cluster.launcher.SlurmControllerLauncher

  • ssh: ipyparallel.cluster.launcher.SSHControllerLauncher

  • winhpc: ipyparallel.cluster.launcher.WindowsHPCControllerLauncher

controller_location c.Cluster.controller_location = Unicode('')#

Set the location (hostname or ip) of the controller.

This is used by engines and clients to locate the controller when the controller listens on all interfaces

delay c.Cluster.delay = Float(1.0)#

delay (in s) between starting the controller and the engines

engine_launcher_class c.Cluster.engine_launcher_class = Launcher(<class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>)#
The class for launching a set of Engines. Change this value to use various batch systems to launch your engines, such as PBS, SGE, MPI, etc. Each launcher class has its own set of configuration options, for making sure it will work in your environment.

Third-party engine launchers can be registered via ipyparallel.engine_launchers entry point.

They can be selected via case-insensitive abbreviation, e.g.

c.Cluster.engine_launcher_class = 'ssh'

or:

ipcluster start --engines=mpi

Currently installed:
  • batch: ipyparallel.cluster.launcher.BatchEngineSetLauncher

  • htcondor: ipyparallel.cluster.launcher.HTCondorEngineSetLauncher

  • local: ipyparallel.cluster.launcher.LocalEngineSetLauncher

  • lsf: ipyparallel.cluster.launcher.LSFEngineSetLauncher

  • mpi: ipyparallel.cluster.launcher.MPIEngineSetLauncher

  • pbs: ipyparallel.cluster.launcher.PBSEngineSetLauncher

  • sge: ipyparallel.cluster.launcher.SGEEngineSetLauncher

  • slurm: ipyparallel.cluster.launcher.SlurmEngineSetLauncher

  • ssh: ipyparallel.cluster.launcher.SSHEngineSetLauncher

  • sshproxy: ipyparallel.cluster.launcher.SSHProxyEngineSetLauncher

  • winhpc: ipyparallel.cluster.launcher.WindowsHPCEngineSetLauncher

property engine_set#

Return the first engine set

Most clusters have only one engine set, which is tedious to get to via the engines dict with random engine set ids.

New in version 8.0.

engine_timeout c.Cluster.engine_timeout = Int(60)#

Timeout to use when waiting for engines to register before giving up.

classmethod from_dict(d, **kwargs)#

Construct a Cluster from serialized state

classmethod from_file(cluster_file=None, *, profile=None, profile_dir=None, cluster_id='', **kwargs)#

Load a Cluster object from a file

Can specify a full path, or combination of profile, profile_dir, and/or cluster_id.

With no arguments given, it will connect to a cluster created with ipcluster start.

load_profile c.Cluster.load_profile = Bool(True)#

If True (default) load ipcluster config from profile directory, if present.

n c.Cluster.n = Int(None)#

The number of engines to start

profile Unicode('')#

The profile name, a shortcut for specifying profile_dir within $IPYTHONDIR.

profile_dir Unicode('')#

The profile directory.

Default priority:

  • specified explicitly

  • current IPython session

  • use profile name (default: ‘default’)

remove_cluster_file()#

Remove my cluster file.

async restart_engine(engine_id)#

Restart one engine

May stop all engines in a set, depending on EngineSet features (e.g. mpiexec)

async restart_engines(engine_set_id=None)#

Restart an engine set

send_engines_connection_env c.Cluster.send_engines_connection_env = Bool(True)#

Wait for controller’s connection info before passing to engines via $IPP_CONNECTION_INFO environment variable.

Set to False to start engines immediately without waiting for the controller’s connection info to be available.

When True, no connection file movement is required. False is mainly useful when submitting the controller may take a long time in a job queue, and the engines should enter the queue before the controller is running.

New in version 8.0.

shutdown_atexit Bool(True)#

Shutdown the cluster at process exit.

Set to False if you want to launch a cluster and leave it running after the launching process exits.

async signal_engine(signum, engine_id)#

Signal one engine

May signal all engines in a set, depending on EngineSet features (e.g. mpiexec)

async signal_engines(signum, engine_set_id=None)#

Signal all engines in a set

If no engine set is specified, signal all engine sets.

async start_and_connect(n=None, activate=False)#

Single call to start a cluster and connect a client

If activate is given, a blocking DirectView on all engines will be created and activated, registering %px magics for use in IPython

Example:

rc = await Cluster(engines="mpi").start_and_connect(n=8, activate=True)

%px print("hello, world!")

Equivalent to:

await self.start_cluster(n)
client = await self.connect_client()
await client.wait_for_engines(n, block=False)

New in version 7.1.

New in version 8.1: activate argument.
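The calls above can be combined into a short start, use, stop cycle. This is a sketch rather than a recommended pattern: demo is a hypothetical function name, ipyparallel is imported inside it so the module can be loaded without the package, and actually running it launches local engine processes.

```python
import asyncio
import os


async def demo(n=2):
    """Start a local cluster, run one call on every engine, then stop it."""
    # Imported lazily so this module can be loaded without ipyparallel.
    import ipyparallel as ipp

    cluster = ipp.Cluster(n=n)
    # Starts the controller and engines, then waits for n engines.
    rc = await cluster.start_and_connect(n=n)
    try:
        # rc[:] is a DirectView on all currently registered engines.
        return rc[:].apply_sync(os.getpid)
    finally:
        rc.close()
        await cluster.stop_cluster()


# To run outside IPython: pids = asyncio.run(demo())
```

In synchronous code the same steps should be available through the _sync variants, e.g. cluster.start_and_connect_sync(n=2) and cluster.stop_cluster_sync().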

async start_cluster(n=None)#

Start a cluster

starts one controller and n engines (default: self.n)

Changed in version 7.1: return self, to allow method chaining

async start_controller(**kwargs)#

Start the controller

Keyword arguments are passed to the controller launcher constructor

async start_engines(n=None, engine_set_id=None, **kwargs)#

Start an engine set

Returns an engine set id which can be used in stop_engines

async stop_cluster()#

Stop the controller and all engines

async stop_controller()#

Stop the controller

async stop_engine(engine_id)#

Stop one engine

May stop all engines in a set, depending on EngineSet features (e.g. mpiexec)

async stop_engines(engine_set_id=None)#

Stop an engine set

If engine_set_id is not given, all engines are stopped.

to_dict()#

Serialize a Cluster object for later reconstruction

update_cluster_file()#

Update my cluster file

If cluster_file is disabled, do nothing. If the cluster is fully stopped, remove the file.

write_cluster_file()#

Write cluster info to disk for later loading

class ipyparallel.Client(*args, **kw)#

A semi-synchronous client to an IPython parallel cluster

Parameters:
  • connection_info (str or dict) – The path to ipcontroller-client.json, or a dict containing the same information. This JSON file should contain all the information needed to connect to a cluster, and is usually the only argument needed. [Default: use profile]

  • profile (str) – The name of the Cluster profile to be used to find connector information. If run from an IPython application, the default profile will be the same as the running application, otherwise it will be ‘default’.

  • cluster_id (str) – String id to be added to runtime files, to prevent name collisions when using multiple clusters with a single profile simultaneously. When set, will look for files named like: ‘ipcontroller-<cluster_id>-client.json’. Since this is text inserted into filenames, typical recommendations apply: simple character strings are ideal, and spaces are not recommended (but should generally work).

  • context (zmq.Context) – Pass an existing zmq.Context instance, otherwise the client will create its own.

  • debug (bool) – flag for lots of message printing for debug purposes

  • timeout (float) – time (in seconds) to wait for connection replies from the Hub [Default: 10]

  • sshserver (str) – A string of the form passed to ssh, i.e. ‘server.tld’ or ‘user@server.tld:port’. If keyfile or password is specified, and this is not, it will default to the ip given in addr.

  • sshkey (str; path to ssh private key file) – This specifies a key to be used in ssh login, default None. Regular default ssh keys will be used without specifying this argument.

  • password (str) – Your ssh password to sshserver. Note that if this is left None, you will be prompted for it if passwordless key based login is unavailable.

  • paramiko (bool) – flag for whether to use paramiko instead of shell ssh for tunneling. [default: True on win32, False else]

ids#

Requesting the ids attribute always synchronizes the registration state. To request ids without synchronization, use the semi-private _ids attribute.

Type:

list of int engine IDs

history#

a list of msg_ids, keeping track of all the execution messages you have submitted in order.

Type:

list of msg_ids

outstanding#

a set of msg_ids that have been submitted, but whose results have not yet been received.

Type:

set of msg_ids

results#

a dict of all our results, keyed by msg_id

Type:

dict

block#

determines default behavior when block not specified in execution methods

Type:

bool

abort(jobs=None, targets=None, block=None)#

Abort specific jobs from the execution queues of target(s).

This is a mechanism to prevent jobs that have already been submitted from executing. To halt a running job, you must interrupt the engine(s) by sending a signal. This can be done via os.kill for local engines, or Cluster.signal_engines() for multiple engines.

Parameters:

jobs (msg_id, list of msg_ids, or AsyncResult) –

The jobs to be aborted

If unspecified/None: abort all outstanding jobs.

activate(targets='all', suffix='')#

Create a DirectView and register it with IPython magics

Defines the magics %px, %autopx, %pxresult, %%px

Parameters:
  • targets (int, list of ints, or 'all') – The engines on which the view’s magics will run

  • suffix (str [default: '']) –

    The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time.

    e.g. rc.activate(targets=0, suffix='0') will give you the magics %px0, %pxresult0, etc. for running magics just on engine 0.

become_dask(targets='all', port=0, nanny=False, scheduler_args=None, **worker_args)#

Turn the IPython cluster into a dask.distributed cluster

Parameters:
  • targets (target spec (default: all)) – Which engines to turn into dask workers.

  • port (int (default: random)) – Which port

  • nanny (bool (default: False)) – Whether to start workers as subprocesses instead of in the engine process. Using a nanny allows restarting the worker processes via executor.restart.

  • scheduler_args (dict) – Keyword arguments (e.g. ip) to pass to the distributed.Scheduler constructor.

  • **worker_args – Any additional keyword arguments (e.g. nthreads) are passed to the distributed.Worker constructor.

Returns:

A dask.distributed.Client connected to the dask cluster.

Return type:

distributed.Client

become_distributed(targets='all', port=0, nanny=False, scheduler_args=None, **worker_args)#

Turn the IPython cluster into a dask.distributed cluster

Parameters:
  • targets (target spec (default: all)) – Which engines to turn into dask workers.

  • port (int (default: random)) – Which port

  • nanny (bool (default: False)) – Whether to start workers as subprocesses instead of in the engine process. Using a nanny allows restarting the worker processes via executor.restart.

  • scheduler_args (dict) – Keyword arguments (e.g. ip) to pass to the distributed.Scheduler constructor.

  • **worker_args – Any additional keyword arguments (e.g. nthreads) are passed to the distributed.Worker constructor.

Returns:

A dask.distributed.Client connected to the dask cluster.

Return type:

distributed.Client

broadcast_view(targets='all', is_coalescing=False, **kwargs)#

Construct a BroadcastView object. If no arguments are specified, create a BroadcastView using all engines.

Parameters:
  • targets (list,slice,int,etc. [default: use all engines]) – The subset of engines across which to load-balance execution

  • is_coalescing (bool) – If True, the scheduler collects all messages from engines and returns them as one.

  • **kwargs (passed to BroadcastView) –

clear(targets=None, block=None)#

Clear the namespace in target(s).

close(linger=None)#

Close my zmq Sockets

If linger, set the zmq LINGER socket option, which allows discarding of messages.

db_query(query, keys=None)#

Query the Hub’s TaskRecord database

This will return a list of task record dicts that match query

Parameters:
  • query (mongodb query dict) – The search dict. See mongodb query docs for details.

  • keys (list of strs [optional]) – The subset of keys to be returned. The default is to fetch everything but buffers. ‘msg_id’ will always be included.

direct_view(targets='all', **kwargs)#

construct a DirectView object.

If no targets are specified, create a DirectView using all engines.

rc.direct_view(‘all’) is distinguished from rc[:] in that ‘all’ will evaluate the target engines at each execution, whereas rc[:] will connect to all current engines, and that list will not change.

That is, ‘all’ will always use all engines, whereas rc[:] will not use engines added after the DirectView is constructed.
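The difference between the two forms is late versus early binding of the target list. The toy model below illustrates only that idea; ToyClient and ToyView are hypothetical stand-ins and do not reflect how ipyparallel implements views.

```python
class ToyClient:
    """Stand-in for a client; only tracks registered engine ids."""

    def __init__(self, ids):
        self.ids = list(ids)


class ToyView:
    """Toy view: targets is either the string 'all' or a fixed list."""

    def __init__(self, client, targets):
        self.client = client
        self.targets = targets

    def resolve(self):
        # 'all' is looked up at each execution; a list is a frozen snapshot.
        if self.targets == "all":
            return list(self.client.ids)
        return list(self.targets)


client = ToyClient([0, 1])
lazy = ToyView(client, "all")            # like rc.direct_view('all')
frozen = ToyView(client, client.ids[:])  # like rc[:]

client.ids.append(2)  # a new engine registers later
print(lazy.resolve())    # [0, 1, 2]
print(frozen.resolve())  # [0, 1]
```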

Parameters:
  • targets (list,slice,int,etc. [default: use all engines]) – The engines to use for the View

  • **kwargs (passed to DirectView) –

executor(targets=None)#

Construct a PEP-3148 Executor with a LoadBalancedView

Parameters:

targets (list,slice,int,etc. [default: use all engines]) – The subset of engines across which to load-balance execution

Returns:

executor – The Executor object

Return type:

Executor
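Since the returned object follows the PEP 3148 (concurrent.futures) interface, code written against submit() and map() should not need to know what backs the executor. In this sketch a ThreadPoolExecutor stands in for the cluster-backed executor, and cube and run_with are hypothetical names chosen for illustration.

```python
from concurrent.futures import Executor, ThreadPoolExecutor


def cube(x):
    return x ** 3


def run_with(executor: Executor, values):
    """Use only the PEP 3148 interface: submit() and map()."""
    first = executor.submit(cube, values[0]).result()
    rest = list(executor.map(cube, values[1:]))
    return [first, *rest]


# A thread pool stands in for the cluster-backed executor.
with ThreadPoolExecutor(max_workers=2) as pool:
    print(run_with(pool, [1, 2, 3]))  # [1, 8, 27]
```

With a connected client rc, the object returned by rc.executor() would be passed in place of the thread pool.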

get_result(indices_or_msg_ids=None, block=None, owner=True)#

Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

If the client already has the results, no request to the Hub will be made.

This is a convenient way to construct AsyncResult objects, which are wrappers that include metadata about execution, and allow for awaiting results that were not submitted by this Client.

It can also be a convenient way to retrieve the metadata associated with blocking execution, since it always retrieves

Examples

In [10]: r = client.apply()

Parameters:
  • indices_or_msg_ids (integer history index, str msg_id, AsyncResult, or a list of same) – The history indices or msg_ids of the results to be retrieved

  • block (bool) – Whether to wait for the result to be done

  • owner (bool [default: True]) – Whether this AsyncResult should own the result. If so, calling ar.get() will remove data from the client’s result and metadata cache. There should only be one owner of any given msg_id.

Returns:

  • AsyncResult – A single AsyncResult object will always be returned.

  • AsyncHubResult – A subclass of AsyncResult that retrieves results from the Hub

hub_history()#

Get the Hub’s history

Just like the Client, the Hub has a history, which is a list of msg_ids. This will contain the history of all clients, and, depending on configuration, may contain history across multiple cluster sessions.

Any msg_id returned here is a valid argument to get_result.

Returns:

msg_ids – list of all msg_ids, ordered by task submission time.

Return type:

list of strs

load_balanced_view(targets=None, **kwargs)#

Construct a LoadBalancedView object.

If no arguments are specified, create a LoadBalancedView using all engines.

Parameters:
  • targets (list,slice,int,etc. [default: use all engines]) – The subset of engines across which to load-balance execution

  • **kwargs (passed to LoadBalancedView) –

purge_everything()#

Clears all content from previous Tasks from both the hub and the local client

In addition to calling purge_results("all") it also deletes the history and other bookkeeping lists.

purge_hub_results(jobs=[], targets=[])#

Tell the Hub to forget results.

Individual results can be purged by msg_id, or the entire history of specific targets can be purged.

Use purge_results('all') to scrub everything from the Hub’s db.

Parameters:
  • jobs (str or list of str or AsyncResult objects) – the msg_ids whose results should be forgotten.

  • targets (int/str/list of ints/strs) –

    The targets, by int_id, whose entire history is to be purged.

    default : None

purge_local_results(jobs=[], targets=[])#

Clears the client caches of results and their metadata.

Individual results can be purged by msg_id, or the entire history of specific targets can be purged.

Use purge_local_results('all') to scrub everything from the Client’s results and metadata caches.

After this call all AsyncResults are invalid and should be discarded.

If you must “reget” the results, you can still do so by using client.get_result(msg_id) or client.get_result(asyncresult). This will redownload the results from the hub if they are still available (i.e. client.purge_hub_results(...) has not been called).

Parameters:
  • jobs (str or list of str or AsyncResult objects) – the msg_ids whose results should be purged.

  • targets (int/list of ints) – The engines, by integer ID, whose entire result histories are to be purged.

Raises:

RuntimeError – if any of the tasks to be purged are still outstanding.

purge_results(jobs=[], targets=[])#

Clears the cached results from both the hub and the local client

Individual results can be purged by msg_id, or the entire history of specific targets can be purged.

Use purge_results('all') to scrub every cached result from both the Hub’s and the Client’s db.

Equivalent to calling both purge_hub_results() and purge_local_results() with the same arguments.

Parameters:
  • jobs (str or list of str or AsyncResult objects) – the msg_ids whose results should be forgotten.

  • targets (int/str/list of ints/strs) –

    The targets, by int_id, whose entire history is to be purged.

    default : None

queue_status(targets='all', verbose=False)#

Fetch the status of engine queues.

Parameters:
  • targets (int/str/list of ints/strs) – the engines whose states are to be queried. default : all

  • verbose (bool) – Whether to return lengths only, or lists of ids for each element

resubmit(indices_or_msg_ids=None, metadata=None, block=None)#

Resubmit one or more tasks.

in-flight tasks may not be resubmitted.

Parameters:
  • indices_or_msg_ids (integer history index, str msg_id, or list of either) – The history indices or msg_ids of the tasks to be resubmitted

  • block (bool) – Whether to wait for the result to be done

Returns:

A subclass of AsyncResult that retrieves results from the Hub

Return type:

AsyncHubResult

result_status(msg_ids, status_only=True)#

Check on the status of the result(s) of the apply request with msg_ids.

If status_only is False, then the actual results will be retrieved, else only the status of the results will be checked.

Parameters:
  • msg_ids (list of msg_ids) –

    if int:

    Passed as index to self.history for convenience.

  • status_only (bool (default: True)) –

    if False:

    Retrieve the actual results of completed tasks.

Returns:

results – There will always be the keys ‘pending’ and ‘completed’, which will be lists of msg_ids that are incomplete or complete. If status_only is False, then completed results will be keyed by their msg_id.

Return type:

dict

send_apply_request(socket, f, args=None, kwargs=None, metadata=None, track=False, ident=None, message_future_hook=None)#

construct and send an apply message via a socket.

This is the principal method with which all engine execution is performed by views.

send_execute_request(socket, code, silent=True, metadata=None, ident=None, message_future_hook=None)#

construct and send an execute request via a socket.

send_signal(sig, targets=None, block=None)#

Send a signal to target(s).

Parameters:
  • sig (int or str) – The signal number or name to send. If a str, will evaluate to getattr(signal, sig) on the engine, which is useful for sending signals cross-platform.

New in version 7.0.

shutdown(targets='all', restart=False, hub=False, block=None)#

Terminates one or more engine processes, optionally including the hub.

Parameters:
  • targets (list of ints or 'all' [default: all]) – Which engines to shutdown.

  • hub (bool [default: False]) – Whether to include the Hub. hub=True implies targets='all'.

  • block (bool [default: self.block]) – Whether to wait for clean shutdown replies or not.

  • restart (bool [default: False]) – NOT IMPLEMENTED whether to restart engines after shutting them down.

spin()#

DEPRECATED, DOES NOTHING

spin_thread(interval=1)#

DEPRECATED, DOES NOTHING

stop_dask(targets='all')#

Stop the distributed Scheduler and Workers started by become_dask.

Parameters:

targets (target spec (default: all)) – Which engines to stop dask workers on.

stop_distributed(targets='all')#

Stop the distributed Scheduler and Workers started by become_dask.

Parameters:

targets (target spec (default: all)) – Which engines to stop dask workers on.

stop_spin_thread()#

DEPRECATED, DOES NOTHING

wait(jobs=None, timeout=-1)#

waits on one or more jobs, for up to timeout seconds.

Parameters:
  • jobs (int, str, or list of ints and/or strs, or one or more AsyncResult objects) – ints are indices to self.history; strs are msg_ids. Default: wait on all outstanding messages.

  • timeout (float) – a time in seconds, after which to give up. default is -1, which means no timeout

Returns:

  • True (when all msg_ids are done)

  • False (timeout reached, some msg_ids still outstanding)
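The return convention (True when everything finished, False on timeout, a negative timeout meaning wait forever) can be tried locally with the standard library. The sketch below is an analogy built on concurrent.futures, not ipyparallel code; wait_all is a hypothetical helper.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def wait_all(futures, timeout=-1):
    """Return True if all futures finish, False if the timeout expires first.

    Mirrors the convention above: a negative timeout means wait forever.
    """
    limit = None if timeout < 0 else timeout
    done, not_done = wait(futures, timeout=limit)
    return not not_done


with ThreadPoolExecutor(max_workers=2) as pool:
    quick = pool.submit(time.sleep, 0.01)
    slow = pool.submit(time.sleep, 0.5)
    print(wait_all([quick, slow], timeout=0.05))  # False: slow still running
    print(wait_all([quick, slow]))                # True: no timeout
```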

wait_for_engines(n=None, *, timeout=-1, block=True, interactive=None, widget=None)#

Wait for n engines to become available.

Returns when n engines are available, or raises TimeoutError if timeout is reached before n engines are ready.

Parameters:
  • n (int) – Number of engines to wait for.

  • timeout (float) – Time (in seconds) to wait before raising a TimeoutError

  • block (bool) – if False, return Future instead of waiting

  • interactive (bool) – default: True if in IPython, False otherwise. if True, show a progress bar while waiting for engines

  • widget (bool) – default: True if in an IPython kernel (notebook), False otherwise. Only has an effect if interactive is True. if True, forces use of widget progress bar. If False, forces use of terminal tqdm.

Returns:

f – Future object to wait on if block is False, None if block is True.

Return type:

concurrent.futures.Future or None

Raises:

TimeoutError – if timeout is reached.

wait_interactive(jobs=None, interval=1.0, timeout=-1.0)#

Wait interactively for jobs

If no job is specified, will wait for all outstanding jobs to complete.

class ipyparallel.DirectView(**kwargs: Any)#

Direct Multiplexer View of one or more engines.

These are created via indexed access to a client:

>>> dv_1 = client[1]
>>> dv_all = client[:]
>>> dv_even = client[::2]
>>> dv_some = client[1:3]

This object provides dictionary access to engine namespaces:

# push a=5:
>>> dv['a'] = 5
# pull 'foo':
>>> dv['foo']

abort(jobs=None, targets=None, block=None)#

Abort jobs on my engines.

Note: only jobs that have not started yet can be aborted. To halt a running job, you must interrupt the engine(s) via the Cluster API.

Parameters:

jobs (None, str, list of strs, optional) – if None: abort all jobs. else: abort specific msg_id(s).

activate(suffix='')#

Activate IPython magics associated with this View

Defines the magics %px, %autopx, %pxresult, %%px, %pxconfig

Parameters:

suffix (str [default: '']) –

The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time.

e.g. rc[::2].activate(suffix='_even') will give you the magics %px_even, %pxresult_even, etc. for running magics on the even engines.

apply(_View__ipp_f, *args, **kwargs)#

calls f(*args, **kwargs) on remote engines, returning the result.

This method sets all apply flags via this View’s attributes.

Returns AsyncResult instance if self.block is False, otherwise the return value of f(*args, **kwargs).

apply_async(_View__ipp_f, *args, **kwargs)#

calls f(*args, **kwargs) on remote engines in a nonblocking manner.

Returns AsyncResult instance.

apply_sync(_View__ipp_f, *args, **kwargs)#

calls f(*args, **kwargs) on remote engines in a blocking manner, returning the result.

clear(targets=None, block=None)#

Clear the remote namespaces on my engines.

execute(code, silent=True, targets=None, block=None)#

Executes code on targets in blocking or nonblocking manner.

execute is always bound (affects engine namespace)

Parameters:
  • code (str) – the code string to be executed

  • block (bool) – whether or not to wait until done before returning. Default: self.block

gather(key, dist='b', targets=None, block=None)#

Gather a partitioned sequence on a set of engines as a single local seq.

get(key_s)#

get object(s) by key_s from remote namespace

see pull for details.

get_result(indices_or_msg_ids=None, block=None, owner=False)#

return one or more results, specified by history index or msg_id.

See ipyparallel.client.client.Client.get_result() for details.

imap(f, *sequences, **kwargs)#

Parallel version of itertools.imap().

See self.map for details.

property importer#

sync_imports(local=True) as a property.

See sync_imports for details.

map(f, *sequences, block=None, track=False, return_exceptions=False)#

Parallel version of builtin map, using this View’s targets.

There will be one task per target, so work will be chunked if the sequences are longer than targets.

Results can be iterated as they are ready, but will become available in chunks.

New in version 7.0: return_exceptions

Parameters:
  • f (callable) – function to be mapped

  • *sequences (one or more sequences of matching length) – the sequences to be distributed and passed to f

  • block (bool [default self.block]) – whether to wait for the result or not

  • track (bool [default False]) – Track underlying zmq send to indicate when it is safe to modify memory. Only for zero-copy sends such as numpy arrays that are going to be modified in-place.

  • return_exceptions (bool [default False]) – Return remote Exceptions in the result sequence instead of raising them.

Returns:

  • If block=False – An AsyncMapResult instance. An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete.

  • else – A list, the result of map(f,*sequences)
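The effect of return_exceptions can be pictured with a serial stand-in. This is only an illustration of the result shape: local_map and invert are hypothetical names, and on a real cluster the entries would be the remote exception objects rather than the raw local exception.

```python
def local_map(f, *sequences, return_exceptions=False):
    """Serial stand-in for a parallel map (illustration only)."""
    results = []
    for args in zip(*sequences):
        try:
            results.append(f(*args))
        except Exception as exc:
            if not return_exceptions:
                raise
            # Keep the exception in the slot where the result would be.
            results.append(exc)
    return results


def invert(x):
    return 1 / x


out = local_map(invert, [1, 0, 4], return_exceptions=True)
print([type(r).__name__ for r in out])  # ['float', 'ZeroDivisionError', 'float']
```

With return_exceptions=False the first failure propagates instead, so no partial result list is returned.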

map_async(f, *sequences, **kwargs)#

Parallel version of builtin map(), using this view’s engines.

This is equivalent to map(...block=False).

See self.map for details.

map_sync(f, *sequences, **kwargs)#

Parallel version of builtin map(), using this view’s engines.

This is equivalent to map(...block=True).

See self.map for details.

parallel(dist='b', block=None, **flags)#

Decorator for making a ParallelFunction

pull(names, targets=None, block=None)#

get object(s) by name from remote namespace

Returns one object if names is a single key. Can also take a list of keys, in which case it returns a list of objects.

purge_results(jobs=[], targets=[])#

Instruct the controller to forget specific results.

push(ns, targets=None, block=None, track=None)#

update remote namespace with dict ns

Parameters:
  • ns (dict) – dict of keys with which to update engine namespace(s)

  • block (bool [default : self.block]) – whether to wait to be notified of engine receipt

queue_status(targets=None, verbose=False)#

Fetch the Queue status of my engines

remote(block=None, **flags)#

Decorator for making a RemoteFunction

run(filename, targets=None, block=None)#

Execute contents of filename on my engine(s).

This simply reads the contents of the file and calls execute.

Parameters:
  • filename (str) – The path to the file

  • targets (int/str/list of ints/strs) – the engines on which to execute. Default: all

  • block (bool) – whether or not to wait until done. Default: self.block

scatter(key, seq, dist='b', flatten=False, targets=None, block=None, track=None)#

Partition a Python sequence and send the partitions to a set of engines.
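The sketch below shows one plausible way a sequence can be split into contiguous blocks and joined again, as scatter and gather do across engines. It assumes dist='b' means contiguous block partitioning; the exact chunk boundaries ipyparallel produces may differ, and block_partition and join_partitions are hypothetical helpers.

```python
def block_partition(seq, n):
    """Split seq into n contiguous chunks whose sizes differ by at most one."""
    base, extra = divmod(len(seq), n)
    chunks, start = [], 0
    for i in range(n):
        # The first `extra` chunks each take one additional element.
        size = base + (1 if i < extra else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks


def join_partitions(chunks):
    """Concatenate chunks back into one list, in engine order."""
    return [item for chunk in chunks for item in chunk]


parts = block_partition(list(range(10)), 4)
print(parts)                   # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
print(join_partitions(parts))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```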

set_flags(**kwargs)#

set my attribute flags by keyword.

Views determine behavior with a few attributes (block, track, etc.). These attributes can be set all at once by name with this method.

Parameters:
  • block (bool) – whether to wait for results

  • track (bool) – whether to create a MessageTracker to allow the user to know when it is safe to edit arrays and buffers after non-copying sends.
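The relationship between permanent flags (set_flags) and temporary ones (temp_flags) can be modelled with a small context manager. ToyFlags below is a hypothetical class written for illustration; it is not ipyparallel's View and only mimics the two flags named above.

```python
from contextlib import contextmanager


class ToyFlags:
    """Toy object with `block` and `track` flags (not ipyparallel's View)."""

    def __init__(self):
        self.block = False
        self.track = False

    def set_flags(self, **kwargs):
        # Permanently update known flags; reject unknown names.
        for name, value in kwargs.items():
            if name not in ("block", "track"):
                raise KeyError(f"Invalid name: {name!r}")
            setattr(self, name, value)

    @contextmanager
    def temp_flags(self, **kwargs):
        # Remember the current values, apply the overrides, then restore.
        saved = {name: getattr(self, name) for name in kwargs}
        self.set_flags(**kwargs)
        try:
            yield
        finally:
            self.set_flags(**saved)


view = ToyFlags()
view.set_flags(block=True)
with view.temp_flags(track=True):
    print(view.block, view.track)  # True True
print(view.block, view.track)      # True False
```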

shutdown(targets=None, restart=False, hub=False, block=None)#

Terminates one or more engine processes, optionally including the hub.

spin()#

spin the client, and sync

sync_imports(local=True, quiet=False)#

Context Manager for performing simultaneous local and remote imports.

‘import x as y’ will not work. The ‘as y’ part will simply be ignored.

If local=True, then the package will also be imported locally.

If quiet=True, no output will be produced when attempting remote imports.

Note that remote-only (local=False) imports have not been implemented.

>>> with view.sync_imports():
...    from numpy import recarray
importing recarray from numpy on engine(s)

temp_flags(**kwargs)#

temporarily set flags, for use in with statements.

See set_flags for permanent setting of flags

Examples

>>> view.track=False
...
>>> with view.temp_flags(track=True):
...    ar = view.apply(dostuff, my_big_array)
...    ar.tracker.wait() # wait for send to finish
>>> view.track
False

update(ns)#

update remote namespace with dict ns

See push for details.

use_cloudpickle()#

Expand serialization support with cloudpickle.

This calls ipyparallel.serialize.use_cloudpickle() here and on each engine.

use_dill()#

Expand serialization support with dill

adds support for closures, etc.

This calls ipyparallel.serialize.use_dill() here and on each engine.

use_pickle()#

Restore the default pickle-based serialization.

This reverts changes to serialization caused by use_dill() or use_cloudpickle().

wait(jobs=None, timeout=-1)#

waits on one or more jobs, for up to timeout seconds.

Parameters:
  • jobs (int, str, or list of ints and/or strs, or one or more AsyncResult objects) – ints are indices to self.history; strs are msg_ids. Default: wait on all outstanding messages.

  • timeout (float) – a time in seconds, after which to give up. default is -1, which means no timeout

Returns:

  • True (when all msg_ids are done)

  • False (timeout reached, some msg_ids still outstanding)

class ipyparallel.LoadBalancedView(**kwargs: Any)#

A load-balancing View that only executes via the Task scheduler.

Load-balanced views can be created with the client’s load_balanced_view method:

>>> v = client.load_balanced_view()

or targets can be specified, to restrict the potential destinations:

>>> v = client.load_balanced_view([1,3])

which would restrict load balancing to engines 1 and 3.

abort(jobs=None, targets=None, block=None)#

Abort jobs on my engines.

Note: only jobs that have not started yet can be aborted. To halt a running job, you must interrupt the engine(s) via the Cluster API.

Parameters:

jobs (None, str, list of strs, optional) – if None: abort all jobs. else: abort specific msg_id(s).

apply(_View__ipp_f, *args, **kwargs)#

calls f(*args, **kwargs) on remote engines, returning the result.

This method sets all apply flags via this View’s attributes.

Returns AsyncResult instance if self.block is False, otherwise the return value of f(*args, **kwargs).

apply_async(_View__ipp_f, *args, **kwargs)#

calls f(*args, **kwargs) on remote engines in a nonblocking manner.

Returns AsyncResult instance.

apply_sync(_View__ipp_f, *args, **kwargs)#

calls f(*args, **kwargs) on remote engines in a blocking manner, returning the result.

get_result(indices_or_msg_ids=None, block=None, owner=False)#

return one or more results, specified by history index or msg_id.

See ipyparallel.client.client.Client.get_result() for details.

imap(f, *sequences, ordered=True, max_outstanding='auto', return_exceptions=False)#

Parallel version of lazily-evaluated imap, load-balanced by this View.

ordered and max_outstanding can be specified by keyword only.

Other map functions in IPython Parallel consume the full iterable before submitting work and return a single AsyncMapResult representing the full computation. This one does not.

Instead, it consumes iterables as they come, submitting up to max_outstanding tasks to the cluster before waiting on results (default: one task per engine). This allows it to work with infinite generators, and avoid potentially expensive read-ahead for large streams of inputs that may not fit in memory all at once.

New in version 7.0.

Parameters:
  • f (callable) – function to be mapped

  • *sequences (one or more sequences of matching length) – the sequences to be distributed and passed to f

  • ordered (bool [default True]) – Whether the results should be yielded on a first-come-first-yield basis, or preserve the order of submission.

  • max_outstanding (int [default len(engines)]) –

    The maximum number of tasks to be outstanding.

    max_outstanding=0 will greedily consume the whole generator (map_async may be more efficient).

    A limit of 1 should be strictly worse than running a local map, as there will be no parallelism.

    Use this to tune how greedily the input generator should be consumed.

  • return_exceptions (bool [default False]) – Return Exceptions instead of raising them.

Returns:

  • lazily-evaluated generator, yielding results of f on each item of sequences.

  • Yield-order depends on ordered argument.
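The bounded, lazy consumption described above can be sketched with the standard library alone. This is an illustration of the idea, not ipyparallel's implementation: a ThreadPoolExecutor stands in for the cluster, and bounded_imap is a hypothetical helper that yields in submission order (the ordered=True case).

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import count, islice


def bounded_imap(f, iterable, max_outstanding=2):
    """Yield f(x) in submission order with at most max_outstanding tasks in flight."""
    pending = deque()
    with ThreadPoolExecutor(max_workers=max_outstanding) as pool:
        for item in iterable:
            pending.append(pool.submit(f, item))
            if len(pending) >= max_outstanding:
                # Wait for the oldest task before pulling more input.
                yield pending.popleft().result()
        # Input exhausted: drain whatever is still outstanding.
        while pending:
            yield pending.popleft().result()


# Inputs are pulled lazily, so an infinite generator is fine.
first_squares = list(islice(bounded_imap(lambda x: x * x, count()), 5))
```

On a cluster, the analogous call would be something like view.imap(f, count(), max_outstanding=2), consumed with islice in the same way.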

map(f, *sequences, block=None, chunksize=1, ordered=True, return_exceptions=False)#

Parallel version of builtin map, load-balanced by this View.

Each chunksize elements will be a separate task, and will be load-balanced. This lets individual elements be available for iteration as soon as they arrive.

New in version 7.0: return_exceptions

Parameters:
  • f (callable) – function to be mapped

  • *sequences (one or more sequences of matching length) – the sequences to be distributed and passed to f

  • block (bool [default self.block]) – whether to wait for the result or not

  • chunksize (int [default 1]) – how many elements should be in each task.

  • ordered (bool [default True]) –

    Whether the results should be gathered as they arrive, or enforce the order of submission.

    Only applies when iterating through AsyncMapResult as results arrive. Has no effect when block=True.

  • return_exceptions (bool [default False]) – Return Exceptions instead of raising on the first exception.

Returns:

  • if block=False – An AsyncMapResult instance. An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete.

  • else – A list, the result of map(f,*sequences)
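To make the effect of chunksize concrete, the sketch below groups a sequence into consecutive chunks, each of which would correspond to one load-balanced task. The helper chunked is hypothetical and only illustrates the grouping; it is not how the library is implemented.

```python
def chunked(seq, chunksize=1):
    """Group seq into consecutive lists of at most chunksize elements."""
    seq = list(seq)
    return [seq[i:i + chunksize] for i in range(0, len(seq), chunksize)]


# Seven elements with chunksize=3 give three tasks; the last one is short.
tasks = chunked(range(7), chunksize=3)
```

A larger chunksize means fewer tasks and less per-task overhead, at the cost of results arriving in coarser batches.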

map_async(f, *sequences, **kwargs)#

Parallel version of builtin map(), using this view’s engines.

This is equivalent to map(...block=False).

See self.map for details.

map_sync(f, *sequences, **kwargs)#

Parallel version of builtin map(), using this view’s engines.

This is equivalent to map(...block=True).

See self.map for details.

parallel(dist='b', block=None, **flags)#

Decorator for making a ParallelFunction

purge_results(jobs=[], targets=[])#

Instruct the controller to forget specific results.

queue_status(targets=None, verbose=False)#

Fetch the Queue status of my engines

register_joblib_backend(name='ipyparallel', make_default=False)#

Register this View as a joblib parallel backend

To make this the default backend, set make_default=True.

Use with:

p = Parallel(backend='ipyparallel')
...

See joblib docs for details

Requires joblib >= 0.10

New in version 5.1.

remote(block=None, **flags)#

Decorator for making a RemoteFunction

set_flags(**kwargs)#

set my attribute flags by keyword.

A View is a wrapper for the Client’s apply method, but with attributes that specify keyword arguments. Those attributes can be set by keyword argument with this method.

Parameters:
  • block (bool) – whether to wait for results

  • track (bool) – whether to create a MessageTracker, which lets the user know when it is safe to edit arrays and buffers after non-copying sends.

  • after (Dependency or collection of msg_ids) – Only for load-balanced execution (targets=None). Specify a list of msg_ids as a time-based dependency. This job will only be run after the dependencies have been met.

  • follow (Dependency or collection of msg_ids) – Only for load-balanced execution (targets=None). Specify a list of msg_ids as a location-based dependency. This job will only be run on an engine where this dependency is met.

  • timeout (float/int or None) – Only for load-balanced execution (targets=None). Specify an amount of time (in seconds) for the scheduler to wait for dependencies to be met before failing with a DependencyTimeout.

  • retries (int) – Number of times a task will be retried on failure.
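A minimal sketch of how the after and retries flags might be combined, assuming view is a LoadBalancedView and that the returned AsyncResult exposes its message ids as msg_ids. The function submit_chain and its arguments are hypothetical names used only for illustration.

```python
def submit_chain(view, first_task, second_task):
    """Submit two tasks so that the second is only run after the first is done."""
    first = view.apply_async(first_task)
    # temp_flags restores the previous flag values when the block exits,
    # so the dependency applies to this one submission only.
    with view.temp_flags(after=first.msg_ids, retries=1):
        second = view.apply_async(second_task)
    return first, second
```

Using follow instead of after would express a location-based dependency: the second task would only be run on an engine where the first one ran.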

shutdown(targets=None, restart=False, hub=False, block=None)#

Terminates one or more engine processes, optionally including the hub.

spin()#

spin the client, and sync

temp_flags(**kwargs)#

temporarily set flags, for use in with statements.

See set_flags for permanent setting of flags

Examples

>>> view.track=False
...
>>> with view.temp_flags(track=True):
...    ar = view.apply(dostuff, my_big_array)
...    ar.tracker.wait() # wait for send to finish
>>> view.track
False

wait(jobs=None, timeout=-1)#

waits on one or more jobs, for up to timeout seconds.

Parameters:
  • jobs (int, str, or list of ints and/or strs, or one or more AsyncResult objects) – ints are indices to self.history; strs are msg_ids. Default: wait on all outstanding messages.

  • timeout (float) – a time in seconds, after which to give up. default is -1, which means no timeout

Returns:

  • True (when all msg_ids are done)

  • False (timeout reached, some msg_ids still outstanding)

class ipyparallel.BroadcastView(**kwargs: Any)#

abort(jobs=None, targets=None, block=None)#

Abort jobs on my engines.

Note: only jobs that have not started yet can be aborted. To halt a running job, you must interrupt the engine(s) via the Cluster API.

Parameters:

jobs (None, str, list of strs, optional) – if None: abort all jobs. else: abort specific msg_id(s).

activate(suffix='')#

Activate IPython magics associated with this View

Defines the magics %px, %autopx, %pxresult, %%px, %pxconfig

Parameters:

suffix (str [default: '']) –

The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time.

e.g. rc[::2].activate(suffix='_even') will give you the magics %px_even, %pxresult_even, etc. for running magics on the even engines.

apply(_View__ipp_f, *args, **kwargs)#

calls f(*args, **kwargs) on remote engines, returning the result.

This method sets all apply flags via this View’s attributes.

Returns AsyncResult instance if self.block is False, otherwise the return value of f(*args, **kwargs).

apply_async(_View__ipp_f, *args, **kwargs)#

calls f(*args, **kwargs) on remote engines in a nonblocking manner.

Returns AsyncResult instance.

apply_sync(_View__ipp_f, *args, **kwargs)#

calls f(*args, **kwargs) on remote engines in a blocking manner, returning the result.

clear(targets=None, block=None)#

Clear the remote namespaces on my engines.

execute(code, silent=True, targets=None, block=None)#

Executes code on targets in blocking or nonblocking manner.

execute is always bound (affects engine namespace)

Parameters:
  • code (str) – the code string to be executed

  • block (bool) – whether or not to wait until done before returning. Default: self.block.

gather(key, dist='b', targets=None, block=None)#

Gather a partitioned sequence on a set of engines as a single local seq.

get(key_s)#

get object(s) by key_s from remote namespace

see pull for details.

get_result(indices_or_msg_ids=None, block=None, owner=False)#

return one or more results, specified by history index or msg_id.

See ipyparallel.client.client.Client.get_result() for details.

imap(f, *sequences, **kwargs)#

Parallel version of itertools.imap().

See self.map for details.

property importer#

sync_imports(local=True) as a property.

See sync_imports for details.

map(f, *sequences, block=None, track=False, return_exceptions=False)#

Parallel version of builtin map, using this View’s targets.

There will be one task per engine, so work will be chunked if the sequences are longer than targets.

Results can be iterated as they are ready, but will become available in chunks.

Note

BroadcastView does not yet have a fully native map implementation. In particular, the scatter step is still one message per engine, identical to DirectView, and typically slower due to the more complex scheduler.

It is more efficient to partition inputs via other means (e.g. SPMD based on rank & size) and use apply to submit all tasks in one broadcast.

New in version 8.8.

Parameters:
  • f (callable) – function to be mapped

  • *sequences (one or more sequences of matching length) – the sequences to be distributed and passed to f

  • block (bool [default self.block]) – whether to wait for the result or not

  • track (bool [default False]) – Track underlying zmq send to indicate when it is safe to modify memory. Only for zero-copy sends such as numpy arrays that are going to be modified in-place.

  • return_exceptions (bool [default False]) – Return remote Exceptions in the result sequence instead of raising them.

Returns:

  • If block=False – An AsyncMapResult instance. An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete.

  • else – A list, the result of map(f,*sequences)
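The note above suggests partitioning inputs SPMD-style and submitting everything with one apply. A sketch of that idea follows, simulated locally. The names local_indices and engine_task are hypothetical, and the sketch assumes each engine already knows its own rank and the total size (for instance because the engine ids were scattered beforehand).

```python
def local_indices(n_items, rank, size):
    """Indices of the items that the engine with this rank should process."""
    return range(rank, n_items, size)


def engine_task(n_items, rank, size):
    """Example per-engine work: sum the squares of this engine's share."""
    return sum(i * i for i in local_indices(n_items, rank, size))


# Simulate four engines locally; the partial results combine to the full answer.
partials = [engine_task(10, rank, 4) for rank in range(4)]
total = sum(partials)
```

Because every engine derives its own share from rank and size, the same function can be sent to all engines in a single broadcast rather than one message per engine.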

map_async(f, *sequences, **kwargs)#

Parallel version of builtin map(), using this view’s engines.

This is equivalent to map(...block=False).

See self.map for details.

map_sync(f, *sequences, **kwargs)#

Parallel version of builtin map(), using this view’s engines.

This is equivalent to map(...block=True).

See self.map for details.

parallel(dist='b', block=None, **flags)#

Decorator for making a ParallelFunction

pull(names, targets=None, block=None)#

get object(s) by name from remote namespace

Returns one object if names is a single key. It can also take a list of keys, in which case it returns a list of objects.

purge_results(jobs=[], targets=[])#

Instruct the controller to forget specific results.

push(ns, targets=None, block=None, track=None)#

update remote namespace with dict ns

Parameters:
  • ns (dict) – dict of keys with which to update engine namespace(s)

  • block (bool [default : self.block]) – whether to wait to be notified of engine receipt

queue_status(targets=None, verbose=False)#

Fetch the Queue status of my engines

remote(block=None, **flags)#

Decorator for making a RemoteFunction

run(filename, targets=None, block=None)#

Execute contents of filename on my engine(s).

This simply reads the contents of the file and calls execute.

Parameters:
  • filename (str) – The path to the file

  • targets (int/str/list of ints/strs) – the engines on which to execute. Default: all.

  • block (bool) – whether or not to wait until done. Default: self.block.

scatter(key, seq, dist='b', flatten=False, targets=None, block=None, track=None)#

Partition a Python sequence and send the partitions to a set of engines.

set_flags(**kwargs)#

set my attribute flags by keyword.

Views determine behavior with a few attributes (block, track, etc.). These attributes can be set all at once by name with this method.

Parameters:
  • block (bool) – whether to wait for results

  • track (bool) – whether to create a MessageTracker, which lets the user know when it is safe to edit arrays and buffers after non-copying sends.

shutdown(targets=None, restart=False, hub=False, block=None)#

Terminates one or more engine processes, optionally including the hub.

spin()#

spin the client, and sync

sync_imports(local=True, quiet=False)#

Context Manager for performing simultaneous local and remote imports.

‘import x as y’ will not work. The ‘as y’ part will simply be ignored.

If local=True, then the package will also be imported locally.

If quiet=True, no output will be produced when attempting remote imports.

Note that remote-only (local=False) imports have not been implemented.

>>> with view.sync_imports():
...    from numpy import recarray
importing recarray from numpy on engine(s)

temp_flags(**kwargs)#

temporarily set flags, for use in with statements.

See set_flags for permanent setting of flags

Examples

>>> view.track=False
...
>>> with view.temp_flags(track=True):
...    ar = view.apply(dostuff, my_big_array)
...    ar.tracker.wait() # wait for send to finish
>>> view.track
False

update(ns)#

update remote namespace with dict ns

See push for details.

use_cloudpickle()#

Expand serialization support with cloudpickle.

This calls ipyparallel.serialize.use_cloudpickle() here and on each engine.

use_dill()#

Expand serialization support with dill

adds support for closures, etc.

This calls ipyparallel.serialize.use_dill() here and on each engine.

use_pickle()#

Restore the default pickle-based serialization.

This reverts changes to serialization caused by use_dill() or use_cloudpickle().

wait(jobs=None, timeout=-1)#

waits on one or more jobs, for up to timeout seconds.

Parameters:
  • jobs (int, str, or list of ints and/or strs, or one or more AsyncResult objects) – ints are indices to self.history; strs are msg_ids. Default: wait on all outstanding messages.

  • timeout (float) – a time in seconds, after which to give up. default is -1, which means no timeout

Returns:

  • True (when all msg_ids are done)

  • False (timeout reached, some msg_ids still outstanding)

class ipyparallel.AsyncResult(client, children, fname='unknown', targets=None, owner=False, return_exceptions=False, chunk_sizes=None)#

Class for representing results of non-blocking calls.

Extends the interfaces of multiprocessing.pool.AsyncResult and concurrent.futures.Future.

abort()#

Abort my tasks, if possible.

Only tasks that have not started yet can be aborted.

Raises RuntimeError if already done.

add_done_callback(fn)#

Attaches a callable that will be called when the future finishes.

Parameters:

fn – A callable that will be called with this future as its only argument when the future completes or is cancelled. The callable will always be called by a thread in the same process in which it was added. If the future has already completed or been cancelled then the callable will be called immediately. These callables are called in the order that they were added.
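Since this behavior comes from concurrent.futures.Future, the callback ordering can be demonstrated with a plain Future from the standard library. In the sketch below, set_result is called directly only to simulate completion; with an AsyncResult the result arrives from the engines.

```python
from concurrent.futures import Future

calls = []
fut = Future()

# Callbacks registered before completion run when the future finishes,
# in the order they were added.
fut.add_done_callback(lambda f: calls.append(("first", f.result())))
fut.add_done_callback(lambda f: calls.append(("second", f.result())))

fut.set_result(42)  # simulate the task finishing

# A callback added after completion is called immediately.
fut.add_done_callback(lambda f: calls.append(("late", f.result())))
```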

cancel()#

Cancel the future if possible.

Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.

cancelled()#

Return True if the future was cancelled.

display_outputs(groupby='type', result_only=False)#

republish the outputs of the computation

Parameters:
  • groupby (str [default: type]) –

    if ‘type’:

    Group outputs by type (show all stdout, then all stderr, etc.):

    [stdout:1] foo
    [stdout:2] foo
    [stderr:1] bar
    [stderr:2] bar

    if ‘engine’:

    Display outputs for each engine before moving on to the next:

    [stdout:1] foo
    [stderr:1] bar
    [stdout:2] foo
    [stderr:2] bar

    if ‘order’:

    Like ‘type’, but further collate individual displaypub outputs. This is meant for cases of each command producing several plots, and you would like to see all of the first plots together, then all of the second plots, and so on.

  • result_only (boolean [default: False]) – Only display the execution result and skip stdout, stderr and display-outputs. Usually used when using streaming output since these outputs would have already been displayed.

done()#

Return True if the future was cancelled or finished executing.

property elapsed#

elapsed time since initial submission

exception(timeout=None)#

Return the exception raised by the call that the future represents.

Parameters:

timeout – The number of seconds to wait for the exception if the future isn’t done. If None, then there is no limit on the wait time.

Returns:

The exception raised by the call that the future represents or None if the call completed without raising.

Raises:
  • CancelledError – If the future was cancelled.

  • TimeoutError – If the future didn’t finish executing before the given timeout.

get(timeout=None, return_exceptions=None, return_when=None)#

Return the result when it arrives.

Arguments:

timeout (int [default None]):

If timeout is not None and the result does not arrive within timeout seconds then TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get() inside a RemoteError.

return_exceptions (bool [default False]):

If True, return Exceptions instead of raising them.

return_when (None, ALL_COMPLETED, or FIRST_EXCEPTION):

FIRST_COMPLETED is not supported, and treated the same as ALL_COMPLETED. See concurrent.futures.wait() for documentation.

When return_when=FIRST_EXCEPTION, will raise immediately on the first exception, rather than waiting for all results to finish before reporting errors.

Changed in version 8.0: Added return_when argument.

get_dict(timeout=-1)#

Get the results as a dict, keyed by engine_id.

timeout behavior is described in get().

classmethod join(*async_results)#

Join multiple AsyncResults into one

Inverse of .split(), used for rejoining split results in wait.

New in version 8.0.

property metadata#

property for accessing execution metadata.

property progress#

the number of tasks which have been completed at this point.

Fractional progress would be given by 1.0 * ar.progress / len(ar)

property r#

result property wrapper for get(timeout=-1).

ready()#

Return whether the call has completed.

result(timeout=None)#

Return the result of the call that the future represents.

Parameters:

timeout – The number of seconds to wait for the result if the future isn’t done. If None, then there is no limit on the wait time.

Returns:

The result of the call that the future represents.

Raises:
  • CancelledError – If the future was cancelled.

  • TimeoutError – If the future didn’t finish executing before the given timeout.

  • Exception – If the call raised then that exception will be raised.

property result_dict#

result property as a dict.

running()#

Return True if the future is currently executing.

property sent#

check whether my messages have been sent.

property serial_time#

serial computation time of a parallel calculation

Computed as the sum of (completed-started) of each task

set_exception(exception)#

Sets the result of the future as being the given exception.

Should only be used by Executor implementations and unit tests.

set_result(result)#

Sets the return value of work associated with the future.

Should only be used by Executor implementations and unit tests.

set_running_or_notify_cancel()#

Mark the future as running or process any cancel notifications.

Should only be used by Executor implementations and unit tests.

If the future has been cancelled (cancel() was called and returned True) then any threads waiting on the future completing (through calls to as_completed() or wait()) are notified and False is returned.

If the future was not cancelled then it is put in the running state (future calls to running() will return True) and True is returned.

This method should be called by Executor implementations before executing the work associated with this future. If this method returns False then the work should not be executed.

Returns:

False if the Future was cancelled, True otherwise.

Raises:

RuntimeError – if this method was already called or if set_result() or set_exception() was called.

split()#

Split an AsyncResult

An AsyncResult object that represents multiple messages can be split to wait for individual results. The split results can be passed to concurrent.futures.wait and friends to get partial results.

New in version 8.0.

stream_output()#

Stream output for this result as it arrives.

Returns a context manager, during which output is streamed.

successful()#

Return whether the call completed without raising an exception.

Will raise RuntimeError if the result is not ready.

timedelta(start, end, start_key=<built-in function min>, end_key=<built-in function max>)#

compute the difference between two sets of timestamps

The default behavior is to use the earliest of the first and the latest of the second list, but this can be changed by passing a different start_key or end_key.

Parameters:
  • start (one or more datetime objects (e.g. ar.submitted)) –

  • end (one or more datetime objects (e.g. ar.received)) –

  • start_key (callable) – Function to call on start to extract the relevant entry [default: min]

  • end_key (callable) – Function to call on end to extract the relevant entry [default: max]

Returns:

dt – The time elapsed (in seconds) between the two selected timestamps.

Return type:

float
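The selection logic described above can be sketched in a few lines. The function span_seconds is a hypothetical stand-in written for illustration, not the library's code, and the timestamps are made up.

```python
from datetime import datetime, timedelta


def span_seconds(start, end, start_key=min, end_key=max):
    """Seconds between one selected start stamp and one selected end stamp."""
    if not isinstance(start, datetime):
        start = start_key(start)  # default: earliest start
    if not isinstance(end, datetime):
        end = end_key(end)  # default: latest end
    return (end - start).total_seconds()


t0 = datetime(2024, 1, 1, 12, 0, 0)
submitted = [t0, t0 + timedelta(seconds=1)]
received = [t0 + timedelta(seconds=3), t0 + timedelta(seconds=5)]

# Earliest submission to latest receipt.
overall = span_seconds(submitted, received)
# Earliest submission to earliest receipt.
first_back = span_seconds(submitted, received, end_key=min)
```

With an AsyncResult ar, the corresponding call would look like ar.timedelta(ar.submitted, ar.received).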

wait(timeout=-1, return_when=None)#

Wait until the result is available or until timeout seconds pass.

Arguments:

timeout (int):

The timeout in seconds. -1 or None indicate an infinite timeout.

return_when (enum):

None, ALL_COMPLETED, FIRST_COMPLETED, or FIRST_EXCEPTION. Passed to concurrent.futures.wait(). If specified and not None, the return value is a (done, pending) pair rather than a bool.

Returns:

ready (bool):

For backward compatibility. If return_when is None or unspecified, returns True if all tasks are done, False otherwise.

(done, pending):

If return_when is any of the constants for concurrent.futures.wait(), returns two sets of AsyncResult objects representing the completed and still-pending subsets of results, matching the return value of concurrent.futures.wait() itself.

Changed in version 8.0: Added return_when.
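The (done, pending) form mirrors concurrent.futures.wait(), so its shape can be shown with standard-library futures. In this sketch two plain Future objects stand in for the pieces of an AsyncResult; one is completed by hand to simulate a finished task.

```python
from concurrent.futures import FIRST_COMPLETED, Future, wait

finished, unfinished = Future(), Future()
finished.set_result("ready")  # simulate one task completing

# Returns as soon as at least one future is done (or the timeout expires).
done, pending = wait(
    [finished, unfinished], timeout=0.1, return_when=FIRST_COMPLETED
)
```

With an AsyncResult ar, the analogous call would be done, pending = ar.wait(timeout=0.1, return_when=FIRST_COMPLETED).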

wait_for_output(timeout=-1)#

Wait for our output to be complete.

AsyncResult.wait only waits for the result, which may arrive before output is complete.

wait_for_send(timeout=-1)#

wait for pyzmq send to complete.

This is necessary when sending arrays that you intend to edit in-place. timeout is in seconds, and will raise TimeoutError if it is reached before the send completes.

wait_interactive(interval=0.1, timeout=-1, widget=None, return_when='ALL_COMPLETED')#

interactive wait, printing progress at regular intervals.

Parameters:
  • interval (float) – Interval on which to update progress display.

  • timeout (float) – Time (in seconds) to wait before raising a TimeoutError. -1 (default) means no timeout.

  • widget (bool) – default: True if in an IPython kernel (notebook), False otherwise. Override default context-detection behavior for whether a widget-based progress bar should be used.

  • return_when (concurrent.futures.ALL_COMPLETED | FIRST_EXCEPTION | FIRST_COMPLETED) –

property wall_time#

actual computation time of a parallel calculation

Computed as the time between the latest received stamp and the earliest submitted.

For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.

class ipyparallel.ViewExecutor(view)#

A PEP-3148 Executor API for Views

Access as view.executor

map(func, *iterables, **kwargs)#

Return generator for View.map_async

shutdown(wait=True)#

ViewExecutor does not shutdown engines

results are awaited if wait=True, but engines are not shutdown.

submit(fn, *args, **kwargs)#

Same as View.apply_async
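Because ViewExecutor follows the PEP 3148 Executor API, code written against that interface should not need to know whether it runs on local threads or on engines. The sketch below uses a ThreadPoolExecutor so that it runs without a cluster; total_length is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor


def total_length(executor, words):
    """Sum len(word) over words using any PEP 3148 executor."""
    futures = [executor.submit(len, word) for word in words]
    return sum(f.result() for f in futures)


with ThreadPoolExecutor(max_workers=2) as pool:
    local_total = total_length(pool, ["alpha", "beta", "gamma"])
```

Given a view, total_length(view.executor, words) would be expected to submit the same work to the engines instead.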

Decorators#

IPython parallel provides some decorators to assist in using your functions as tasks.

@ipyparallel.interactive#

decorator for making functions appear as interactively defined. This results in the function being linked to the user_ns as globals() instead of the module globals().

@ipyparallel.require(*objects, **mapping)#

Simple decorator for requiring local objects and modules to be available when the decorated function is called on the engine.

Modules specified by name or passed directly will be imported prior to calling the decorated function.

Objects other than modules will be pushed as a part of the task. Functions can be passed positionally, and will be pushed to the engine with their __name__. Other objects can be passed by keyword arg.

Examples

In [1]: @ipp.require('numpy')
   ...: def norm(a):
   ...:     return numpy.linalg.norm(a,2)
In [2]: foo = lambda x: x*x
In [3]: @ipp.require(foo)
   ...: def bar(a):
   ...:     return foo(1-a)

@ipyparallel.depend(_wrapped_f, *args, **kwargs)#

Dependency decorator, for use with tasks.

@depend lets you define a function for engine dependencies just like you use apply for tasks.

Examples

@depend(df, a,b, c=5)
def f(m,n,p):
    ...

view.apply(f, 1,2,3)

will call df(a,b,c=5) on the engine, and if it returns False or raises an UnmetDependency error, then the task will not be run and another engine will be tried.

@ipyparallel.remote(view, block=None, **flags)#

Turn a function into a remote function.

This method can be used for map:

In [1]: @remote(view,block=True)
   ...: def func(a):
   ...:    pass

@ipyparallel.parallel(view, dist='b', block=None, ordered=True, **flags)#

Turn a function into a parallel remote function.

This method can be used for map:

In [1]: @parallel(view, block=True)
   ...: def func(a):
   ...:    pass

Exceptions#

exception ipyparallel.RemoteError(ename, evalue, traceback, engine_info=None)#

Error raised elsewhere

exception ipyparallel.CompositeError(message, elist)#

Error for representing possibly multiple errors on engines

exception ipyparallel.NoEnginesRegistered#

Exception for operations that require some engines, but none exist

exception ipyparallel.ImpossibleDependency#

exception ipyparallel.InvalidDependency#