ipyparallel

ipyparallel.version_info = (6, 2, 0)

The IPython parallel version as a tuple of integers. There will always be 3 integers. Development releases will have 'dev' as a fourth element.

Classes

class ipyparallel.Client(url_file=None, profile=None, profile_dir=None, ipython_dir=None, context=None, debug=False, sshserver=None, sshkey=None, password=None, paramiko=None, timeout=10, cluster_id=None, **extra_args)

A semi-synchronous client to an IPython parallel cluster

Parameters:
  • url_file (str) – The path to ipcontroller-client.json. This connection file contains all the information needed to connect to a cluster (the Hub’s registration information) and is likely the only argument needed; if it is given, no further configuration is usually necessary. [Default: use profile]
  • profile (str) – The name of the Cluster profile to be used to find connector information. If run from an IPython application, the default profile will be the same as the running application, otherwise it will be ‘default’.
  • cluster_id (str) – String id added to runtime files, to prevent name collisions when using multiple clusters with a single profile simultaneously. When set, files named like ‘ipcontroller-<cluster_id>-client.json’ will be used. Since this is text inserted into filenames, typical recommendations apply: simple character strings are ideal, and spaces are not recommended (but should generally work).
  • context (zmq.Context) – Pass an existing zmq.Context instance, otherwise the client will create its own.
  • debug (bool) – flag for lots of message printing for debug purposes
  • timeout (float) – time (in seconds) to wait for connection replies from the Hub [Default: 10]
Other Parameters:
 
  • sshserver (str) – A string of the form passed to ssh, i.e. ‘server.tld’ or ‘user@server.tld:port’ If keyfile or password is specified, and this is not, it will default to the ip given in addr.
  • sshkey (str; path to ssh private key file) – This specifies a key to be used in ssh login, default None. Regular default ssh keys will be used without specifying this argument.
  • password (str) – Your ssh password to sshserver. Note that if this is left None, you will be prompted for it if passwordless key based login is unavailable.
  • paramiko (bool) – flag for whether to use paramiko instead of shell ssh for tunneling. [default: True on win32, False otherwise]
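A minimal connection sketch, assuming a cluster has already been started (e.g. with ipcluster start); the profile name and file path below are purely illustrative:

import ipyparallel as ipp

rc = ipp.Client()                       # connect using the default profile
# rc = ipp.Client(profile='mpi')        # or a named profile
# rc = ipp.Client(url_file='/path/to/ipcontroller-client.json')   # or an explicit connection file
rc.ids                                  # engine IDs currently registered with the Hub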
ids

list of int engine IDs – requesting the ids attribute always synchronizes the registration state. To request ids without synchronization, use the semi-private _ids attribute.

history

list of msg_ids – a list of msg_ids, keeping track of all the execution messages you have submitted in order.

outstanding

set of msg_ids – a set of msg_ids that have been submitted, but whose results have not yet been received.

results

dict – a dict of all our results, keyed by msg_id

block

bool – determines the default behavior when block is not specified in execution methods

abort(jobs=None, targets=None, block=None)

Abort specific jobs from the execution queues of target(s).

This is a mechanism to prevent jobs that have already been submitted from executing.

Parameters:
  • jobs (msg_id, list of msg_ids, or AsyncResult) – The jobs to be aborted. If unspecified/None, abort all outstanding jobs.

activate(targets='all', suffix='')

Create a DirectView and register it with IPython magics

Defines the magics %px, %autopx, %pxresult, %%px

Parameters:
  • targets (int, list of ints, or 'all') – The engines on which the view’s magics will run
  • suffix (str [default: '']) –

    The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time.

    e.g. rc.activate(targets=0, suffix='0') will give you the magics %px0, %pxresult0, etc. for running magics just on engine 0.
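A short sketch of the resulting magics in an interactive session (assumes a running cluster and a connected client rc):

In [1]: rc.activate()          # registers %px, %autopx, %pxresult, %%px
In [2]: %px import os
In [3]: %px os.getpid()
In [4]: %pxresult              # redisplay the result of the last %px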

become_dask(targets='all', port=0, nanny=False, scheduler_args=None, **worker_args)

Turn the IPython cluster into a dask.distributed cluster

Parameters:
  • targets (target spec (default: all)) – Which engines to turn into dask workers.
  • port (int (default: random)) – The port on which the scheduler will listen.
  • nanny (bool (default: False)) – Whether to start workers as subprocesses instead of in the engine process. Using a nanny allows restarting the worker processes via executor.restart.
  • scheduler_args (dict) – Keyword arguments (e.g. ip) to pass to the distributed.Scheduler constructor.
  • **worker_args – Any additional keyword arguments (e.g. ncores) are passed to the distributed.Worker constructor.
Returns:

A dask.distributed.Client connected to the dask cluster.

Return type:

distributed.Client
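A hedged sketch of handing the engines over to dask and back (assumes distributed is installed on the client and engines):

executor = rc.become_dask()                    # returns a distributed.Client
future = executor.submit(lambda x: x + 1, 10)  # dask-style submission
future.result()                                # -> 11
rc.stop_dask()                                 # stop the dask Scheduler and Workers again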

become_distributed(targets='all', port=0, nanny=False, scheduler_args=None, **worker_args)

Turn the IPython cluster into a dask.distributed cluster. This is the same operation as become_dask.

Parameters:
  • targets (target spec (default: all)) – Which engines to turn into dask workers.
  • port (int (default: random)) – The port on which the scheduler will listen.
  • nanny (bool (default: False)) – Whether to start workers as subprocesses instead of in the engine process. Using a nanny allows restarting the worker processes via executor.restart.
  • scheduler_args (dict) – Keyword arguments (e.g. ip) to pass to the distributed.Scheduler constructor.
  • **worker_args – Any additional keyword arguments (e.g. ncores) are passed to the distributed.Worker constructor.
Returns:

A dask.distributed.Client connected to the dask cluster.

Return type:

distributed.Client

clear(targets=None, block=None)

Clear the namespace in target(s).

close(linger=None)

Close my zmq Sockets

If linger, set the zmq LINGER socket option, which allows discarding of messages.

db_query(query, keys=None)

Query the Hub’s TaskRecord database

This will return a list of task record dicts that match query

Parameters:
  • query (mongodb query dict) – The search dict. See mongodb query docs for details.
  • keys (list of strs [optional]) – The subset of keys to be returned. The default is to fetch everything but buffers. ‘msg_id’ will always be included.
direct_view(targets='all', **kwargs)

construct a DirectView object.

If no targets are specified, create a DirectView using all engines.

rc.direct_view(‘all’) is distinguished from rc[:] in that ‘all’ will evaluate the target engines at each execution, whereas rc[:] will connect to all current engines, and that list will not change.

That is, ‘all’ will always use all engines, whereas rc[:] will not use engines added after the DirectView is constructed.

Parameters:
  • targets (list,slice,int,etc. [default: use all engines]) – The engines to use for the View
  • kwargs (passed to DirectView) –
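For example (a sketch, assuming a connected client rc):

dv_all = rc.direct_view('all')     # re-evaluates 'all' at each execution
dv_now = rc[:]                     # fixed to the engines registered right now
dv_all.apply_sync(lambda: 'hi')    # one result per engine, including engines added later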
executor(targets=None)

Construct a PEP-3148 Executor with a LoadBalancedView

Parameters:targets (list,slice,int,etc. [default: use all engines]) – The subset of engines across which to load-balance execution
Returns:executor – The Executor object
Return type:Executor
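A sketch of the Futures-style usage, assuming a connected client rc:

ex = rc.executor()                      # load-balanced, PEP-3148 style Executor
ar = ex.submit(pow, 2, 10)              # same as view.apply_async(pow, 2, 10)
ar.get()                                # -> 1024
list(ex.map(pow, range(4), range(4)))   # generator over View.map_async -> [1, 1, 4, 27]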
get_result(indices_or_msg_ids=None, block=None, owner=True)

Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

If the client already has the results, no request to the Hub will be made.

This is a convenient way to construct AsyncResult objects, which are wrappers that include metadata about execution, and allow for awaiting results that were not submitted by this Client.

It can also be a convenient way to retrieve the metadata associated with blocking execution.

Examples

In [10]: ar = client[:].apply_async(lambda: 2 + 2)
In [11]: client.get_result(ar.msg_ids[0]).get()
Parameters:
  • indices_or_msg_ids (integer history index, str msg_id, AsyncResult,) – or a list of same. The history indices or msg_ids of the results to be retrieved
  • block (bool) – Whether to wait for the result to be done
  • owner (bool [default: True]) – Whether this AsyncResult should own the result. If so, calling ar.get() will remove data from the client’s result and metadata cache. There should only be one owner of any given msg_id.
Returns:

  • AsyncResult – A single AsyncResult object will always be returned.
  • AsyncHubResult – A subclass of AsyncResult that retrieves results from the Hub

hub_history()

Get the Hub’s history

Just like the Client, the Hub has a history, which is a list of msg_ids. This will contain the history of all clients, and, depending on configuration, may contain history across multiple cluster sessions.

Any msg_id returned here is a valid argument to get_result.

Returns:msg_ids – list of all msg_ids, ordered by task submission time.
Return type:list of strs
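For example, the most recent task recorded by the Hub can be re-fetched (a sketch):

msg_ids = rc.hub_history()
ar = rc.get_result(msg_ids[-1])    # works even if another client submitted the task
ar.get()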
ids

Always up-to-date ids property.

load_balanced_view(targets=None, **kwargs)

construct a LoadBalancedView object.

If no arguments are specified, create a LoadBalancedView using all engines.

Parameters:
  • targets (list,slice,int,etc. [default: use all engines]) – The subset of engines across which to load-balance execution
  • kwargs (passed to LoadBalancedView) –
purge_everything()

Clears all content from previous Tasks from both the hub and the local client

In addition to calling purge_results(“all”) it also deletes the history and other bookkeeping lists.

purge_hub_results(jobs=[], targets=[])

Tell the Hub to forget results.

Individual results can be purged by msg_id, or the entire history of specific targets can be purged.

Use purge_results(‘all’) to scrub everything from the Hub’s db.

Parameters:
  • jobs (str or list of str or AsyncResult objects) – the msg_ids whose results should be forgotten.
  • targets (int/str/list of ints/strs) –

    The targets, by int_id, whose entire history is to be purged.

    default : None

purge_local_results(jobs=[], targets=[])

Clears the client caches of results and their metadata.

Individual results can be purged by msg_id, or the entire history of specific targets can be purged.

Use purge_local_results(‘all’) to scrub everything from the Client’s results and metadata caches.

After this call all AsyncResults are invalid and should be discarded.

If you must “reget” the results, you can still do so by using client.get_result(msg_id) or client.get_result(asyncresult). This will redownload the results from the hub if they are still available (i.e. client.purge_hub_results(…) has not been called).

Parameters:
  • jobs (str or list of str or AsyncResult objects) – the msg_ids whose results should be purged.
  • targets (int/list of ints) – The engines, by integer ID, whose entire result histories are to be purged.
Raises:

RuntimeError : if any of the tasks to be purged are still outstanding.

purge_results(jobs=[], targets=[])

Clears the cached results from both the hub and the local client

Individual results can be purged by msg_id, or the entire history of specific targets can be purged.

Use purge_results(‘all’) to scrub every cached result from both the Hub’s and the Client’s db.

Equivalent to calling both purge_hub_results() and purge_local_results() with the same arguments.

Parameters:
  • jobs (str or list of str or AsyncResult objects) – the msg_ids whose results should be forgotten.
  • targets (int/str/list of ints/strs) –

    The targets, by int_id, whose entire history is to be purged.

    default : None

queue_status(targets='all', verbose=False)

Fetch the status of engine queues.

Parameters:
  • targets (int/str/list of ints/strs) – the engines whose states are to be queried. default : all
  • verbose (bool) – Whether to return lengths only, or lists of ids for each element
resubmit(indices_or_msg_ids=None, metadata=None, block=None)

Resubmit one or more tasks.

In-flight tasks may not be resubmitted.

Parameters:
  • indices_or_msg_ids (integer history index, str msg_id, or list of either) – The history indices or msg_ids of the tasks to be resubmitted
  • block (bool) – Whether to wait for the result to be done
Returns:

A subclass of AsyncResult that retrieves results from the Hub

Return type:

AsyncHubResult

result_status(msg_ids, status_only=True)

Check on the status of the result(s) of the apply request with msg_ids.

If status_only is False, then the actual results will be retrieved, else only the status of the results will be checked.

Parameters:
  • msg_ids (list of msg_ids) – if int: passed as an index to self.history for convenience.
  • status_only (bool (default: True)) – if False: retrieve the actual results of completed tasks.
Returns:

results – There will always be the keys ‘pending’ and ‘completed’, which will be lists of msg_ids that are incomplete or complete. If status_only is False, then completed results will be keyed by their msg_id.

Return type:

dict

send_apply_request(socket, f, args=None, kwargs=None, metadata=None, track=False, ident=None)

construct and send an apply message via a socket.

This is the principal method with which all engine execution is performed by views.

send_execute_request(socket, code, silent=True, metadata=None, ident=None)

construct and send an execute request via a socket.

shutdown(targets='all', restart=False, hub=False, block=None)

Terminates one or more engine processes, optionally including the hub.

Parameters:
  • targets (list of ints or 'all' [default: all]) – Which engines to shut down.
  • hub (bool [default: False]) – Whether to include the Hub. hub=True implies targets=’all’.
  • block (bool [default: self.block]) – Whether to wait for clean shutdown replies or not.
  • restart (bool [default: False]) – NOT IMPLEMENTED whether to restart engines after shutting them down.
spin()

DEPRECATED, DOES NOTHING

spin_thread(interval=1)

DEPRECATED, DOES NOTHING

stop_dask(targets='all')

Stop the distributed Scheduler and Workers started by become_dask.

Parameters:targets (target spec (default: all)) – Which engines to stop dask workers on.
stop_distributed(targets='all')

Stop the distributed Scheduler and Workers started by become_dask.

Parameters:targets (target spec (default: all)) – Which engines to stop dask workers on.
stop_spin_thread()

DEPRECATED, DOES NOTHING

wait(jobs=None, timeout=-1)

waits on one or more jobs, for up to timeout seconds.

Parameters:
  • jobs (int, str, or list of ints and/or strs, or one or more AsyncResult objects) – ints are indices to self.history; strs are msg_ids. Default: wait on all outstanding messages.
  • timeout (float) – a time in seconds, after which to give up. default is -1, which means no timeout
Returns:

  • True (when all msg_ids are done)
  • False (timeout reached, some msg_ids still outstanding)
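A small sketch, assuming a connected client rc:

ar = rc[:].apply_async(lambda: sum(range(1000)))
if rc.wait(ar, timeout=5):     # True once every engine has replied
    print(ar.get())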

wait_interactive(jobs=None, interval=1.0, timeout=-1.0)

Wait interactively for jobs

If no job is specified, will wait for all outstanding jobs to complete.

class ipyparallel.DirectView(client=None, socket=None, targets=None)

Direct Multiplexer View of one or more engines.

These are created via indexed access to a client:

>>> dv_1 = client[1]
>>> dv_all = client[:]
>>> dv_even = client[::2]
>>> dv_some = client[1:3]

This object provides dictionary access to engine namespaces:

>>> dv['a'] = 5   # push a=5
>>> dv['foo']     # pull 'foo'

activate(suffix='')

Activate IPython magics associated with this View

Defines the magics %px, %autopx, %pxresult, %%px, %pxconfig

Parameters:suffix (str [default: '']) –

The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time.

e.g. rc[::2].activate(suffix='_even') will give you the magics %px_even, %pxresult_even, etc. for running magics on the even engines.

clear(targets=None, block=None)

Clear the remote namespaces on my engines.

execute(code, silent=True, targets=None, block=None)

Executes code on targets in a blocking or nonblocking manner.

execute is always bound (affects engine namespace)

Parameters:
  • code (str) – the code string to be executed
  • block (bool) – whether or not to wait until done to return default: self.block
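For example (a sketch, with dv = rc[:]):

dv['a'] = 5                            # push into each engine's namespace
dv.execute('b = a ** 2', block=True)   # bound execution: b is defined on the engines
dv['b']                                # -> [25, 25, ...] one value per engine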
gather(key, dist='b', targets=None, block=None)

Gather a partitioned sequence on a set of engines as a single local seq.

get(key_s)

get object(s) by key_s from remote namespace

see pull for details.

importer

sync_imports(local=True) as a property.

See sync_imports for details.

map(f, *sequences, **kwargs)

view.map(f, *sequences, block=self.block) => list|AsyncMapResult

Parallel version of builtin map, using this View’s targets.

There will be one task per target, so work will be chunked if the sequences are longer than the number of targets.

Results can be iterated as they are ready, but will become available in chunks.

Parameters:
  • f (callable) – function to be mapped
  • *sequences (one or more sequences of matching length) – the sequences to be distributed and passed to f
  • block (bool) – whether to wait for the result or not [default self.block]
Returns:

  • If block=False – An AsyncMapResult instance. An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete.
  • else – A list, the result of map(f,*sequences)
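A sketch of the chunked iteration, with dv = rc[:]:

amr = dv.map(lambda x: x ** 2, range(8), block=False)
for y in amr:      # elements become available as each engine's chunk completes
    print(y)
amr.get()          # or wait for the fully assembled list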

pull(names, targets=None, block=None)

get object(s) by name from remote namespace

Returns one object if names is a single key. Can also take a list of keys, in which case it will return a list of objects.

push(ns, targets=None, block=None, track=None)

update remote namespace with dict ns

Parameters:
  • ns (dict) – dict of keys with which to update engine namespace(s)
  • block (bool [default : self.block]) – whether to wait to be notified of engine receipt
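For example, on a single-engine view (a sketch):

e0 = rc[0]
e0.push({'a': 1, 'b': 2}, block=True)
e0.pull('a', block=True)           # -> 1
e0.pull(['a', 'b'], block=True)    # -> [1, 2]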
run(filename, targets=None, block=None)

Execute contents of filename on my engine(s).

This simply reads the contents of the file and calls execute.

Parameters:
  • filename (str) – The path to the file
  • targets (int/str/list of ints/strs) – the engines on which to execute default : all
  • block (bool) – whether or not to wait until done default: self.block
scatter(key, seq, dist='b', flatten=False, targets=None, block=None, track=None)

Partition a Python sequence and send the partitions to a set of engines.
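scatter and gather are commonly paired (a sketch, with dv = rc[:]):

dv.scatter('x', list(range(8)), block=True)         # partition the list across the engines
dv.execute('y = [i * 10 for i in x]', block=True)
dv.gather('y', block=True)                          # -> [0, 10, 20, ..., 70] reassembled locally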

sync_imports(local=True, quiet=False)

Context Manager for performing simultaneous local and remote imports.

‘import x as y’ will not work. The ‘as y’ part will simply be ignored.

If local=True, then the package will also be imported locally.

If quiet=True, no output will be produced when attempting remote imports.

Note that remote-only (local=False) imports have not been implemented.

>>> with view.sync_imports():
...    from numpy import recarray
importing recarray from numpy on engine(s)
update(ns)

update remote namespace with dict ns

See push for details.

use_cloudpickle()

Expand serialization support with cloudpickle.

This calls ipyparallel.serialize.use_cloudpickle() here and on each engine.

use_dill()

Expand serialization support with dill

adds support for closures, etc.

This calls ipyparallel.serialize.use_dill() here and on each engine.

use_pickle()

Restore default serialization (plain pickle).

This reverts the changes to serialization made by use_dill() or use_cloudpickle().

class ipyparallel.LoadBalancedView(client=None, socket=None, **flags)

A load-balancing View that only executes via the Task scheduler.

Load-balanced views can be created with the client’s view method:

>>> v = client.load_balanced_view()

or targets can be specified, to restrict the potential destinations:

>>> v = client.load_balanced_view([1,3])

which would restrict load-balancing to engines 1 and 3.

map(f, *sequences, **kwargs)

view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult

Parallel version of builtin map, load-balanced by this View.

block, chunksize, and ordered can be specified by keyword only.

Each chunksize elements will be a separate task, and will be load-balanced. This lets individual elements be available for iteration as soon as they arrive.

Parameters:
  • f (callable) – function to be mapped
  • *sequences (one or more sequences of matching length) – the sequences to be distributed and passed to f
  • block (bool [default self.block]) – whether to wait for the result or not
  • track (bool) – whether to create a MessageTracker so the user can know when it is safe to edit arrays and buffers again after non-copying sends.
  • chunksize (int [default 1]) – how many elements should be in each task.
  • ordered (bool [default True]) –

    Whether the results should be gathered as they arrive, or enforce the order of submission.

    Only applies when iterating through AsyncMapResult as results arrive. Has no effect when block=True.

Returns:

  • if block=False – An AsyncMapResult instance. An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete.
  • else – A list, the result of map(f,*sequences)
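A sketch showing chunksize and ordered, with v = rc.load_balanced_view():

amr = v.map(lambda x: x * x, range(100), block=False, chunksize=10, ordered=False)
for result in amr:     # results stream in completion order, ten elements per task
    print(result)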

register_joblib_backend(name='ipyparallel', make_default=False)

Register this View as a joblib parallel backend

To make this the default backend, set make_default=True.

Use with:

p = Parallel(backend='ipyparallel')
...

See joblib docs for details

Requires joblib >= 0.10

New in version 5.1.
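A slightly fuller sketch, assuming joblib >= 0.10 is installed:

from joblib import Parallel, delayed

rc.load_balanced_view().register_joblib_backend(name='ipyparallel')
squares = Parallel(backend='ipyparallel')(delayed(pow)(i, 2) for i in range(10))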

set_flags(**kwargs)

set my attribute flags by keyword.

A View is a wrapper for the Client’s apply method, with attributes that provide default keyword arguments; those attributes can be set by keyword with this method.

Parameters:
  • block (bool) – whether to wait for results
  • track (bool) – whether to create a MessageTracker so the user can know when it is safe to edit arrays and buffers again after non-copying sends.
  • after (Dependency or collection of msg_ids) – Only for load-balanced execution (targets=None) Specify a list of msg_ids as a time-based dependency. This job will only be run after the dependencies have been met.
  • follow (Dependency or collection of msg_ids) – Only for load-balanced execution (targets=None) Specify a list of msg_ids as a location-based dependency. This job will only be run on an engine where this dependency is met.
  • timeout (float/int or None) – Only for load-balanced execution (targets=None) Specify an amount of time (in seconds) for the scheduler to wait for dependencies to be met before failing with a DependencyTimeout.
  • retries (int) – Number of times a task will be retried on failure.
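For instance (a sketch with a load-balanced view):

v = rc.load_balanced_view()
v.set_flags(block=True, retries=2)   # run synchronously, retry failed tasks up to twice
v.apply(pow, 2, 8)                   # -> 256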
class ipyparallel.ViewExecutor(view)

A PEP-3148 Executor API for Views

Access as view.executor

map(func, *iterables, **kwargs)

Return generator for View.map_async

shutdown(wait=True)

ViewExecutor does not shut down engines

Results are awaited if wait=True, but engines are not shut down.

submit(fn, *args, **kwargs)

Same as View.apply_async

Decorators

IPython parallel provides some decorators to assist in using your functions as tasks.

ipyparallel.interactive(f)

decorator for making functions appear as interactively defined. This results in the function being linked to the user_ns as globals() instead of the module globals().

ipyparallel.require(*objects, **mapping)

Simple decorator for requiring local objects and modules to be available when the decorated function is called on the engine.

Modules specified by name or passed directly will be imported prior to calling the decorated function.

Objects other than modules will be pushed as a part of the task. Functions can be passed positionally, and will be pushed to the engine with their __name__. Other objects can be passed by keyword arg.

Examples

In [1]: @ipp.require('numpy')
   ...: def norm(a):
   ...:     return numpy.linalg.norm(a,2)
In [2]: foo = lambda x: x*x
In [3]: @ipp.require(foo)
   ...: def bar(a):
   ...:     return foo(1-a)
ipyparallel.depend(_wrapped_f, *args, **kwargs)

Dependency decorator, for use with tasks.

@depend lets you define a function for engine dependencies just like you use apply for tasks.

Examples

@depend(df, a, b, c=5)
def f(m, n, p):
    ...

view.apply(f, 1, 2, 3)

will call df(a,b,c=5) on the engine, and if it returns False or raises an UnmetDependency error, then the task will not be run and another engine will be tried.

ipyparallel.remote(view, block=None, **flags)

Turn a function into a remote function.

This method can be used for map:

In [1]: @remote(view,block=True)
   ...: def func(a):
   ...:    pass
ipyparallel.parallel(view, dist='b', block=None, ordered=True, **flags)

Turn a function into a parallel remote function.

This method can be used for map:

In [1]: @parallel(view, block=True)
   ...: def func(a):
   ...:    pass

Exceptions

exception ipyparallel.RemoteError(ename, evalue, traceback, engine_info=None)

An error raised on a remote engine, re-raised locally.

exception ipyparallel.CompositeError(message, elist)

Error for representing possibly multiple errors on engines

exception ipyparallel.NoEnginesRegistered
exception ipyparallel.ImpossibleDependency
exception ipyparallel.InvalidDependency