Using dill to pickle anything#

ipyparallel doesn’t do much in the way of serialization. It has custom zero-copy handling of numpy arrays, but beyond that it does only the bare minimum needed to make basic, interactively defined functions and classes sendable.

There are a few projects that extend pickle to make just about anything sendable, and one of these is dill. Another is cloudpickle.

To install dill:

pip install dill
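cloudpickle exposes essentially the same dumps/loads interface; here is a minimal sketch, assuming cloudpickle is installed (pip install cloudpickle):

import cloudpickle

# like dill, cloudpickle can serialize interactively defined lambdas
payload = cloudpickle.dumps(lambda x: x + 1)
cloudpickle.loads(payload)(2)  # -> 3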

First, as always, we create a task function, this time with a closure:

def make_closure(a):
    """make a weird function with a closure on an open file, and return it"""
    import os

    f = open('/tmp/dilltest', 'a')

    def has_closure(b):
        product = a * b
        f.write("%i: %g\n" % (os.getpid(), product))
        f.flush()
        return product

    return has_closure
!rm -f /tmp/dilltest
closed = make_closure(5)
closed(2)
10
!cat /tmp/dilltest
25810: 10
import pickle

Without help, pickle can’t deal with closures:

pickle.dumps(closed)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[6], line 1
----> 1 pickle.dumps(closed)

AttributeError: Can't pickle local object 'make_closure.<locals>.has_closure'

But after we import dill, magic happens:

import dill
dill.dumps(closed)[:64] + b'...'
b'\x80\x04\x95E\x03\x00\x00\x00\x00\x00\x00\x8c\ndill._dill\x94\x8c\x10_create_function\x94\x93\x94(h\x00\x8c\x0c_create_code\x94\x93...'

So, as long as we serialize with dill, pretty much everything is pickleable.
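And dill can load what it dumps; a quick round-trip sketch with a plain closure (no open file handle this time):

def make_adder(a):
    def adder(b):
        return a + b

    return adder

# serialize the closure with dill, reconstruct it, and call the copy
restored = dill.loads(dill.dumps(make_adder(2)))
restored(3)  # -> 5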

Using dill in IPython Parallel#

As usual, we start by creating our Client and View:

import ipyparallel as ipp

cluster = ipp.Cluster(n=2)
cluster.start_cluster_sync()
rc = cluster.connect_client_sync()
rc.wait_for_engines(n=2)
view = rc.load_balanced_view()
Starting 2 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>

Now let’s try sending our function with a closure:

view.apply_sync(closed, 3)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[10], line 1
----> 1 view.apply_sync(closed, 3)

File ~/dev/ip/parallel/ipyparallel/client/view.py:228, in View.apply_sync(self, _View__ipp_f, *args, **kwargs)
    224 def apply_sync(self, __ipp_f, *args, **kwargs):
    225     """calls ``f(*args, **kwargs)`` on remote engines in a blocking manner,
    226     returning the result.
    227     """
--> 228     return self._really_apply(__ipp_f, args, kwargs, block=True)

File ~/.virtualenvs/ipp7-stable/lib/python3.11/site-packages/decorator.py:232, in decorate.<locals>.fun(*args, **kw)
    230 if not kwsyntax:
    231     args, kw = fix(args, kw, sig)
--> 232 return caller(func, *(extras + args), **kw)

File ~/dev/ip/parallel/ipyparallel/client/view.py:55, in sync_results(f, self, *args, **kwargs)
     53 self._in_sync_results = True
     54 try:
---> 55     ret = f(self, *args, **kwargs)
     56 finally:
     57     self._in_sync_results = False

File ~/.virtualenvs/ipp7-stable/lib/python3.11/site-packages/decorator.py:232, in decorate.<locals>.fun(*args, **kw)
    230 if not kwsyntax:
    231     args, kw = fix(args, kw, sig)
--> 232 return caller(func, *(extras + args), **kw)

File ~/dev/ip/parallel/ipyparallel/client/view.py:39, in save_ids(f, self, *args, **kwargs)
     37 n_previous = len(self.client.history)
     38 try:
---> 39     ret = f(self, *args, **kwargs)
     40 finally:
     41     nmsgs = len(self.client.history) - n_previous

File ~/dev/ip/parallel/ipyparallel/client/view.py:1364, in LoadBalancedView._really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)
   1359 follow = self._render_dependency(follow)
   1360 metadata = dict(
   1361     after=after, follow=follow, timeout=timeout, targets=idents, retries=retries
   1362 )
-> 1364 future = self.client.send_apply_request(
   1365     self._socket, f, args, kwargs, track=track, metadata=metadata
   1366 )
   1368 ar = AsyncResult(
   1369     self.client,
   1370     future,
   (...)
   1373     owner=True,
   1374 )
   1375 if block:

File ~/dev/ip/parallel/ipyparallel/client/client.py:1954, in Client.send_apply_request(self, socket, f, args, kwargs, metadata, track, ident, message_future_hook)
   1951 if not isinstance(metadata, dict):
   1952     raise TypeError(f"metadata must be dict, not {type(metadata)}")
-> 1954 bufs = serialize.pack_apply_message(
   1955     f,
   1956     args,
   1957     kwargs,
   1958     buffer_threshold=self.session.buffer_threshold,
   1959     item_threshold=self.session.item_threshold,
   1960 )
   1962 future = self._send(
   1963     socket,
   1964     "apply_request",
   (...)
   1970     message_future_hook=message_future_hook,
   1971 )
   1972 msg_id = future.msg_id

File ~/dev/ip/parallel/ipyparallel/serialize/serialize.py:182, in pack_apply_message(f, args, kwargs, buffer_threshold, item_threshold)
    174 kwarg_bufs = list(
    175     chain.from_iterable(
    176         serialize_object(kwargs[key], buffer_threshold, item_threshold)
    177         for key in kw_keys
    178     )
    179 )
    181 info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
--> 182 msg = serialize_object(f)
    183 msg.append(pickle.dumps(info, PICKLE_PROTOCOL))
    184 msg.extend(arg_bufs)

File ~/dev/ip/parallel/ipyparallel/serialize/serialize.py:113, in serialize_object(obj, buffer_threshold, item_threshold)
    110     cobj = can(obj)
    111     buffers.extend(_extract_buffers(cobj, buffer_threshold))
--> 113 buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
    114 return buffers

TypeError: cannot pickle '_io.TextIOWrapper' object

Oops, no dice. For IPython Parallel to use dill, the serializer has to be switched on both the client and the engines. Calling DirectView.use_dill does both for you:

rc[:].use_dill()
<AsyncResult(use_dill): pending>
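If you prefer cloudpickle, DirectView has analogous use_cloudpickle() and use_pickle() methods (the latter reverts to the default serializer); shown here only as a sketch, not executed in this notebook:

# rc[:].use_cloudpickle()  # switch the client and all engines to cloudpickle
# rc[:].use_pickle()       # revert to the default pickle-based serialization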

Now let’s try again:

view.apply_sync(closed, 3)
15
!cat /tmp/dilltest
25810: 10
26717: 15

Yay! That’s it: with dill enabled, we can send closures, open file handles, and other previously unpicklable objects to our engines.

Let’s give it a try now:

remote_closure = view.apply_sync(make_closure, 4)
remote_closure(5)
20
!cat /tmp/dilltest
25810: 10
26717: 15
25810: 20

But wait, there’s more!

At this point, we can send and receive all kinds of things:

def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a

        return inner_again

    return inner

So outer returns a function with a closure, which returns a function with a closure.

Now we can resolve the first closure on an engine, the second one locally, and the third on a different engine, after passing it through a lambda we define here and call there, just for good measure.

view.apply_sync(lambda f: f(3), view.apply_sync(outer, 1)(2))
6
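The same call, unpacked step by step (equivalent to the one-liner above):

inner = view.apply_sync(outer, 1)  # resolve outer(1) on an engine, get back a closure
inner_again = inner(2)  # call that closure locally to build the innermost one
view.apply_sync(lambda f: f(3), inner_again)  # send it out again and call it there: 3 * 2 * 1 == 6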

Let’s also check that normal execution still works:

%%px
foo = 5
print(rc[:]['foo'])
rc[:]['bar'] = lambda: 2 * foo
rc[:].apply_sync(ipp.Reference('bar'))
[5, 5]
[10, 10]

And test that the @interactive decorator works. Marking functions and classes defined in a module with @ipp.interactive makes them behave as if they had been defined interactively, so their globals (like b below) resolve in the engines’ user namespace:

%%file testdill.py
import ipyparallel as ipp

@ipp.interactive
class C:
    a = 5

@ipp.interactive
class D(C):
    b = 10

@ipp.interactive
def foo(a):
    return a * b
Writing testdill.py
import testdill
v = rc[-1]
v['D'] = testdill.D
d = v.apply_sync(lambda: D())  # noqa: F821
print(d.a, d.b)
5 10
v['b'] = 10
v.apply_sync(testdill.foo, 5)
50