More efficient broadcast of arrays with memmap#

Data movement is where IPython’s naive model suffers the most. But knowing about your cluster lets you make smarter decisions about data movement than a simple rc[:].push.

I ran this example with a cluster on a 64-core remote VM, so communication between the client and controller is over the public internet, while communication between the controller and engines is local.

This is an example of ‘broadcasting’ a numpy array using memmapped files, to reduce the amount of expensive network traffic when several engines are on the same host.

import socket
import os, sys, re

import numpy as np

import ipyparallel as ipp
rc = ipp.Client(profile="mpi")
eall = rc.broadcast_view(coalescing=True)

First, create a map of engine id to hostname

engine_hosts = eall.apply_async(socket.gethostname).get_dict()
engine_hosts
{0: 'ip-172-31-2-77',
 1: 'ip-172-31-2-77',
 2: 'ip-172-31-2-77',
 3: 'ip-172-31-2-77',
 4: 'ip-172-31-2-77',
 5: 'ip-172-31-2-77',
 6: 'ip-172-31-2-77',
 7: 'ip-172-31-2-77',
 8: 'ip-172-31-2-77',
 9: 'ip-172-31-2-77',
 10: 'ip-172-31-2-77',
 11: 'ip-172-31-2-77',
 12: 'ip-172-31-2-77',
 13: 'ip-172-31-2-77',
 14: 'ip-172-31-2-77',
 15: 'ip-172-31-2-77',
 16: 'ip-172-31-2-77',
 17: 'ip-172-31-2-77',
 18: 'ip-172-31-2-77',
 19: 'ip-172-31-2-77',
 20: 'ip-172-31-2-77',
 21: 'ip-172-31-2-77',
 22: 'ip-172-31-2-77',
 23: 'ip-172-31-2-77',
 24: 'ip-172-31-2-77',
 25: 'ip-172-31-2-77',
 26: 'ip-172-31-2-77',
 27: 'ip-172-31-2-77',
 28: 'ip-172-31-2-77',
 29: 'ip-172-31-2-77',
 30: 'ip-172-31-2-77',
 31: 'ip-172-31-2-77',
 32: 'ip-172-31-2-77',
 33: 'ip-172-31-2-77',
 34: 'ip-172-31-2-77',
 35: 'ip-172-31-2-77',
 36: 'ip-172-31-2-77',
 37: 'ip-172-31-2-77',
 38: 'ip-172-31-2-77',
 39: 'ip-172-31-2-77',
 40: 'ip-172-31-2-77',
 41: 'ip-172-31-2-77',
 42: 'ip-172-31-2-77',
 43: 'ip-172-31-2-77',
 44: 'ip-172-31-2-77',
 45: 'ip-172-31-2-77',
 46: 'ip-172-31-2-77',
 47: 'ip-172-31-2-77',
 48: 'ip-172-31-2-77',
 49: 'ip-172-31-2-77',
 50: 'ip-172-31-2-77',
 51: 'ip-172-31-2-77',
 52: 'ip-172-31-2-77',
 53: 'ip-172-31-2-77',
 54: 'ip-172-31-2-77',
 55: 'ip-172-31-2-77',
 56: 'ip-172-31-2-77',
 57: 'ip-172-31-2-77',
 58: 'ip-172-31-2-77',
 59: 'ip-172-31-2-77',
 60: 'ip-172-31-2-77',
 61: 'ip-172-31-2-77',
 62: 'ip-172-31-2-77',
 63: 'ip-172-31-2-77'}

Next, reverse that to create a map of hostname to engine ids

host_engines = {}

for eid, host in engine_hosts.items():
    if host not in host_engines:
        host_engines[host] = []
    host_engines[host].append(eid)

host_engines
{'ip-172-31-2-77': [0,
  1,
  2,
  3,
  4,
  5,
  6,
  7,
  8,
  9,
  10,
  11,
  12,
  13,
  14,
  15,
  16,
  17,
  18,
  19,
  20,
  21,
  22,
  23,
  24,
  25,
  26,
  27,
  28,
  29,
  30,
  31,
  32,
  33,
  34,
  35,
  36,
  37,
  38,
  39,
  40,
  41,
  42,
  43,
  44,
  45,
  46,
  47,
  48,
  49,
  50,
  51,
  52,
  53,
  54,
  55,
  56,
  57,
  58,
  59,
  60,
  61,
  62,
  63]}

Now we can measure our baseline overhead: how long does it take to roundrip an empty task on all engines. We shouldn’t expect anything to take less time than this.

%time _ = eall.apply_sync(lambda : None)
CPU times: user 165 ms, sys: 40.9 ms, total: 206 ms
Wall time: 1.06 s

Now let’s look at how long it takes to send data in the simplest possible way

