{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# N-Way merge with remote data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "How to merge multiple *sorted* remote data streams using the `heapq.merge` function that ships with Python." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from __future__ import print_function\n", "import heapq\n", "\n", "from IPython.display import display\n", "from IPython import parallel\n", "\n", "rc = parallel.Client()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Imagine we have some routine that is capable of loading/creating a sorted subset of our data in an engine, based on a parameter (such as the indes of which part of the data to read):" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def load_data(arg):\n", " \"\"\"Load a dataset in the global namespace. The dataset *must* be sorted.\n", "\n", " Return the *name* of the variable in which the dataset was loaded.\"\"\"\n", " global data\n", " # Here, real data loading would occur\n", " s = 4-arg\n", " step = arg+1\n", " data = range(s, s+4*step**2, step)\n", " return 'data'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We want a function that takes a given single-engine View and a variable name,\n", "and returns a local iterator on the remote object.\n", "It should look something like this skeleton function:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def remote_iterator(view, name):\n", " \"\"\"Return an iterator on an object living on a remote engine.\"\"\"\n", " # TODO: create an iterator remotely\n", " while True:\n", " pass\n", " # TODO: yield the next item\n", " # TODO: turn remote StopIteration into local StopIteration" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Relevant Aside:\n", "\n", "Errors raised on engines will show up in the Client as a RemoteError.\n", "This means you have to be a little careful when trying to catch remote errors:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "try:\n", " rc[-1].execute(\"foo = barbarbar\", block=True)\n", "except NameError:\n", " print(\"caught NameError\")\n", "except Exception as e:\n", " print(\"Oops! Didn't catch %r\" % e)\n", " raise e\n", "print(\"safe and sound\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A RemoteError has three attributes:\n", "\n", "* `err.ename` - the class name of the remote error (e.g. `NameError`, `ValueError`)\n", "* `err.evalue` - the string value of the error message\n", "* `err.traceback` - the remote traceback as a list of strings" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For simple builtin exceptions,\n", "you can re-raise remote errors as the original exception class with a case like the following:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def assign_foo():\n", " try:\n", " rc[-1].execute(\"foo = barbarbar\", block=True)\n", " except parallel.RemoteError as e:\n", " if e.ename == 'NameError':\n", " raise NameError(e.evalue)\n", " else:\n", " raise e" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By doing this re-cast, any exception handling outside will handle remote exceptions as if they were local." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "try:\n", " assign_foo()\n", "except NameError:\n", " print(\"caught NameError\")\n", "except Exception as e:\n", " print(\"Oops! Didn't catch %r\" % e)\n", " raise e\n", "print(\"safe and sound\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Can you fill out this remote_iterator function?\n", "\n", "Potentially useful:\n", "\n", "* catching RemoteErrors\n", "* parallel.Reference\n", "* yield" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def remote_iterator(view, name):\n", " \"\"\"Return an iterator on an object living on a remote engine.\"\"\"\n", " # TODO: create an iterator remotely\n", " while True:\n", " pass\n", " # TODO: yield the next item\n", " # TODO: turn remote StopIteration into local StopIteration" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A local example that should be a good guideline for the remote version:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%load soln/remote_iter_hint.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And the solution:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%load soln/remote_iter.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And an ever-so-slightly fancier solution:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%load soln/remote_iter_slightly_better.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we bring `IPython.parallel` into action:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "dview = rc.direct_view()\n", "print('Engine IDs:', rc.ids)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Load the data on the engines\n", "data_refs = dview.map(load_data, rc.ids)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "data_refs" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "list(data_refs)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# And we now make a local object which represents the remote iterator\n", "iterators = [remote_iterator(rc[e], ref) for e,ref in zip(rc.ids, data_refs)]\n", "for it in iterators:\n", " print(list(it))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, let's merge those datasets into a single sorted one:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print('Locally merge the remote sets:')\n", "iterators = [remote_iterator(rc[e], ref) for e,ref in zip(rc.ids, data_refs)] \n", "remote = list(heapq.merge(*iterators))\n", "print(remote)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Validation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "repeat the operation by copying the data from the engines to our local namespace and doing a regular merge here:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Key step here: pull data from each engine:\n", "local_data = [rc[e][ref] for e,ref in zip(rc.ids, data_refs)]\n", "print('Local data:')\n", "for subset in local_data:\n", " print(subset)\n", "print('Sorted:')\n", "local = list(heapq.merge(*local_data))\n", "print(local)\n", "print(\"local == remote: %s\" % (local==remote))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [] } ], "metadata": {}, "nbformat": 4, "nbformat_minor": 0 }