{ "metadata": { "name": "", "signature": "sha256:4e6942c31bdc43fb673826bcb3816ccaa9ecac04e2be02d84f691bc81e566ff9" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Load-balancing with IPython.parallel" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import os,sys,time\n", "import numpy as np\n", "\n", "from IPython.core.display import display\n", "from IPython import parallel\n", "rc = parallel.Client()\n", "dview = rc[:]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a LoadBalancedView" ] }, { "cell_type": "code", "collapsed": false, "input": [ "lview = rc.load_balanced_view()\n", "lview" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "LoadBalancedViews behave very much like a DirectView on a single engine:\n", "\n", "Each call to `apply()` results in a single remote computation,\n", "and the result (or AsyncResult) of that call is returned directly,\n", "rather than in a list, as in the multi-engine DirectView." ] }, { "cell_type": "code", "collapsed": true, "input": [ "e0 = rc[0]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "from numpy.linalg import norm\n", "A = np.random.random(1024)\n", "\n", "e0.apply_sync(norm, A, 2)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "lview.apply_sync(norm, A, 2)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, unlike the DirectView of a single engine, you are letting the IPython Scheduler decide which engine should do the work:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "e0.apply_sync(os.getpid)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "for i in range(2*len(rc.ids)):\n", " pid = lview.apply_sync(os.getpid)\n", " print \"task %i ran on: %i\" % (i, pid)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Map\n", "\n", "The LoadBalancedView also has a load-balanced version of the builtin `map()`" ] }, { "cell_type": "code", "collapsed": false, "input": [ "lview.block = True\n", "\n", "serial_result = map(lambda x:x**10, range(32))\n", "parallel_result = lview.map(lambda x:x**10, range(32))\n", "\n", "serial_result==parallel_result" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Just like `apply()`, you can use non-blocking map with `block=False` or `map_async`" ] }, { "cell_type": "code", "collapsed": true, "input": [ "amr = lview.map_async(lambda x:x**10, range(32))\n", "amr.msg_ids" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "lview.map??" 
], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "amr = lview.map_async(lambda x:x**10, range(32), chunksize=4)\n", "amr.msg_ids" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Map results are iterable!\n", "\n", "AsyncResults with multiple results are actually iterable before their\n", "results arrive.\n", "\n", "This means that you can perform map/reduce operations on elements as\n", "they come in:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "lview.block = False" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# scatter 'id', so id=0,1,2 on engines 0,1,2\n", "dv = rc[:]\n", "dv.scatter('id', rc.ids, flatten=True)\n", "print dv['id']\n", "\n", "# create a Reference to `id`. This will be a different value on each engine\n", "ref = parallel.Reference('id')\n", "\n", "tic = time.time()\n", "ar = dv.apply(time.sleep, ref)\n", "for i,r in enumerate(ar):\n", " print \"%i: %.3f\"%(i, time.time()-tic)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "amr = lview.map_async(time.sleep, [1] * 12)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "amr.wait_interactive()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "amr.wall_time, amr.elapsed" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "amr.serial_time" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "amr.wall_time" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "amr.elapsed" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we submit a bunch of tasks of increasing magnitude, and\n", "watch where they happen, iterating through the results as they come." ] }, { "cell_type": "code", "collapsed": false, "input": [ "def sleep_here(t):\n", " \"\"\"sleep here for a time, return where it happened\"\"\"\n", " import time\n", " time.sleep(t)\n", " return id\n", "\n", "amr = lview.map(sleep_here, [.01*t for t in range(100)])\n", "tic = time.time()\n", "for i,r in enumerate(amr):\n", " print i,r\n", " print \"task %i on engine %i: %.3f\" % (i, r, time.time()-tic)\n" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "amr.wall_time" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "amr.serial_time" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "amr.serial_time / amr.wall_time" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Unlike `DirectView.map()`, which always results in one task per engine,\n", "LoadBalance map defaults to one task per *item* in the sequence. This\n", "can be changed by specifying the `chunksize` keyword arg." 
] }, { "cell_type": "code", "collapsed": false, "input": [ "amr = lview.map(sleep_here, [.01*t for t in range(100)], chunksize=4)\n", "tic = time.time()\n", "for i,r in enumerate(amr):\n", " print \"task %i on engine %i: %.3f\"%(i, r, time.time()-tic)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Excercise\n", "\n", "## Parallelize nested loops\n", "\n", "Often we want to run a function with a variety of combinations of arguments.\n", "A useful skill is the ability to express a nested loop in terms of a map." ] }, { "cell_type": "code", "collapsed": false, "input": [ "def area(w,h):\n", " return w*h\n", "\n", "\n", "widths = range(1,4)\n", "heights = range(6,10)\n", "\n", "areas = []\n", "for w in widths:\n", " for h in heights:\n", " areas.append(area(w,h))\n", "areas" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%run ../hints\n", "nesthint()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": true, "input": [ "%load ../soln/nestedloop.py" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# To parallelize every call with map, you just need to get a list for each argument.\n", "# You can use `itertools.product` + `zip` to get this:\n", "\n", "\n", "import itertools\n", "\n", "product = list(itertools.product(widths, heights))\n", "# [(1, 6), (1, 7), (2, 6), (2, 7), (3, 6), (3, 7)]\n", "\n", "# So we have a \"list of pairs\", \n", "# but what we really want is a single list for each argument, i.e. a \"pair of lists\".\n", "# This is exactly what the slightly weird `zip(*product)` syntax gets us:\n", "\n", "allwidths, allheights = zip(*itertools.product(widths, heights))\n", "\n", "print \" widths\", allwidths\n", "print \"heights\", allheights\n", "\n", "# Now we just map our function onto those two lists, to parallelize nested for loops:\n", "\n", "ar = lview.map_async(area, allwidths, allheights)\n" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Validate the result:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "p_areas = ar.get()\n", "p_areas" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "areas == p_areas" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Examples and Exercises" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- [Counting Words!](../examples/Counting%20Words.ipynb)\n", "- [Monte Carlo Options Pricing](../examples/MC%20Options.ipynb)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we've seen multiplexing and load-balancing, let's see how they are [used together](All%20Together.ipynb)." ] } ], "metadata": {} } ] }