Using dill to pickle anything

ipyparallel doesn’t do much in the way of serialization. It has custom zero-copy handling of numpy arrays, but beyond that it does only the bare minimum needed to make basic interactively defined functions and classes sendable.

There are a few projects that extend pickle to make just about anything sendable, and one of these is dill.

To install dill:

pip install dill

First, as always, we create a task function, this time with a closure over an open file:

[1]:
def make_closure(a):
    """make a weird function with a closure on an open file, and return it"""
    import os
    f = open('/tmp/dilltest', 'a')
    def has_closure(b):
        product = a * b
        f.write("%i: %g\n" % (os.getpid(), product))
        f.flush()
        return product
    return has_closure
[2]:
!rm -f /tmp/dilltest
closed = make_closure(5)
[3]:
closed(2)
[3]:
10
[4]:
cat /tmp/dilltest
74025: 10
[5]:
import pickle

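Without help, pickle can’t deal with closures. It pickles a function by reference, recording only the module and qualified name to import it from, and a function defined inside another function (here make_closure.<locals>.has_closure) can’t be imported that way: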

[6]:
pickle.dumps(closed)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-6-0f1f376cfea0> in <module>()
----> 1 pickle.dumps(closed)

AttributeError: Can't pickle local object 'make_closure.<locals>.has_closure'
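To see the by-reference behaviour directly, the stdlib-only sketch below pickles json.dumps, whose payload is just an import location, and then tries a nested function whose qualified name contains <locals>. The names make_local and local are made up for illustration. Python versions differ in whether the second case raises AttributeError or pickle.PicklingError, so the sketch catches both.

```python
import json
import pickle

# An importable, module-level function pickles *by reference*: the payload
# records the module and name to import, not the function's bytecode.
payload = pickle.dumps(json.dumps)
print(b"json" in payload, b"dumps" in payload)  # True True
print(pickle.loads(payload) is json.dumps)      # True: re-imported, same object

def make_local():
    # Hypothetical example of a nested (local) function.
    def local(x):
        return x * 3
    return local

# 'make_local.<locals>.local' cannot be looked up by import, so pickle refuses.
# The exception type depends on the Python version, so catch both.
try:
    pickle.dumps(make_local())
    rejected = False
except (AttributeError, pickle.PicklingError) as exc:
    rejected = True
    print(type(exc).__name__, exc)
```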

But once we import dill, magic happens: the same closure, open file and all, serializes without complaint.

[7]:
import dill
[8]:
dill.dumps(closed)[:64] + b'...'
[8]:
b'\x80\x03cdill.dill\n_create_function\nq\x00(cdill.dill\n_load_type\nq\x01X\x08\x00\x00\x00Co...'

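So from now on, closures, lambdas, and many other objects that plain pickle rejects are pickleable, because dill can serialize a closure by value: its code together with the variables it captures.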
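Serializing a closure by value means shipping its code plus the values its closure cells hold. The toy sketch below shows that idea with only the standard library. dump_by_value and load_by_value are hypothetical helpers, not dill's API, and dill's own implementation is considerably more complete. The sketch uses marshal, whose format is specific to the Python version, so it assumes both ends run the same interpreter and that the captured values are simple (numbers, strings).

```python
import marshal
import types

def make_multiplier(a):
    def multiply(b):
        return a * b
    return multiply

def dump_by_value(func):
    """Toy by-value serializer (hypothetical helper, not dill's API):
    ship the code object plus the values held in the closure cells."""
    captured = tuple(cell.cell_contents for cell in (func.__closure__ or ()))
    # marshal handles code objects and simple values; its format is tied
    # to the Python version, so sender and receiver must match.
    return marshal.dumps((func.__code__, func.__name__, captured))

def load_by_value(data, namespace):
    """Rebuild the function with fresh cells; globals resolve in `namespace`."""
    code, name, captured = marshal.loads(data)
    closure = tuple(types.CellType(value) for value in captured)
    return types.FunctionType(code, namespace, name, None, closure)

times5 = make_multiplier(5)
data = dump_by_value(times5)        # bytes that could cross a process boundary
rebuilt = load_by_value(data, {})   # a new function object, same behaviour
print(rebuilt(2))  # 10
```

Note that the original function is never looked up by name on the receiving side; everything it needs travels in the payload, which is what lets a local function survive the trip.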

Now use this in ipyparallel

As usual, we start by creating our Client and a load-balanced View:

[9]:
import ipyparallel as parallel
rc = parallel.Client()
view = rc.load_balanced_view()

Now let’s try sending our function with a closure:

[10]:
view.apply_sync(closed, 3)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-10-23a646829fdc> in <module>()
----> 1 view.apply_sync(closed, 3)

/Users/benjaminrk/dev/ip/parallel/ipyparallel/client/view.py in apply_sync(self, f, *args, **kwargs)
    224          returning the result.
    225         """
--> 226         return self._really_apply(f, args, kwargs, block=True)
    227
    228     #----------------------------------------------------------------

<decorator-gen-148> in _really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)

/Users/benjaminrk/dev/ip/parallel/ipyparallel/client/view.py in sync_results(f, self, *args, **kwargs)
     50     self._in_sync_results = True
     51     try:
---> 52         ret = f(self, *args, **kwargs)
     53     finally:
     54         self._in_sync_results = False

<decorator-gen-147> in _really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)

/Users/benjaminrk/dev/ip/parallel/ipyparallel/client/view.py in save_ids(f, self, *args, **kwargs)
     35     n_previous = len(self.client.history)
     36     try:
---> 37         ret = f(self, *args, **kwargs)
     38     finally:
     39         nmsgs = len(self.client.history) - n_previous

/Users/benjaminrk/dev/ip/parallel/ipyparallel/client/view.py in _really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)
   1030
   1031         future = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
-> 1032                                 metadata=metadata)
   1033
   1034         ar = AsyncResult(self.client, future, fname=getname(f),

/Users/benjaminrk/dev/ip/parallel/ipyparallel/client/client.py in send_apply_request(self, socket, f, args, kwargs, metadata, track, ident)
   1300         bufs = serialize.pack_apply_message(f, args, kwargs,
   1301             buffer_threshold=self.session.buffer_threshold,
-> 1302             item_threshold=self.session.item_threshold,
   1303         )
   1304

/Users/benjaminrk/dev/ip/parallel/ipyparallel/serialize/serialize.py in pack_apply_message(f, args, kwargs, buffer_threshold, item_threshold)
    150     info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
    151
--> 152     msg = [pickle.dumps(can(f), PICKLE_PROTOCOL)]
    153     msg.append(pickle.dumps(info, PICKLE_PROTOCOL))
    154     msg.extend(arg_bufs)

TypeError: cannot serialize '_io.TextIOWrapper' object
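The final TypeError names _io.TextIOWrapper, the type of the open file that has_closure captured. The stdlib-only sketch below rebuilds a closure of the same shape, lists what its closure cells hold, and shows that stdlib pickle rejects the file object by itself. As assumptions for portability, it takes the file path as a parameter, writes into a temporary directory instead of /tmp/dilltest, and imports os at module level, so only a and f are captured.

```python
import os
import pickle
import tempfile

def make_closure(a, path):
    """Same shape as the notebook's make_closure, but the path is a parameter
    and os is imported at module level, so only a and f are captured."""
    f = open(path, "a")
    def has_closure(b):
        product = a * b
        f.write("%i: %g\n" % (os.getpid(), product))
        f.flush()
        return product
    return has_closure

path = os.path.join(tempfile.mkdtemp(), "dilltest")  # stand-in for /tmp/dilltest
closed = make_closure(5, path)
print(closed(2))  # 10

# Map each free variable name to the object held in its closure cell.
captured = {
    name: cell.cell_contents
    for name, cell in zip(closed.__code__.co_freevars, closed.__closure__)
}
print(sorted(captured))  # ['a', 'f']

# The open file is the part stdlib pickle refuses to serialize.
try:
    pickle.dumps(captured["f"])
    file_rejected = False
except TypeError as exc:
    file_rejected = True
    print(exc)
captured["f"].close()
```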

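Oops, no dice. Importing dill is not enough on its own: as the last frame of the traceback shows, ipyparallel's serializer still calls its own pickle.dumps, and the engines need to be switched to dill too. ipyparallel does both for you if you call DirectView.use_dill: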

[11]:
rc[:].use_dill()
<module 'ipyparallel.serialize' from '/Users/benjaminrk/dev/ip/parallel/ipyparallel/serialize/__init__.py'>
[11]:
<AsyncResult: use_dill>

This is equivalent to:

from ipyparallel.serialize import use_dill
use_dill()              # switch serialization to dill in this client
rc[:].apply(use_dill)   # ...and on every engine

Now let’s try again:

[12]:
view.apply_sync(closed, 3)
[12]:
15
[13]:
cat /tmp/dilltest
74025: 10
74113: 15

Yay! The closure ran on an engine (pid 74113) and appended its line to the same file. With dill in place, we can send closures, open file handles, and other objects that plain pickle rejects to our engines.

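It works in the other direction, too. This time we create the closure on an engine, send it back, and call it here on the client: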

[14]:
remote_closure = view.apply_sync(make_closure, 4)
remote_closure(5)
[14]:
20
[15]:
cat /tmp/dilltest
74025: 10
74113: 15
74025: 20

But wait, there’s more!

At this point, we can send and receive all kinds of stuff, such as this doubly nested closure:

[16]:
def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a
        return inner_again
    return inner

So outer returns a function with a closure, which returns a function with a closure.

Now we can resolve the first closure on an engine, the second here on the client, and the third on an engine again (possibly a different one, since the view is load-balanced), passing it through a lambda that we define here and call there.

[17]:
view.apply_sync(lambda f: f(3), view.apply_sync(outer, 1)(2))
[17]:
6
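Each call in that chain returns a function whose closure still holds every value bound so far, which is why it does not matter which process runs each step. This local sketch repeats outer and inspects the closure cells of inner_again after the first two steps; the arithmetic is 3 * 2 * 1 = 6.

```python
def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a
        return inner_again
    return inner

inner = outer(1)        # step 1: binds a=1 (ran on an engine above)
inner_again = inner(2)  # step 2: binds b=2 (ran on the client above)

# inner_again carries cells for both b and a, so whichever process receives
# it has everything it needs to finish the computation.
cells = {
    name: cell.cell_contents
    for name, cell in zip(inner_again.__code__.co_freevars, inner_again.__closure__)
}
print(sorted(cells.items()))  # [('a', 1), ('b', 2)]
print(inner_again(3))         # 6
```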

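And for good measure, let’s test that normal execution still works. We set foo on every engine with %px, push a lambda named bar that reads foo, and call it through parallel.Reference('bar'), which each engine resolves by name in its own namespace: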

[18]:
%px foo = 5

print(rc[:]['foo'])
rc[:]['bar'] = lambda: 2 * foo
rc[:].apply_sync(parallel.Reference('bar'))
[5, 5, 5, 5, 5, 5, 5, 5]
[18]:
[10, 10, 10, 10, 10, 10, 10, 10]

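And test that the @interactive decorator works. A function decorated with @interactive looks up its global names in the engine's namespace when it runs there, rather than in the module that defined it, so testdill.foo can read a b that exists only on the engine: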

[19]:
%%file testdill.py
import ipyparallel as ipp
from ipyparallel import interactive

@interactive
class C(object):
    a = 5

@ipp.interactive
class D(C):
    b = 10

@interactive
def foo(a):
    return a * b

Overwriting testdill.py
[20]:
import testdill
[21]:
v = rc[-1]
v['D'] = testdill.D
d = v.apply_sync(lambda: D())
print(d.a, d.b)
5 10
[22]:
v['b'] = 10
v.apply_sync(testdill.foo, 5)
[22]:
50
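testdill.foo reads b, which exists only on the engine. The sketch below imitates that effect locally by rebuilding a function with a different globals dict via types.FunctionType. with_globals and engine_ns are hypothetical names, and this is only an illustration of the idea, not ipyparallel's code.

```python
import types

def foo(a):
    # `b` is a global name: Python looks it up in foo.__globals__ at call time.
    return a * b

def with_globals(func, namespace):
    """Hypothetical helper: copy `func`, resolving its globals in `namespace`."""
    return types.FunctionType(
        func.__code__, namespace, func.__name__, func.__defaults__, func.__closure__
    )

try:
    foo(5)  # no `b` defined in this module
except NameError as exc:
    print("local call:", exc)

engine_ns = {"b": 10}  # stands in for an engine's user namespace
print(with_globals(foo, engine_ns)(5))  # 50
```

The function's code is unchanged; only the dictionary it consults for global names differs, which is why pushing b to the engine before calling foo is enough.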