Using dill to pickle anything¶
ipyparallel doesn’t do much in the way of serialization. It has custom zero-copy handling of numpy arrays, but beyond that it does only the bare minimum to make basic interactively defined functions and classes sendable.
There are a few projects that extend pickle to make just about anything sendable, and one of these is dill. Another is cloudpickle.
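As a quick illustration (a minimal sketch, assuming cloudpickle is installed; the lambda here is just an example), cloudpickle can serialize an interactively defined function that stock pickle rejects:

```python
import pickle

import cloudpickle

# stock pickle refuses interactively defined lambdas
# (it pickles functions by name, and this one has no importable name)
double = lambda x: 2 * x

payload = cloudpickle.dumps(double)  # works where pickle.dumps would fail
restored = pickle.loads(payload)     # plain pickle can load the result
print(restored(21))
```

Note that only the *sender* needs cloudpickle; the receiver can deserialize with plain pickle.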
To install dill:
pip install dill
First, as always, we create a task function, this time with a closure:
[1]:
def make_closure(a):
    """make a weird function with a closure on an open file, and return it"""
    import os

    f = open('/tmp/dilltest', 'a')

    def has_closure(b):
        product = a * b
        f.write("%i: %g\n" % (os.getpid(), product))
        f.flush()
        return product

    return has_closure
[2]:
!rm -f /tmp/dilltest
closed = make_closure(5)
[3]:
closed(2)
[3]:
10
[4]:
cat /tmp/dilltest
33018: 10
[5]:
import pickle
Without help, pickle can’t deal with closures
[6]:
pickle.dumps(closed)
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_33018/935663835.py in <module>
----> 1 pickle.dumps(closed)
AttributeError: Can't pickle local object 'make_closure.<locals>.has_closure'
But after we import dill, magic happens
[7]:
import dill
[8]:
dill.dumps(closed)[:64] + b'...'
[8]:
b'\x80\x04\x95\xea\x01\x00\x00\x00\x00\x00\x00\x8c\ndill._dill\x94\x8c\x10_create_function\x94\x93\x94(h\x00\x8c\x0c_create_code\x94\x93...'
So from now on, pretty much everything is pickleable.
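A quick local round-trip shows both directions (a minimal sketch; `make_adder` is an illustrative stand-in for the closure above, without the open file):

```python
import dill

def make_adder(a):
    # the inner function closes over `a`, which stock pickle cannot handle
    def add(b):
        return a + b
    return add

payload = dill.dumps(make_adder(5))
restored = dill.loads(payload)  # the closure survives the round-trip
print(restored(3))
```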
Using dill in IPython Parallel¶
As usual, we start by creating our Client and View
[9]:
import ipyparallel as ipp
cluster = ipp.Cluster(n=2)
cluster.start_cluster_sync()
rc = cluster.connect_client_sync()
rc.wait_for_engines(n=2)
view = rc.load_balanced_view()
Using existing profile dir: '/Users/minrk/.ipython/profile_default'
Starting 2 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Now let’s try sending our function with a closure:
[10]:
view.apply_sync(closed, 3)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_33018/939238112.py in <module>
----> 1 view.apply_sync(closed, 3)
~/dev/ip/parallel/ipyparallel/client/view.py in apply_sync(self, _View__ipp_f, *args, **kwargs)
233 returning the result.
234 """
--> 235 return self._really_apply(__ipp_f, args, kwargs, block=True)
236
237 # ----------------------------------------------------------------
~/conda/lib/python3.9/site-packages/decorator.py in fun(*args, **kw)
230 if not kwsyntax:
231 args, kw = fix(args, kw, sig)
--> 232 return caller(func, *(extras + args), **kw)
233 fun.__name__ = func.__name__
234 fun.__doc__ = func.__doc__
~/dev/ip/parallel/ipyparallel/client/view.py in sync_results(f, self, *args, **kwargs)
60 self._in_sync_results = True
61 try:
---> 62 ret = f(self, *args, **kwargs)
63 finally:
64 self._in_sync_results = False
~/conda/lib/python3.9/site-packages/decorator.py in fun(*args, **kw)
230 if not kwsyntax:
231 args, kw = fix(args, kw, sig)
--> 232 return caller(func, *(extras + args), **kw)
233 fun.__name__ = func.__name__
234 fun.__doc__ = func.__doc__
~/dev/ip/parallel/ipyparallel/client/view.py in save_ids(f, self, *args, **kwargs)
44 n_previous = len(self.client.history)
45 try:
---> 46 ret = f(self, *args, **kwargs)
47 finally:
48 nmsgs = len(self.client.history) - n_previous
~/dev/ip/parallel/ipyparallel/client/view.py in _really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)
1215 )
1216
-> 1217 future = self.client.send_apply_request(
1218 self._socket, f, args, kwargs, track=track, metadata=metadata
1219 )
~/dev/ip/parallel/ipyparallel/client/client.py in send_apply_request(self, socket, f, args, kwargs, metadata, track, ident, message_future_hook)
1771 raise TypeError("metadata must be dict, not %s" % type(metadata))
1772
-> 1773 bufs = serialize.pack_apply_message(
1774 f,
1775 args,
~/dev/ip/parallel/ipyparallel/serialize/serialize.py in pack_apply_message(f, args, kwargs, buffer_threshold, item_threshold)
186
187 info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
--> 188 msg = serialize_object(f)
189 msg.append(pickle.dumps(info, PICKLE_PROTOCOL))
190 msg.extend(arg_bufs)
~/dev/ip/parallel/ipyparallel/serialize/serialize.py in serialize_object(obj, buffer_threshold, item_threshold)
117 buffers.extend(_extract_buffers(cobj, buffer_threshold))
118
--> 119 buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
120 return buffers
121
TypeError: cannot pickle '_io.TextIOWrapper' object
Oops, no dice. For IPython Parallel to use dill, a couple more steps are needed. IPython will take care of these for you if you call DirectView.use_dill:
[11]:
rc[:].use_dill()
[11]:
<AsyncResult: use_dill>
Now let’s try again
[12]:
view.apply_sync(closed, 3)
[12]:
15
[13]:
cat /tmp/dilltest
33018: 10
33046: 15
Yay! Now we can use dill to let ipyparallel send almost anything: closures, open file handles, and other previously non-pickleable objects can reach our engines.
Let’s give it a try now:
[14]:
remote_closure = view.apply_sync(make_closure, 4)
remote_closure(5)
[14]:
20
[15]:
cat /tmp/dilltest
33018: 10
33046: 15
33018: 20
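The open-file-handle claim can also be checked locally, without any engines (a sketch using a temporary file; the path and contents are illustrative, and dill's default file-handle mode reopens the file on load):

```python
import os
import tempfile

import dill

# create a real file on disk so the handle can be reopened after loading
fd, path = tempfile.mkstemp()
os.close(fd)

f = open(path, 'a')
f.write("hello")
f.flush()

# stock pickle raises TypeError on _io.TextIOWrapper; dill serializes it
restored = dill.loads(dill.dumps(f))
restored.write(" world")  # append mode: writes go to the end of the file
restored.flush()

f.close()
restored.close()
print(open(path).read())
```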
But wait, there’s more!
At this point, we can send/recv all kinds of stuff
[16]:
def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a

        return inner_again

    return inner
So outer returns a function with a closure, which returns a function with a closure.
Now, we can resolve the first closure on the engine, the second here, and the third on a different engine, after passing through a lambda we define here and call there, just for good measure.
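Before involving the engines, the nesting can be resolved entirely locally (redefining outer here so the snippet is self-contained):

```python
def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a
        return inner_again
    return inner

# peel off the closures one at a time: 3 * 2 * 1
print(outer(1)(2)(3))
```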
[17]:
view.apply_sync(lambda f: f(3), view.apply_sync(outer, 1)(2))
[17]:
6
And for good measure, let’s test that normal execution still works:
[18]:
%px foo = 5
print(rc[:]['foo'])
rc[:]['bar'] = lambda: 2 * foo
rc[:].apply_sync(ipp.Reference('bar'))
[5, 5]
[18]:
[10, 10]
And test that the @interactive decorator works:
[19]:
%%file testdill.py
import ipyparallel as ipp

@ipp.interactive
class C:
    a = 5

@ipp.interactive
class D(C):
    b = 10

@ipp.interactive
def foo(a):
    return a * b
Overwriting testdill.py
[20]:
import testdill
[21]:
v = rc[-1]
v['D'] = testdill.D
d = v.apply_sync(lambda: D())
print(d.a, d.b)
5 10
[22]:
v['b'] = 10
v.apply_sync(testdill.foo, 5)
[22]:
50