{ "metadata": { "name": "", "signature": "sha256:da03414f2ab592310b23f6a557936f90bea89e0e61ca0e32b137abdb2154348a" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "heading", "level": 1, "metadata": {}, "source": [ "More efficient data movement with MPI" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Just like [we did](memmap.ipynb) manually with memmap,\n", "you can move data more efficiently with MPI by sending it to just one engine,\n", "and using MPI to broadcast it to the rest of the engines.\n" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import socket\n", "import os, sys, re\n", "\n", "import numpy as np\n", "\n", "from IPython import parallel" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For this demo, I will connect to a cluster with engines started with MPI.\n", "If you have MPI and mpi4py on your machine, you can start a local cluster with MPI with:\n", "\n", " ipcluster start -n 8 --engines=MPI --profile mpi" ] }, { "cell_type": "code", "collapsed": false, "input": [ "mpi_profile = 'dirac'\n", "rc = parallel.Client(profile=mpi_profile)\n", "eall = rc[:]\n", "root = rc[-1]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%px from mpi4py.MPI import COMM_WORLD as MPI" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "mpi_ranks = eall.apply_async(lambda : MPI.Get_rank()).get_dict()\n", "root_rank = root.apply_sync(lambda : MPI.Get_rank())\n", "mpi_ranks" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "sz = 256\n", "data = np.random.random((sz, sz))\n", "data = data.dot(data.T)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%%time \n", "ar = eall.push({'data': data}, block=False)\n", "ar.wait_interactive()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "@parallel.interactive\n", "def _bcast(key, root_rank):\n", " \"\"\"function to run on engines as part of broadcast\"\"\"\n", " g = globals()\n", " obj = g.get(key, None)\n", " obj = MPI.bcast(obj, root_rank)\n", " g[key] = obj\n", "\n", "def broadcast(key, obj, dv, root, root_rank):\n", " \"\"\"More efficient broadcast by doing push to root,\n", " and MPI broadcast to other engines.\n", " \n", " Still O(N) messages, but all but one message is always small.\n", " \"\"\"\n", " root.push({key : obj}, block=False)\n", " return dv.apply_async(_bcast, key, root_rank)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%%time\n", "ar = broadcast('data', data, eall, root, root_rank)\n", "ar.wait_interactive()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "eall.apply_sync(np.linalg.norm, parallel.Reference('data'), 2)" ], "language": "python", "metadata": {}, "outputs": [] } ], "metadata": {} } ] }