Using dill to pickle anything

ipyparallel doesn’t do much in the way of serialization. Apart from custom zero-copy handling of numpy arrays, it does only the bare minimum needed to make basic interactively defined functions and classes sendable.

A few projects extend pickle to make just about anything sendable. Two of them are dill and cloudpickle.

To install dill:

pip install dill

First, as always, we create a task function, this time with a closure:

def make_closure(a):
    """make a weird function with a closure on an open file, and return it"""
    import os
    f = open('/tmp/dilltest', 'a')
    def has_closure(b):
        product = a * b
        f.write("%i: %g\n" % (os.getpid(), product))
        f.flush()  # flush so each write shows up in `cat` right away
        return product
    return has_closure
!rm -f /tmp/dilltest
closed = make_closure(5)
closed(2)
cat /tmp/dilltest
33018: 10
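To see what has_closure actually carries around, we can inspect its free variables and closure cells. The snippet below is a standalone variant of make_closure. It takes the file path as a parameter and writes to a temporary directory instead of /tmp so it runs anywhere. Otherwise it has the same shape.

```python
import os
import tempfile

def make_closure(a, path):
    """Standalone variant of make_closure that takes the file path as a parameter."""
    f = open(path, "a")
    def has_closure(b):
        product = a * b
        f.write("%i: %g\n" % (os.getpid(), product))
        f.flush()
        return product
    return has_closure

path = os.path.join(tempfile.mkdtemp(), "dilltest")
closed = make_closure(5, path)
closed(2)

# Pair each name the inner function reads from the enclosing scope with its cell value
captured = dict(zip(closed.__code__.co_freevars,
                    (cell.cell_contents for cell in closed.__closure__)))
print({name: type(captured[name]).__name__ for name in sorted(captured)})
# {'a': 'int', 'f': 'TextIOWrapper'}

captured["f"].close()  # tidy up the file handle held by the closure
```

The integer is harmless to serialize. The open file object in the closure is the part that plain pickle will refuse.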
import pickle

Without help, pickle can’t deal with closures:

pickle.dumps(closed)

AttributeError                            Traceback (most recent call last)
/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_33018/ in <module>
----> 1 pickle.dumps(closed)

AttributeError: Can't pickle local object 'make_closure.<locals>.has_closure'
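The error comes from how plain pickle handles functions. It stores them by reference, as a module name plus a qualified name, and the receiver re-imports that name. A function created inside another function has a qualified name containing `<locals>`, which cannot be looked up from any module. The stdlib sketch below contrasts the two cases. make_local is a hypothetical helper, and the exact exception type varies across Python versions, so both candidates are caught.

```python
import os.path
import pickle

# A module-level function is pickled as "module + name" and looked up again on load
payload = pickle.dumps(os.path.join)
assert b"join" in payload
assert pickle.loads(payload) is os.path.join

def make_local():
    def local(x):
        return 3 * x
    return local

local = make_local()
print(local.__qualname__)  # make_local.<locals>.local

# There is no importable name to refer to, so pickling fails
try:
    pickle.dumps(local)
    local_pickled = True
except (AttributeError, pickle.PicklingError) as exc:
    local_pickled = False
    print(type(exc).__name__, exc)
```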

But after we import dill, magic happens: dill.dumps serializes the closure without complaint.

import dill
dill.dumps(closed)[:64] + b'...'

So with dill, pretty much everything is pickleable.
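Plain pickle stores a function as a module-plus-name reference, and a closure created inside another function has no such importable name. Libraries like dill and cloudpickle instead serialize functions by value. The stdlib sketch below shows the core of that idea for closures. It ships the code object (via marshal) together with the contents of the closure cells, then rebuilds a fresh function with types.FunctionType and types.CellType (Python 3.8+). dumps_function, loads_function, and make_multiplier are hypothetical names. The sketch assumes the same Python version on both sides, because marshal's format is version specific. It also ignores the function's globals and many other cases that the real libraries handle.

```python
import builtins
import marshal
import pickle
import types

def dumps_function(fn):
    """Serialize fn by value: its bytecode plus the values held in its closure cells."""
    cells = tuple(cell.cell_contents for cell in (fn.__closure__ or ()))
    code_bytes = marshal.dumps(fn.__code__)  # marshal can handle code objects
    return pickle.dumps((code_bytes, fn.__name__, fn.__defaults__, cells))

def loads_function(data):
    """Rebuild a function from dumps_function output, with fresh closure cells."""
    code_bytes, name, defaults, cells = pickle.loads(data)
    code = marshal.loads(code_bytes)
    closure = tuple(types.CellType(value) for value in cells) or None
    # Assumption for this sketch: the function needs only builtins as globals
    namespace = {"__builtins__": builtins}
    return types.FunctionType(code, namespace, name, defaults, closure)

def make_multiplier(a):
    def multiply(b):
        return a * b
    return multiply

times5 = make_multiplier(5)
restored = loads_function(dumps_function(times5))
print(restored(2))  # 10
```

If the closure also held an open file, as has_closure does, the inner pickle.dumps call would still fail on it. File handles need their own treatment.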

Using dill in IPython Parallel

As usual, we start by creating our Client and View:

import ipyparallel as ipp

cluster = ipp.Cluster(n=2)
rc = cluster.connect_client_sync()
view = rc.load_balanced_view()
Using existing profile dir: '/Users/minrk/.ipython/profile_default'
Starting 2 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>

Now let’s try sending our function with a closure:

view.apply_sync(closed, 3)
TypeError                                 Traceback (most recent call last)
/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_33018/ in <module>
----> 1 view.apply_sync(closed, 3)

~/dev/ip/parallel/ipyparallel/client/ in apply_sync(self, _View__ipp_f, *args, **kwargs)
    233         returning the result.
    234         """
--> 235         return self._really_apply(__ipp_f, args, kwargs, block=True)
    237     # ----------------------------------------------------------------

~/conda/lib/python3.9/site-packages/ in fun(*args, **kw)
    230             if not kwsyntax:
    231                 args, kw = fix(args, kw, sig)
--> 232             return caller(func, *(extras + args), **kw)
    233     fun.__name__ = func.__name__
    234     fun.__doc__ = func.__doc__

~/dev/ip/parallel/ipyparallel/client/ in sync_results(f, self, *args, **kwargs)
     60     self._in_sync_results = True
     61     try:
---> 62         ret = f(self, *args, **kwargs)
     63     finally:
     64         self._in_sync_results = False

~/conda/lib/python3.9/site-packages/ in fun(*args, **kw)
    230             if not kwsyntax:
    231                 args, kw = fix(args, kw, sig)
--> 232             return caller(func, *(extras + args), **kw)
    233     fun.__name__ = func.__name__
    234     fun.__doc__ = func.__doc__

~/dev/ip/parallel/ipyparallel/client/ in save_ids(f, self, *args, **kwargs)
     44     n_previous = len(self.client.history)
     45     try:
---> 46         ret = f(self, *args, **kwargs)
     47     finally:
     48         nmsgs = len(self.client.history) - n_previous

~/dev/ip/parallel/ipyparallel/client/ in _really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)
   1215         )
-> 1217         future = self.client.send_apply_request(
   1218             self._socket, f, args, kwargs, track=track, metadata=metadata
   1219         )

~/dev/ip/parallel/ipyparallel/client/ in send_apply_request(self, socket, f, args, kwargs, metadata, track, ident, message_future_hook)
   1771             raise TypeError("metadata must be dict, not %s" % type(metadata))
-> 1773         bufs = serialize.pack_apply_message(
   1774             f,
   1775             args,

~/dev/ip/parallel/ipyparallel/serialize/ in pack_apply_message(f, args, kwargs, buffer_threshold, item_threshold)
    187     info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
--> 188     msg = serialize_object(f)
    189     msg.append(pickle.dumps(info, PICKLE_PROTOCOL))
    190     msg.extend(arg_bufs)

~/dev/ip/parallel/ipyparallel/serialize/ in serialize_object(obj, buffer_threshold, item_threshold)
    117         buffers.extend(_extract_buffers(cobj, buffer_threshold))
--> 119     buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
    120     return buffers

TypeError: cannot pickle '_io.TextIOWrapper' object

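Oops, no dice. By default ipyparallel serializes with plain pickle, so the open file inside the closure still can’t be sent. To use dill, it has to be enabled both on the client and on every engine. DirectView.use_dill does both for you:

rc[:].use_dill()
<AsyncResult: use_dill>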

Now let’s try again

view.apply_sync(closed, 3)
cat /tmp/dilltest
33018: 10
33046: 15

Yay! ipyparallel now uses dill, so we can send closures, open file handles, and other objects that plain pickle can’t handle to our engines.
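An open file wraps an OS-level descriptor that means nothing in another process. The only meaningful way to "send" one is to record enough information to reopen it on the receiving side. That works here because the client and the local engines share a filesystem. dill's own file handling is more thorough, with settings that control how much of the file's state to carry along. The stdlib sketch below shows just the basic idea, using a custom dispatch table on a pickle.Pickler subclass. reduce_text_file, FilePickler, and dumps are hypothetical names, and the sketch assumes the same path is reachable wherever the data is unpickled.

```python
import copyreg
import io
import os
import pickle
import tempfile

def reduce_text_file(f):
    """Pickle an open text file as 'reopen this path', not as its OS handle."""
    mode = "a" if f.writable() else "r"  # never reopen with "w", which would truncate
    return open, (f.name, mode)

class FilePickler(pickle.Pickler):
    # A per-class dispatch table leaves the global copyreg table untouched
    dispatch_table = copyreg.dispatch_table.copy()
    dispatch_table[io.TextIOWrapper] = reduce_text_file

def dumps(obj):
    buf = io.BytesIO()
    FilePickler(buf).dump(obj)
    return buf.getvalue()

path = os.path.join(tempfile.mkdtemp(), "dilltest")
with open(path, "a") as f:
    data = dumps(f)  # plain pickle.dumps(f) would raise TypeError here

# Plain pickle.loads is enough on the receiving side: it just calls open(path, "a")
reopened = pickle.loads(data)
reopened.write("hello\n")
reopened.close()
```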

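It works in the other direction too. Here make_closure runs on an engine, and the closure it returns is sent back and called locally:

remote_closure = view.apply_sync(make_closure, 4)
remote_closure(5)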
cat /tmp/dilltest
33018: 10
33046: 15
33018: 20

But wait, there’s more!

At this point, we can send and receive all kinds of objects:

def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a
        return inner_again
    return inner

So outer returns a function with a closure, which returns a function with a closure.

Now we can resolve the first closure on an engine, the second here, and the third on an engine again (not necessarily the same one, since the view is load-balanced), after passing it through a lambda we define here and call there, just for good measure.

view.apply_sync(lambda f: f(3), view.apply_sync(outer, 1)(2))
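Evaluated entirely locally, the same chain is outer(1)(2)(3), that is 3 * 2 * 1 = 6. Each call returns a new function whose closure carries the values bound so far, and that closure is what has to survive each trip between client and engines. step1 and step2 are just illustrative names.

```python
def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a
        return inner_again
    return inner

step1 = outer(1)   # inner, closing over a=1
step2 = step1(2)   # inner_again, closing over a=1 and b=2
print(step2(3))    # 6
print(sorted(step2.__code__.co_freevars))  # ['a', 'b']
```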

And for good measure, let’s test that normal execution still works:

%px foo = 5

rc[:]['bar'] = lambda : 2 * foo
[5, 5]
[10, 10]

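And test that the @interactive decorator works. First we write a small module, testdill.py, whose function foo relies on a global b that will only exist on the engine:

%%file testdill.py
import ipyparallel as ipp

class C:
    a = 5

class D(C):
    b = 10

@ipp.interactive
def foo(a):
    return a * b
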
import testdill
v = rc[-1]
v['D'] = testdill.D
d = v.apply_sync(lambda : D())
print(d.a, d.b)
5 10
v['b'] = 10
v.apply_sync(testdill.foo, 5)
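The decorator matters because foo's global name b is resolved at call time in foo.__globals__. For a plain module-level function that is testdill's namespace, where b does not exist. Roughly speaking, @interactive associates the function with __main__, so that on the engine it is bound to the user namespace where v['b'] = 10 put b. The stdlib sketch below imitates only that rebinding step, locally, with types.FunctionType. rebind_globals and engine_ns are hypothetical names for illustration, not ipyparallel API.

```python
import builtins
import types

def foo(a):
    # 'b' is not defined in this module; it is looked up in foo.__globals__ when foo runs
    return a * b

def rebind_globals(fn, namespace):
    """Hypothetical helper: the same code object, bound to a different globals dict."""
    return types.FunctionType(fn.__code__, namespace, fn.__name__,
                              fn.__defaults__, fn.__closure__)

try:
    foo(5)
except NameError as exc:
    print("in the defining module:", exc)

# Stand-in for an engine's interactive namespace after v['b'] = 10
engine_ns = {"__builtins__": builtins, "b": 10}
remote_foo = rebind_globals(foo, engine_ns)
print(remote_foo(5))  # 50
```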