{ "cells": [ { "cell_type": "markdown", "id": "b9dd40a2-26ae-48b7-8dc7-3b4995b2b6e6", "metadata": {}, "source": [ "# Broadcast View\n", "\n", "IPython Parallel is built on the idea of working with N remote IPython instances.\n", "This means that most of the 'parallelism' when working with N engines\n", "via the DirectView is really implemented in the client.\n", "\n", "DirectView methods are all implemented as:\n", "\n", "```python\n", "for engine in engines:\n", " do_something_on(engine)\n", "```\n", "\n", "which means that all operations are at least O(N) *at the client*.\n", "This can be really inefficient when the client is telling all the engines to do the same thing,\n", "*especially* when the client is across the Internet from the cluster.\n", "\n", "IPython 7 includes a new, experimental *broadcast* scheduler that implements the same message pattern,\n", "but sending a request to any number of engines is O(1) in the client.\n", "It is the *scheduler* that implements the fan-out to engines.\n", "\n", "With the broadcast view, a client always sends one message to the root node of the scheduler.\n", "The scheduler then sends one message to each of its two branches,\n", "and so on until it reaches the leaves of the tree,\n", "at which point each leaf sends $\\frac{N}{2^{depth}}$ messages to its assigned engines.\n", "\n", "The performance should be strictly better with a depth of 0 (one scheduler node),\n", "network-wise,\n", "though the Python implementation of the scheduler may have a cost relative to the pure-C default DirectView scheduler. 
\n", "\n", "\n", "This approach allows deplyments to exchange single-message latency (due to increased depth) for parallelism (due to concurrent sends at the leaves).\n", "\n", "This trade-off increases the baseline cost for small numbers of engines,\n", "while dramatically improving the total time to deliver messages to large numbers of engines,\n", "especially when they contain a lot of data.\n", "\n", "These tests were run with a depth of 3 (8 leaf nodes)" ] }, { "cell_type": "code", "execution_count": 1, "id": "0e46054d-5082-40d9-ace3-413cef646622", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/minrk/dev/ip/parallel/ipyparallel/util.py:205: RuntimeWarning: IPython could not determine IPs for ip-172-31-2-77: [Errno 8] nodename nor servname provided, or not known\n", " warnings.warn(\n" ] } ], "source": [ "import ipyparallel as ipp\n", "\n", "rc = ipp.Client(profile=\"mpi\")\n", "rc.wait_for_engines(100)\n", "direct_view = rc.direct_view()\n", "bcast_view = rc.broadcast_view()" ] }, { "cell_type": "code", "execution_count": 2, "id": "339a80ab-bbeb-423c-abb3-6d429377017d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "I have 100 engines\n" ] } ], "source": [ "print(f\"I have {len(rc)} engines\")" ] }, { "cell_type": "markdown", "id": "ff036951-029a-486e-ada9-ec0ee5edd3bc", "metadata": {}, "source": [ "To test baseline latency, we can run an empty `apply` task with both views:" ] }, { "cell_type": "code", "execution_count": 3, "id": "9c30c899-68fd-470d-80ec-c10e724bd27a", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "311 ms ± 67.3 ms per loop (mean ± std. dev. 
of 7 runs, 1 loop each)\n" ] } ], "source": [ "%timeit direct_view.apply_sync(lambda: None)" ] }, { "cell_type": "code", "execution_count": 4, "id": "ee82f5fa-a2a1-4b72-ad1f-f36427168317", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "259 ms ± 14.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n" ] } ], "source": [ "%timeit bcast_view.apply_sync(lambda: None)" ] }, { "cell_type": "markdown", "id": "b4f994af-21b5-440e-b0b3-97e3abd73493", "metadata": {}, "source": [ "With 100 engines, it takes noticeably longer to complete on the direct view than on the new broadcast view.\n", "\n", "Sending the same data to engines is probably the pattern that most suffers from this design,\n", "because identical data is duplicated *once per engine* in the client.\n", "\n", "We can compare `view.push` with increasing array sizes:" ] }, { "cell_type": "code", "execution_count": 5, "id": "e5f49dd7-e903-4126-888c-0130261e5fd7", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " 16 x 16: 2 kB\n", "Direct view\n", "287 ms ± 72 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", "Broadcast view\n", "266 ms ± 10.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", " 64 x 64: 32 kB\n", "Direct view\n", "598 ms ± 62.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", "Broadcast view\n", "274 ms ± 30.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", " 256 x 256: 512 kB\n", "Direct view\n", "5.99 s ± 199 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", "Broadcast view\n", "297 ms ± 43.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", " 512 x 512: 2 MB\n", "Direct view\n", "22.9 s ± 428 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", "Broadcast view\n", "559 ms ± 107 ms per loop (mean ± std. dev. 
of 7 runs, 1 loop each)\n" ] } ], "source": [ "import numpy as np\n", "\n", "dview_times = []\n", "bcast_times = []\n", "\n", "n_list = [16, 64, 256, 512]\n", "for n in n_list:\n", " chunk = np.random.random((n, n))\n", " b = chunk.nbytes\n", " unit = 'B'\n", " if b > 1024:\n", " b //= 1024\n", " unit = 'kB'\n", " if b > 1024:\n", " b //= 1024\n", " unit = 'MB'\n", " print(f\"{n:4} x {n:4}: {b:4} {unit}\")\n", " print(\"Direct view\")\n", " tr = %timeit -o -n 1 direct_view.push({\"chunk\": chunk}, block=True)\n", " dview_times.append(tr.average)\n", " print(\"Broadcast view\")\n", " tr = %timeit -o -n 1 bcast_view.push({\"chunk\": chunk}, block=True)\n", " bcast_times.append(tr.average)" ] }, { "cell_type": "code", "execution_count": 7, "id": "8926cf3e-a247-413b-be61-7a7db73437e9", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", " | n | \n", "direct view | \n", "broadcast | \n", "broadcast speedup | \n", "MB | \n", "
---|---|---|---|---|---|
0 | \n", "16 | \n", "0.287495 | \n", "0.265786 | \n", "1.081679 | \n", "0.195312 | \n", "
1 | \n", "64 | \n", "0.598065 | \n", "0.273835 | \n", "2.184031 | \n", "3.125000 | \n", "
2 | \n", "256 | \n", "5.986274 | \n", "0.297034 | \n", "20.153521 | \n", "50.000000 | \n", "
3 | \n", "512 | \n", "22.946806 | \n", "0.558653 | \n", "41.075245 | \n", "200.000000 | \n", "