{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Futures in IPython Parallel\n", "\n", "The IPython Parallel AsyncResult object extends `concurrent.futures.Future`,\n", "which makes it compatible with most async frameworks in Python.\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Using existing profile dir: '/Users/minrk/.ipython/profile_default'\n", "Starting 4 engines with \n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "107792592f044155ab2d9dec950ca6e7", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/4 [00:00" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import ipyparallel as ipp\n", "rc = ipp.Cluster(n=4).start_and_connect_sync()\n", "\n", "dv = rc[:]\n", "dv.activate()\n", "dv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Do some imports everywhere" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "%%px --local --block\n", "import os\n", "import time\n", "import numpy\n", "from numpy.linalg import norm" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "tags": [] }, "outputs": [], "source": [ "def random_norm(n):\n", " \"\"\"Generates a 1xN array and computes its 2-norm\"\"\"\n", " A = numpy.random.random(n)\n", " return norm(A, 2)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The basic async API hasn't changed:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "f = rc[-1].apply(random_norm, 100)\n", "f" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "5.854015134508366" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], 
"source": [ "f.get()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But the full Futures API is now available:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "5.854015134508366" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "f.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The standard Futures API also has methods for registering callbacks, such as `add_done_callback`:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "I got PID: 12509" ] }, { "data": { "text/plain": [ "12509" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "import os\n", "f = rc[-1].apply(os.getpid)\n", "f.add_done_callback(lambda fut: print(\"I got PID: %i\" % fut.result()))\n", "f.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A more complex example shows how AsyncResults can be integrated into existing async applications, now that they are Futures:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ ". [12507, 12506, 12508, 12509]\n", ". . [12507, 12506, 12508, 12509]\n", ". . . [12507, 12506, 12508, 12509]\n", ". . . . 
[12507, 12506, 12508, 12509]\n" ] } ], "source": [ "import asyncio\n", "from tornado.ioloop import IOLoop\n", "import sys\n", "\n", "def sleep_task(t):\n", "    time.sleep(t)\n", "    return os.getpid()\n", "\n", "async def background():\n", "    \"\"\"A background coroutine to demonstrate that we aren't blocking\"\"\"\n", "    while True:\n", "        await asyncio.sleep(1)\n", "        print('.', end=' ')\n", "        sys.stdout.flush() # not needed after ipykernel 4.3\n", "\n", "async def work():\n", "    \"\"\"Submit some work and print the results when complete\"\"\"\n", "    for t in [1, 2, 3, 4]:\n", "        ar = rc[:].apply(sleep_task, t)\n", "        result = await asyncio.wrap_future(ar) # this waits\n", "        print(result)\n", "\n", "bg = asyncio.Task(background())\n", "await work()\n", "bg.cancel();\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So if you have an existing async application using coroutines and/or Futures,\n", "you can now integrate IPython Parallel as a standard async component for submitting work and waiting for its results." 
] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "## Executors\n", "\n", "Executors are a standard Python API provided by various job-submission tools.\n", "When different libraries expose this common API for asynchronous execution,\n", "different implementations can be easily swapped out for each other and compared,\n", "or the best one for a given context can be used without having to change the code.\n", "\n", "With IPython Parallel, every View has an `.executor` property, which provides the Executor API for the given View.\n", "As with Views themselves, how an Executor assigns work depends on the View from which it was created.\n", "\n", "You can get an Executor for any View by accessing `View.executor`:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[0, 1, 2, 3]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ex_all = rc[:].executor\n", "ex_all.view.targets" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "12507\n", "12508\n", "12507\n", "12508\n", "12507\n", "12508\n", "12507\n", "12508\n", "12507\n", "12508\n" ] } ], "source": [ "even_lbview = rc.load_balanced_view(targets=rc.ids[::2])\n", "ex_even = even_lbview.executor\n", "for pid in ex_even.map(lambda x: os.getpid(), range(10)):\n", "    print(pid)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Typically, though, one will want an Executor for a LoadBalancedView on all the engines.\n", "This is what the top-level `Client.executor()` method returns:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ex = rc.executor()\n", "ex.view" ] }, { "cell_type": "markdown", 
"metadata": {}, "source": [ "Let's create a few compatible Executor instances" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Comparing Executors\n", "\n", "Aside: [dask.distributed][] is a great library, and any IPython Parallel cluster can be bootstrapped into a dask cluster, which gives us one more Executor to compare.\n", "\n", "[dask.distributed]: https://distributed.readthedocs.io\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There *can* be serialization differences, especially for interactively defined functions (i.e. those defined in the notebook itself).\n", "That's why we define our task function in a local module,\n", "rather than here: ProcessPoolExecutor doesn't serialize interactively defined functions.\n", "For the most part, though, working with functions defined in modules works consistently across implementations." ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[0;32mimport\u001b[0m \u001b[0mnumpy\u001b[0m\u001b[0;34m\u001b[0m\n", "\u001b[0;34m\u001b[0m\u001b[0;32mfrom\u001b[0m \u001b[0mnumpy\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mlinalg\u001b[0m \u001b[0;32mimport\u001b[0m \u001b[0mnorm\u001b[0m\u001b[0;34m\u001b[0m\n", "\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\n", "\u001b[0;34m\u001b[0m\u001b[0;32mdef\u001b[0m \u001b[0mtask\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mn\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\n", "\u001b[0;34m\u001b[0m \u001b[0;34m\"\"\"Generates a 1xN array and computes its 2-norm\"\"\"\u001b[0m\u001b[0;34m\u001b[0m\n", "\u001b[0;34m\u001b[0m \u001b[0mA\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mnumpy\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mones\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mn\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\n", "\u001b[0;34m\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mnorm\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mA\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0;36m2\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%pycat task_mod.py\n", "from task_mod import task" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "tags": [] }, "outputs": [], "source": [ "def task(n):\n", " \"\"\"Generates a 1xN array and computes its 2-norm\"\"\"\n", " import numpy\n", " from numpy.linalg import norm\n", " A = numpy.ones(n)\n", " return norm(A, 2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor\n", "import distributed\n", "\n", "distributed_client = rc.become_dask()\n", "dist_ex = distributed_client.get_executor()\n", "\n", "N = 4\n", "ip_ex = rc.executor(targets=range(N))\n", "thread_ex = ThreadPoolExecutor(N)\n", "process_ex = ProcessPoolExecutor(N)\n" ] }, { "cell_type": "code", "execution_count": 36, "metadata": { "tags": [] }, "outputs": [], "source": [ "executors = [process_ex, thread_ex, ip_ex, dist_ex]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can submit the same work with the same API,\n", "using four different mechanisms for distributing work.\n", "The results will be the same:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for executor in executors:\n", " print(executor.__class__.__name__)\n", " it = executor.map(str, range(5))\n", " print(list(it))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This makes it easy to compare the different implementations. We are going to submit some dummy work—allocate and compute 2-norms of arrays of various sizes." 
] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 1048576, 1261463, 1517571, 1825676, 2196334, 2642245,\n", " 3178688, 3824041, 4600417, 5534417, 6658042, 8009791,\n", " 9635980, 11592325, 13945857, 16777216])" ] }, "execution_count": 40, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sizes = numpy.logspace(20, 24, 16, base=2, dtype=int)\n", "sizes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the work locally, to get a reference:" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Local time:\n", "CPU times: user 765 ms, sys: 403 ms, total: 1.17 s\n", "Wall time: 874 ms\n" ] } ], "source": [ "print(\"Local time:\")\n", "%time ref = list(map(task, sizes))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And then run again with the various Executors:" ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "ProcessPoolExecutor\n", "246 ms ± 86 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", "ThreadPoolExecutor\n", "182 ms ± 32.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", "ViewExecutor\n", "228 ms ± 24.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", "ClientExecutor\n", "246 ms ± 27.8 ms per loop (mean ± std. dev. 
of 7 runs, 1 loop each)\n" ] } ], "source": [ "for executor in executors:\n", " print(executor.__class__.__name__)\n", " result = executor.map(task, sizes)\n", " rlist = list(result)\n", " assert rlist == ref, \"%s != %s\" % (rlist, ref)\n", " # time the task assignment\n", " %timeit list(executor.map(task, sizes))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For this toy work, the stdlib ThreadPoolExecutor appears to perform the best.\n", "That's useful info, and likely to be true for most workloads that release the GIL and fit comfortably into memory.\n", "When the GIL is involved, ProcessPoolExecutor is often best for simple workloads.\n", "\n", "One benefit of IPython Parallel or Distributed Executors over the stdlib Executors is that they do not have to be confined to a single machine.\n", "This means the standard Executor API lets you develop small-scale parallel tools that run locally in threads or processes,\n", "and then extend the *exact same code* to make use of multiple machines,\n", "just by selecting a different Executor.\n", "\n", "That seems pretty useful. 
[joblib][] is another package to implement standardized APIs for parallel backends,\n", "which IPython Parallel [also supports](joblib.ipynb).\n", "\n", "[joblib]: https://joblib.readthedocs.io" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.6" }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": { "107792592f044155ab2d9dec950ca6e7": { "model_module": "@jupyter-widgets/controls", "model_module_version": "1.5.0", "model_name": "HBoxModel", "state": { "children": [ "IPY_MODEL_2d6218bb526d44f0acda723f0c6dc861", "IPY_MODEL_acda3a9b8ac74e12bfcc6408aa8b11ad", "IPY_MODEL_30dc8b7299de4c6f869fa41aebba7f45" ], "layout": "IPY_MODEL_412b13dee37849cd97baf8ab5f0552f3" } }, "2d6218bb526d44f0acda723f0c6dc861": { "model_module": "@jupyter-widgets/controls", "model_module_version": "1.5.0", "model_name": "HTMLModel", "state": { "layout": "IPY_MODEL_a17b92b8857c4bd5b883c5615021c713", "style": "IPY_MODEL_fd97190cd7094a429ec44137db60db39", "value": "100%" } }, "30dc8b7299de4c6f869fa41aebba7f45": { "model_module": "@jupyter-widgets/controls", "model_module_version": "1.5.0", "model_name": "HTMLModel", "state": { "layout": "IPY_MODEL_8783cf96737d4e03a7dd05208445afe1", "style": "IPY_MODEL_398a5bd036a643fc9be7e97200b091e3", "value": " 4/4 [00:04<00:00, 4.75s/engine]" } }, "398a5bd036a643fc9be7e97200b091e3": { "model_module": "@jupyter-widgets/controls", "model_module_version": "1.5.0", "model_name": "DescriptionStyleModel", "state": { "description_width": "" } }, "412b13dee37849cd97baf8ab5f0552f3": { "model_module": "@jupyter-widgets/base", "model_module_version": "1.2.0", "model_name": "LayoutModel", "state": {} }, "8783cf96737d4e03a7dd05208445afe1": { "model_module": 
"@jupyter-widgets/base", "model_module_version": "1.2.0", "model_name": "LayoutModel", "state": {} }, "97f0bee4e8c64f608146df6c4b9e30c0": { "model_module": "@jupyter-widgets/controls", "model_module_version": "1.5.0", "model_name": "ProgressStyleModel", "state": { "description_width": "" } }, "a17b92b8857c4bd5b883c5615021c713": { "model_module": "@jupyter-widgets/base", "model_module_version": "1.2.0", "model_name": "LayoutModel", "state": {} }, "acda3a9b8ac74e12bfcc6408aa8b11ad": { "model_module": "@jupyter-widgets/controls", "model_module_version": "1.5.0", "model_name": "FloatProgressModel", "state": { "bar_style": "success", "layout": "IPY_MODEL_c80f0d14fe6542e0b25385efb22e54e5", "max": 4, "style": "IPY_MODEL_97f0bee4e8c64f608146df6c4b9e30c0", "value": 4 } }, "c80f0d14fe6542e0b25385efb22e54e5": { "model_module": "@jupyter-widgets/base", "model_module_version": "1.2.0", "model_name": "LayoutModel", "state": {} }, "fd97190cd7094a429ec44137db60db39": { "model_module": "@jupyter-widgets/controls", "model_module_version": "1.5.0", "model_name": "DescriptionStyleModel", "state": { "description_width": "" } } }, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 4 }