{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Working with IPython and dask.distributed\n", "\n", "[dask.distributed](https://distributed.readthedocs.io) is a library for distributed task scheduling and execution. You should check it out if you haven't already.\n", "\n", "If you primarily use IPython Parallel's LoadBalancedView, dask.distributed should replace it in many cases.\n", "\n", "However, you may already have infrastructure for deploying and managing IPython engines,\n", "and IPython Parallel's interactive debugging features can still be useful.\n", "\n", "Any IPython cluster can *become* a dask cluster at any time,\n", "and be used simultaneously via both APIs." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can turn your IPython cluster into a distributed cluster by calling `Client.become_dask()`:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Using existing profile dir: '/Users/minrk/.ipython/profile_default'\n", "Starting 4 engines with \n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "12bcc9bb5f5849b281bc6b305c5cfe7d", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/4 [00:00\n", "
\n", "
\n", "

Client

\n", "

Client-4de93880-0b0d-11ec-b993-784f4385c030

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Direct
\n", " Dashboard: http://192.168.1.31:53263/status\n", "
\n", "\n", " \n", "
\n", "

Scheduler Info

\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-faed09d3-e79f-4698-9162-b38cece51ddf

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://192.168.1.31:53264\n", " \n", " Workers: 2\n", "
\n", " Dashboard: http://192.168.1.31:53263/status\n", " \n", " Total threads: 2\n", "
\n", " Started: Just now\n", " \n", " Total memory: 8.00 GiB\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: tcp://192.168.1.31:53626

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " Comm: tcp://192.168.1.31:53626\n", " \n", " Total threads: 1\n", "
\n", " Dashboard: http://192.168.1.31:53634/status\n", " \n", " Memory: 4.00 GiB\n", "
\n", " Nanny: None\n", "
\n", " Local directory: /Users/minrk/dev/ip/parallel/docs/source/examples/dask-worker-space/worker-4ra4jer1\n", "
\n", " Tasks executing: 0\n", " \n", " Tasks in memory: 0\n", "
\n", " Tasks ready: 0\n", " \n", " Tasks in flight: 0\n", "
\n", " CPU usage: 0.0%\n", " \n", " Last seen: Just now\n", "
\n", " Memory usage: 96.50 MiB\n", " \n", " Spilled bytes: 0 B\n", "
\n", " Read bytes: 0.0 B\n", " \n", " Write bytes: 0.0 B\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: tcp://192.168.1.31:53627

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " Comm: tcp://192.168.1.31:53627\n", " \n", " Total threads: 1\n", "
\n", " Dashboard: http://192.168.1.31:53635/status\n", " \n", " Memory: 4.00 GiB\n", "
\n", " Nanny: None\n", "
\n", " Local directory: /Users/minrk/dev/ip/parallel/docs/source/examples/dask-worker-space/worker-03ml7c0q\n", "
\n", " Tasks executing: 0\n", " \n", " Tasks in memory: 0\n", "
\n", " Tasks ready: 0\n", " \n", " Tasks in flight: 0\n", "
\n", " CPU usage: 0.0%\n", " \n", " Last seen: Just now\n", "
\n", " Memory usage: 96.43 MiB\n", " \n", " Spilled bytes: 0 B\n", "
\n", " Read bytes: 234.10 kiB\n", " \n", " Write bytes: 234.10 kiB\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "" ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dask_client = rc.become_dask(ncores=1)\n", "dask_client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This will:\n", "\n", "1. start a Scheduler on the Hub\n", "2. start a Worker on each engine\n", "3. return a `distributed.Client`, the distributed client API\n", "\n", "By default, a distributed Worker uses threads to run on all cores of its machine.\n", "In this case, since I already have one *engine* per core,\n", "I tell distributed to use one thread per Worker with `ncores=1`.\n", "\n", "We can now use our IPython cluster with distributed:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "b9874c7d2ae04c6fa879f14f00135e30", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox()" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from distributed import progress\n", "\n", "\n", "def square(x):\n", " return x ** 2\n", "\n", "\n", "def neg(x):\n", " return -x\n", "\n", "\n", "A = dask_client.map(square, range(1000))\n", "B = dask_client.map(neg, A)\n", "total = dask_client.submit(sum, B)\n", "progress(total)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "-332833500" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "total.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "I could also let distributed do its multithreading thing and run one multi-threaded Worker per host.\n", "\n", "First, I need a mapping of each engine to its host:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{0: 'touchy', 1: 'touchy', 2: 'touchy', 3: 'touchy'}" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ 
"import socket\n", "\n", "engine_hosts = rc[:].apply_async(socket.gethostname).get_dict()\n", "engine_hosts" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "I can reverse this mapping to get a list of engines on each host:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'touchy': [0, 1, 2, 3]}" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "host_engines = {}\n", "for engine_id, host in engine_hosts.items():\n", " if host not in host_engines:\n", " host_engines[host] = []\n", " host_engines[host].append(engine_id)\n", "\n", "host_engines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now I can get one engine per host:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[0]" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "one_engine_per_host = [engines[0] for engines in host_engines.values()]\n", "one_engine_per_host" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "*Here's a concise but more opaque version that also picks one engine per host (the last engine seen on each host, rather than the first):*" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[3]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "one_engine_per_host = list({host: eid for eid, host in engine_hosts.items()}.values())\n", "one_engine_per_host" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "I can now stop the first distributed cluster and start a new one on just these engines, letting distributed allocate threads:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-4f4545ca-0b0d-11ec-b993-784f4385c030

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Direct
\n", " Dashboard: http://192.168.1.31:54310/status\n", "
\n", "\n", " \n", "
\n", "

Scheduler Info

\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-ecb384f5-10bb-4d53-9e4c-67db7ff07d03

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://192.168.1.31:54313\n", " \n", " Workers: 1\n", "
\n", " Dashboard: http://192.168.1.31:54310/status\n", " \n", " Total threads: 1\n", "
\n", " Started: Just now\n", " \n", " Total memory: 4.00 GiB\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: tcp://192.168.1.31:54344

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " Comm: tcp://192.168.1.31:54344\n", " \n", " Total threads: 1\n", "
\n", " Dashboard: http://192.168.1.31:54345/status\n", " \n", " Memory: 4.00 GiB\n", "
\n", " Nanny: None\n", "
\n", " Local directory: /Users/minrk/dev/ip/parallel/docs/source/examples/dask-worker-space/worker-kmqmwu7_\n", "
\n", " Tasks executing: 0\n", " \n", " Tasks in memory: 0\n", "
\n", " Tasks ready: 0\n", " \n", " Tasks in flight: 0\n", "
\n", " CPU usage: 0.0%\n", " \n", " Last seen: Just now\n", "
\n", " Memory usage: 104.27 MiB\n", " \n", " Spilled bytes: 0 B\n", "
\n", " Read bytes: 1.46 MiB\n", " \n", " Write bytes: 1.46 MiB\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rc.stop_distributed()\n", "\n", "dask_client = rc.become_dask(one_engine_per_host)\n", "dask_client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And submit the same tasks again, this time over `range(100)`:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "b6eddbe0edfb49e09f289ed49a29e988", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox()" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "A = dask_client.map(square, range(100))\n", "B = dask_client.map(neg, A)\n", "total = dask_client.submit(sum, B)\n", "progress(total)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Debugging distributed with IPython" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-4f8be14c-0b0d-11ec-b993-784f4385c030

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Direct
\n", " Dashboard: http://192.168.1.31:54467/status\n", "
\n", "\n", " \n", "
\n", "

Scheduler Info

\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-9de91656-970d-405d-85ce-7e065b96c0c8

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://192.168.1.31:54478\n", " \n", " Workers: 0\n", "
\n", " Dashboard: http://192.168.1.31:54467/status\n", " \n", " Total threads: 0\n", "
\n", " Started: Just now\n", " \n", " Total memory: 0 B\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rc.stop_distributed()\n", "\n", "dask_client = rc.become_dask(one_engine_per_host)\n", "dask_client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's set the %px magics to only run on our one engine per host:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "view = rc[one_engine_per_host]\n", "view.block = True\n", "view.activate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's submit some work that's going to fail somewhere in the middle:" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "524ea13e69ef4fbc80650087f3f9b746", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox()" ] }, "metadata": {}, "output_type": "display_data" }, { "ename": "ZeroDivisionError", "evalue": "division by zero", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mZeroDivisionError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_14739/3426399645.py\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 16\u001b[0m \u001b[0mtotal\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdask_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msubmit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0minverted\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 17\u001b[0m \u001b[0mdisplay\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mprogress\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtotal\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 18\u001b[0;31m 
\u001b[0mtotal\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", "\u001b[0;32m~/conda/lib/python3.9/site-packages/distributed/client.py\u001b[0m in \u001b[0;36mresult\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 231\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstatus\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;34m\"error\"\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 232\u001b[0m \u001b[0mtyp\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mexc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtb\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 233\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mexc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwith_traceback\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtb\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 234\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstatus\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;34m\"cancelled\"\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 235\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_14739/3426399645.py\u001b[0m in \u001b[0;36minverse\u001b[0;34m()\u001b[0m\n\u001b[1;32m 8\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 9\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0minverse\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 10\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0;36m1\u001b[0m \u001b[0;34m/\u001b[0m 
\u001b[0mx\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 11\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 12\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;31mZeroDivisionError\u001b[0m: division by zero" ] } ], "source": [ "from IPython.display import display\n", "from distributed import progress\n", "\n", "\n", "def shift5(x):\n", " return x - 5\n", "\n", "\n", "def inverse(x):\n", " return 1 / x\n", "\n", "\n", "shifted = dask_client.map(shift5, range(1, 10))\n", "inverted = dask_client.map(inverse, shifted)\n", "\n", "total = dask_client.submit(sum, inverted)\n", "display(progress(total))\n", "total.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see which task failed:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "[f for f in inverted if f.status == \"error\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When IPython starts a worker on each engine,\n", "it stores it in the `distributed_worker` variable in the engine's namespace.\n", "This lets us query the worker interactively.\n", "\n", "We can check out the current data resident on each worker:" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[0;31mOut[3:1]: \u001b[0mBuffer<, deserialize_bytes >>" ] }, "metadata": { "after": [], "completed": "2021-09-01T10:14:10.163710", "data": {}, "engine_id": 3, "engine_uuid": "f3a63abb-5985ad4fd1b9c71f02922bb4", "error": null, "execute_input": "dask_worker.data\n", "execute_result": { "data": { "text/plain": "Buffer<, deserialize_bytes >>" }, "execution_count": 1, "metadata": {} }, "follow": [], "is_broadcast": false, "is_coalescing": false, "msg_id": "867ada26-ed0b46a8d787906a2a56aca7_24", "outputs": [], "received": "2021-09-01T10:14:10.167670", 
"started": "2021-09-01T10:14:10.138718", "status": "ok", "stderr": "", "stdout": "", "submitted": "2021-09-01T10:14:10.130438" }, "output_type": "display_data" } ], "source": [ "%%px\n", "dask_worker.data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we can poke around with each Worker,\n", "we can have a slightly easier time figuring out what went wrong." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.6" }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": {}, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 4 }