import numpy as np
data = np.random.random((512, 512))
%%time
ar = rc[:].push({'data': data}, block=False)
ar.wait_interactive()
CPU times: user 601 ms, sys: 235 ms, total: 836 ms
Wall time: 14.6 s

Here we get to the broadcast implementation. Instead of seinding the array directly to every engine via IPP, we:

  1. lookup each engine’s host

  2. pick one engine on each host

  3. send the data to one engine per host

  4. on all engines, load the memmapped array from disk

This results in the same data to all engines, but only one send per remote host instead of per remote engine.

%px import numpy as np
@ipp.interactive
def array_to_file(A, name):
    """write an array to a temporary file, return its filename"""
    import tempfile
    with tempfile.NamedTemporaryFile(suffix='.np', delete=False) as tf:
        np.save(tf, A)
        data_path = tf.name
    if name:
        globals()[name] = data_path
    return data_path
@ipp.interactive
def load_memmap(name, path, mode='r+'):
    """load a file on disk into the interactive namespace as a memmapped array"""
    globals()[name] = np.memmap(path, mode=mode)
def bcast_memmap(data, name, client, host_engines):
    """broadcast a numpy array efficiently
    
    - sends data to each remote host only once
    - loads with memmap everywhere
    """

    # actually push the data, just once to each machine
    memmap_path_name = f"_bcast_array_{name}"
    
    one_per_host = rc.broadcast_view([engines[0] for engines in host_engines.values()], coalescing=True)
    send_ar = one_per_host.apply_async(array_to_file, data, name=memmap_path_name)
    
    # load the data on all engines into a memmapped array
    async_results = []
    e_all = rc.broadcast_view(coalescing=True)
    return e_all.apply_async(load_memmap, name, ipp.Reference(memmap_path_name))
%%time
ar = bcast_memmap(data, 'data', rc, host_engines)
ar.wait_interactive()
CPU times: user 237 ms, sys: 50.8 ms, total: 288 ms
Wall time: 1.65 s

So that’s a lot quicker! And a lot less memory used in both the client and the scheduler.

%px np.linalg.norm(data, 2)
Out[0:6]: 217636.91910151643
Out[1:6]: 255.8632962531706
Out[2:6]: 255.8632962531706
Out[3:6]: 255.8632962531706
Out[4:6]: 255.8632962531706
Out[5:6]: 255.8632962531706
Out[6:6]: 255.8632962531706
Out[7:6]: 255.8632962531706
Out[8:6]: 255.8632962531706
Out[9:6]: 255.8632962531706
Out[10:6]: 255.8632962531706
Out[11:6]: 255.8632962531706
Out[12:6]: 255.8632962531706
Out[13:6]: 255.8632962531706
Out[14:6]: 255.8632962531706
Out[15:6]: 255.8632962531706
Out[16:6]: 255.8632962531706
Out[17:6]: 255.8632962531706
Out[18:6]: 255.8632962531706
Out[19:6]: 255.8632962531706
Out[20:6]: 255.8632962531706
Out[21:6]: 255.8632962531706
Out[22:6]: 255.8632962531706
Out[23:6]: 255.8632962531706
Out[24:6]: 255.8632962531706
Out[25:6]: 255.8632962531706
Out[26:6]: 255.8632962531706
Out[27:6]: 255.8632962531706
Out[28:6]: 255.8632962531706
Out[29:6]: 255.8632962531706
Out[30:6]: 255.8632962531706
Out[31:6]: 255.8632962531706
Out[32:6]: 255.8632962531706
Out[33:6]: 255.8632962531706
Out[34:6]: 255.8632962531706
Out[35:6]: 255.8632962531706
Out[36:6]: 255.8632962531706
Out[37:6]: 255.8632962531706
Out[38:6]: 255.8632962531706
Out[39:6]: 255.8632962531706
Out[40:6]: 255.8632962531706
Out[41:6]: 255.8632962531706
Out[42:6]: 255.8632962531706
Out[43:6]: 255.8632962531706
Out[44:6]: 255.8632962531706
Out[45:6]: 255.8632962531706
Out[46:6]: 255.8632962531706
Out[47:6]: 255.8632962531706
Out[48:6]: 255.8632962531706
Out[49:6]: 255.8632962531706
Out[50:6]: 255.8632962531706
Out[51:6]: 255.8632962531706
Out[52:6]: 255.8632962531706
Out[53:6]: 255.8632962531706
Out[54:6]: 255.8632962531706
Out[55:6]: 255.8632962531706
Out[56:6]: 255.8632962531706
Out[57:6]: 255.8632962531706
Out[58:6]: 255.8632962531706
Out[59:6]: 255.8632962531706
Out[60:6]: 255.8632962531706
Out[61:6]: 255.8632962531706
Out[62:6]: 255.8632962531706
Out[63:6]: 255.8632962531706

You can also do the same thing with MPI.