{ "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.0" }, "name": "" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "heading", "level": 1, "metadata": {}, "source": [ "Distributed Model Selection and Assessment" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Outline of the session:\n", "\n", "- Introduction to **IPython.parallel**\n", "- Sharing Data Between Processes with **Memory Mapping**\n", "- **Parallel Grid Search** and Model Selection\n", "- **Parallel** Computation of **Learning Curves** (TODO)\n", "- **Distributed** Computation on **EC2 Spot Instances with StarCluster**" ] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Motivation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When doing model evaluations and parameters tuning, many models must be trained independently on the same data. This is an embarrassingly parallel problem but having a copy of the dataset in memory for each process is waste of RAM:\n", "\n", "\n", "\n", "\n", "When doing 3 folds cross validation on a 9 parameters grid, a naive implementation could read the data from the disk and load it in memory 27 times. If this happens concurrently (e.g. on a compute node with 32 cores) the RAM might blow up hence breaking the potential linear speed up." ] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "IPython.parallel, a Primer" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This section gives a primer on some tools best utilizing computational resources when doing predictive modeling in the Python / NumPy ecosystem namely:\n", "\n", "- optimal usage of available CPUs and cluster nodes with **`IPython.parallel`**\n", "\n", "- optimal memory re-use using shared memory between Python processes using **`numpy.memmap`** and **`joblib`**\n", "\n", "### What is so great about `IPython.parallel`:\n", "\n", "- Single node multi-CPUs\n", "- Multiple node multi-CPUs\n", "- Interactive In-memory computing\n", "- IPython notebook integration with `%px` and `%%px` magics\n", "- Possibility to interactively connect to individual computing processes to launch interactive debugger (`#priceless`)\n", "\n", "### Let's get started:\n", "\n", "Let start an IPython cluster using the `ipcluster` common (usually run from your operating system console). To make sure that we are not running several clusters on the same host, let's try to shut down any running IPython cluster first:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "!ipcluster stop" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "!ipcluster start -n=2 --daemon" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Go to the \"Cluster\" tab of the notebook and **start a local cluster with 2 engines**. Then come back here. 
 "We should now be able to use our cluster from our notebook session (or any other Python process running on localhost):" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from IPython.parallel import Client\n", "client = Client()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "len(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The %px and %%px magics\n", "\n", "All the engines of the client can be accessed imperatively using the `%px` and `%%px` IPython cell magics:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px\n", "\n", "import os\n", "import socket\n", "\n", "print(\"This is running in process with pid {0} on host '{1}'.\".format(\n", "    os.getpid(), socket.gethostname()))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The content of the `__main__` namespace can also be read and written via the `%px` magic:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%px a = 1" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%px print(a)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px\n", "\n", "a *= 2\n", "print(a)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is possible to restrict the `%px` and `%%px` magic instructions to specific engines:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px --targets=-1\n", "a *= 2\n", "print(a)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%px print(a)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The DirectView objects\n", "\n", "Cell magics are very convenient for interactive work in the notebook, but their behavior can also be replicated programmatically, and with more flexibility, using a `DirectView` instance. A `DirectView` can be created by slicing the client object:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "all_engines = client[:]\n", "all_engines" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The namespace of the `__main__` module of each running Python engine can be accessed in read and write mode as a Python dictionary:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "all_engines['a'] = 1" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "all_engines['a']" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Direct views can also execute the same code in parallel on each engine of the view:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def my_sum(a, b):\n", "    return a + b\n", "\n", "my_sum_apply_results = all_engines.apply(my_sum, 11, 31)\n", "my_sum_apply_results" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The output of the `apply` method is an asynchronous handle returned immediately, without waiting for the end of the computation.\n",
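 "\n", "The handle can be inspected without blocking, for instance (a quick sketch of the `AsyncResult` API):\n", "\n", "    my_sum_apply_results.ready()   # True once all engines have returned\n", "    my_sum_apply_results.wait(1)   # wait at most 1 second for the results\n", "\n",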
 "To block until the results are ready use:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "my_sum_apply_results.get()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here is a more useful example to fetch the network hostname of each engine in the cluster. Let's study it in more detail:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def hostname():\n", "    \"\"\"Return the name of the host where the function is being called\"\"\"\n", "    import socket\n", "    return socket.gethostname()\n", "\n", "hostname_apply_result = all_engines.apply(hostname)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When doing the above, the `hostname` function is first defined locally (in the client Python process). The `DirectView.apply` method introspects it, serializes its name and bytecode and ships it to each engine of the cluster, where it is reconstructed as a local function on each engine. This function is then called on each engine of the view with the optionally provided arguments.\n", "\n", "In return, the client gets a Python object that serves as a handle to asynchronously fetch the list of the results of the calls:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "hostname_apply_result" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "hostname_apply_result.get()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is also possible to key the results explicitly by engine id using the `AsyncResult.get_dict` method. This is a very simple idiom to fetch metadata on the runtime environment of each engine of the direct view:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "hostnames = hostname_apply_result.get_dict()\n", "hostnames" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It can be handy to invert this mapping to find one engine id per host in the cluster so as to execute host-specific operations:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "one_engine_by_host = dict((hostname, engine_id) for engine_id, hostname\n", "                          in hostnames.items())\n", "one_engine_by_host" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "one_engine_by_host_ids = list(one_engine_by_host.values())\n", "one_engine_by_host_ids" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "one_engine_per_host_view = client[one_engine_by_host_ids]\n", "one_engine_per_host_view" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Trick:** you can even use those engine ids to execute shell commands in parallel on each host of the cluster:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "one_engine_by_host.values()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px --targets=[1]\n", "\n", "!pip install flask" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Note on Importing Modules on Remote Engines\n", "\n", "In the previous example we put the `import socket` statement inside the body of the `hostname` function to make
 sure that it is available when the rest of the function is executed in the Python processes of the remote engines.\n", "\n", "Alternatively, it is possible to import the required modules ahead of time on all the engines of a `DirectView` using a context manager / `with` syntax:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "with all_engines.sync_imports():\n", "    import numpy" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However this method does **not** support alternative import syntaxes:\n", " \n", "    >>> import numpy as np\n", "    >>> from numpy import linalg" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Hence the method of importing in the body of the \"applied\" functions is more flexible. Additionally, it does not pollute the `__main__` namespace of the engines, as it only impacts the local namespace of the function itself." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Exercise**:\n", "\n", "- Write a function that returns the memory usage of each engine process in the cluster.\n", "- Allocate a largish numpy array of zeros of known size (e.g. 100MB) on each engine of the cluster.\n", "\n", "Hints:\n", "\n", "Use the `psutil` module to collect the runtime info on a specific process or host. For instance to fetch the memory usage of the currently running process in MB:\n", "\n", "    >>> import os\n", "    >>> import psutil\n", "    >>> psutil.Process(os.getpid()).memory_info().rss / 1e6\n", "\n", "To allocate a numpy array with 1000 zeros stored as 64bit floats you can use:\n", "\n", "    >>> import numpy as np\n", "    >>> z = np.zeros(1000, dtype=np.float64)\n", "\n", "The size in bytes of such a numpy array can then be fetched with ``z.nbytes``:\n", " \n", "    >>> z.nbytes / 1e6\n", "    0.008" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def get_engines_memory(client):\n", "    def memory_mb():\n", "        import os, psutil\n", "        return psutil.Process(os.getpid()).memory_info().rss / 1e6\n", "    \n", "    return client[:].apply(memory_mb).get_dict()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "get_engines_memory(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "sum(get_engines_memory(client).values())" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px\n", "import numpy as np\n", "z = np.zeros(int(1e7), dtype=np.float64)\n", "print(\"Allocated {0}MB on engine.\".format(z.nbytes / 1e6))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "get_engines_memory(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Load Balanced View" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`LoadBalancedView` is an alternative to the `DirectView` that runs one function call at a time on a free engine."
] }, { "cell_type": "code", "collapsed": false, "input": [ "lv = client.load_balanced_view()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "def slow_square(x):\n", "    import time\n", "    time.sleep(2)\n", "    return x ** 2" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "result = lv.apply(slow_square, 4)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "result" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "result.ready()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "result.get() # blocking call" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is possible to spread some tasks among the engines of the LB view by passing a callable and an iterable of task arguments to the `LoadBalancedView.map` method:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "results = lv.map(slow_square, [0, 1, 2, 3])\n", "results" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "results.ready()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "results.progress" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# results.abort()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# Iteration on AsyncMapResult is blocking\n", "for r in results:\n", "    print(r)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the following, the load balanced view will be used to schedule work on the cluster while monitoring progress, and to occasionally add new computing nodes to the cluster in the middle of a computation so as to speed up the processing, as when using EC2 and StarCluster (see later)." ] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Sharing Read-only Data between Processes on the Same Host with Memmapping" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's **restart the cluster** to kill the existing Python processes and create a new client instance so as to be able to monitor the memory usage in detail:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "!ipcluster stop" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "!ipcluster start -n=2 --daemon" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "from IPython.parallel import Client\n", "client = Client()\n", "len(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The numpy package makes it possible to memory map large contiguous chunks of binary files as shared memory for all the Python processes running on a given host:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%px import numpy as np" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Creating a `numpy.memmap` instance with the `w+` mode creates a file on the filesystem and zeros its content.\n",
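 "\n", "For reference, here is a summary of the `numpy.memmap` open modes used in the rest of this section (see the numpy documentation for details; the filename is just a placeholder):\n", "\n", "    np.memmap('f.mmap', dtype=np.float32, shape=10, mode='w+')  # create (or overwrite) the file, zero-filled\n", "    np.memmap('f.mmap', dtype=np.float32, mode='r+')            # map an existing file in read-write mode\n", "    np.memmap('f.mmap', dtype=np.float32, mode='r')             # map an existing file in read-only mode\n", "    np.memmap('f.mmap', dtype=np.float32, mode='c')             # copy-on-write: writes stay in local RAM\n", "\n",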
 "Let's do it from the first engine process of our current IPython cluster:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px --targets=-1\n", "\n", "# Cleanup any existing file from past session (necessary for windows)\n", "import os\n", "if os.path.exists('small.mmap'):\n", "    os.unlink('small.mmap')\n", "\n", "mm_w = np.memmap('small.mmap', shape=10, dtype=np.float32, mode='w+')\n", "print(mm_w)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Assuming the notebook process was launched with:\n", "\n", "    cd notebooks\n", "    ipython notebook\n", "\n", "and the cluster was launched from the IPython notebook UI, the engines will have the same current working directory as the notebook process, hence we can find the `small.mmap` file in the current folder:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "ls -lh small.mmap" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This binary file can then be mapped as a new numpy array by all the engines having access to the same filesystem. The `mode='r+'` opens this shared memory area in read-write mode:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px\n", "\n", "mm_r = np.memmap('small.mmap', dtype=np.float32, mode='r+')\n", "print(mm_r)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px --targets=-1\n", "\n", "mm_w[0] = 42\n", "print(mm_w)\n", "print(mm_r)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%px print(mm_r)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Memory mapped arrays created with `mode='r+'` can be modified and the modifications are shared with all the engines:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px --targets=1\n", "\n", "mm_r[1] = 43" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px\n", "print(mm_r)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Be careful though: there is no built-in read or write lock available on such data structures, so it's better to avoid concurrent read & write operations on the same array segments unless the engine operations are made to cooperate through some synchronization or scheduling orchestrator."
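, "\n", "\n", "When the engines only need to read the data, a simple way to avoid such hazards altogether is to map the file in copy-on-write mode (`mode='c'`): writes then only affect the private memory of the process that made them and are never flushed back to the file. A minimal sketch:\n", "\n", "    mm_c = np.memmap('small.mmap', dtype=np.float32, mode='c')\n", "    mm_c[0] = -1   # only visible in this process, the file on disk is untouched\n"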
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Memmap arrays generally behave very much like regular in-memory numpy arrays:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px\n", "print(\"sum={0:.3f}, mean={1:.3f}, std={2:.3f}\".format(\n", "    float(mm_r.sum()), float(np.mean(mm_r)), float(np.std(mm_r))))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before allocating more data in memory on the cluster, let us define a couple of utility functions from the previous exercise (and more) to monitor what is used by which engine and what is still free on the cluster as a whole:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def get_engines_memory(client):\n", "    \"\"\"Gather the memory allocated by each engine in MB\"\"\"\n", "    def memory_mb():\n", "        import os\n", "        import psutil\n", "        return psutil.Process(os.getpid()).memory_info().rss / 1e6\n", "    \n", "    return client[:].apply(memory_mb).get_dict()\n", "\n", "def get_host_free_memory(client):\n", "    \"\"\"Free memory on each host of the cluster in MB.\"\"\"\n", "    all_engines = client[:]\n", "    def hostname():\n", "        import socket\n", "        return socket.gethostname()\n", "    \n", "    hostnames = all_engines.apply(hostname).get_dict()\n", "    one_engine_per_host = dict((hostname, engine_id)\n", "                               for engine_id, hostname\n", "                               in hostnames.items())\n", "\n", "    def host_free_memory():\n", "        import psutil\n", "        return psutil.virtual_memory().free / 1e6\n", "    \n", "    \n", "    one_engine_per_host_ids = list(one_engine_per_host.values())\n", "    host_mem = client[one_engine_per_host_ids].apply(\n", "        host_free_memory).get_dict()\n", "    \n", "    return dict((hostnames[eid], m) for eid, m in host_mem.items())" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "get_engines_memory(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "get_host_free_memory(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's allocate an 80MB memmap array on the first engine and load it in read-write mode in all the engines:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px --targets=-1\n", "\n", "# Cleanup any existing file from past session (necessary for windows)\n", "import os\n", "if os.path.exists('big.mmap'):\n", "    os.unlink('big.mmap')\n", "\n", "np.memmap('big.mmap', shape=10 * int(1e6), dtype=np.float64, mode='w+')" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "ls -lh big.mmap" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "get_host_free_memory(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "No significant memory was used in this operation: we just asked the OS to allocate the buffer on the hard drive and to maintain a virtual memory area as a cheap reference to this buffer.\n", "\n", "Let's open new references to the same buffer from all the engines at once:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%px %time big_mmap = np.memmap('big.mmap', dtype=np.float64, mode='r+')" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "%px big_mmap" ], "language": "python", "metadata":
{}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "get_host_free_memory(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "No physical memory was allocated in the operation as it just took a couple of ms to do so. This is also confirmed by the engines process stats:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "get_engines_memory(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's trigger an actual load of the data from the drive into the in-memory disk cache of the OS, this can take some time depending on the speed of the hard drive (on the order of 100MB/s to 300MB/s hence 3s to 8s for this dataset):" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%px --targets=-1\n", "\n", "%time np.sum(big_mmap)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "get_engines_memory(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "get_host_free_memory(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see that the first engine has now access to the data in memory and the free memory on the host has decreased by the same amount.\n", "\n", "We can now access this data from all the engines at once much faster as the disk will no longer be used: the shared memory buffer will instead accessed directly by all the engines:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%px %time np.sum(big_mmap)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "get_engines_memory(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "get_host_free_memory(client)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So it seems that the engines have loaded a whole copy of the data but this actually not the case as the total amount of free memory was not impacted by the parallel access to the shared buffer. Furthermore, once the data has been preloaded from the hard drive using one process, all the of the other processes on the same host can access it almost instantly saving a lot of IO wait.\n", "\n", "This strategy makes it very interesting to load the readonly datasets of machine learning problems, especially when the same data is reused over and over by concurrent processes as can be the case when doing learning curves analysis or grid search." ] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Memmaping Nested Numpy-based Data Structures with Joblib" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "joblib is a utility library included in the sklearn package. 
 Among other things, it provides tools to serialize objects that comprise large numpy arrays and reload them as memmap-backed data structures.\n", "\n", "To demonstrate it, let's create an arbitrary Python data structure involving numpy arrays:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import numpy as np\n", "\n", "class MyDataStructure(object):\n", "    \n", "    def __init__(self, shape):\n", "        self.float_zeros = np.zeros(shape, dtype=np.float32)\n", "        self.integer_ones = np.ones(shape, dtype=np.int64)\n", "    \n", "data_structure = MyDataStructure((3, 4))\n", "data_structure.float_zeros, data_structure.integer_ones" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now persist this data structure to disk:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from sklearn.externals import joblib\n", "\n", "joblib.dump(data_structure, 'data_structure.pkl')" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "!ls -l data_structure*" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A memmapped copy of this data structure can then be loaded:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "memmaped_data_structure = joblib.load('data_structure.pkl', mmap_mode='r+')\n", "memmaped_data_structure.float_zeros, memmaped_data_structure.integer_ones" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Memmapping CV Splits for Multiprocess Dataset Sharing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can leverage the previous tools to build a utility function that extracts cross-validation splits ahead of time and persists them on the hard drive in a format suitable for memmapping by the IPython engine processes."
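, "\n", "\n", "Note that on a multi-host cluster the split files must be written to a filesystem that all the engines can see (with StarCluster the NFS-shared home directory typically plays this role). A quick sanity check, assuming `cv_split_filenames` holds the paths returned by the function below:\n", "\n", "    def split_exists(filename):\n", "        import os\n", "        return os.path.exists(filename)\n", "\n", "    client[:].apply(split_exists, cv_split_filenames[0]).get()\n"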
] }, { "cell_type": "code", "collapsed": false, "input": [ "from sklearn.externals import joblib\n", "from sklearn.cross_validation import ShuffleSplit\n", "import os\n", "\n", "def persist_cv_splits(X, y, n_cv_iter=5, name='data',\n", "                      suffix=\"_cv_%03d.pkl\", test_size=0.25, random_state=None):\n", "    \"\"\"Materialize randomized train test splits of a dataset.\"\"\"\n", "\n", "    cv = ShuffleSplit(X.shape[0], n_iter=n_cv_iter,\n", "                      test_size=test_size, random_state=random_state)\n", "    cv_split_filenames = []\n", "    \n", "    for i, (train, test) in enumerate(cv):\n", "        cv_fold = (X[train], y[train], X[test], y[test])\n", "        cv_split_filename = name + suffix % i\n", "        cv_split_filename = os.path.abspath(cv_split_filename)\n", "        joblib.dump(cv_fold, cv_split_filename)\n", "        cv_split_filenames.append(cv_split_filename)\n", "    \n", "    return cv_split_filenames" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's try it on the digits dataset:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from sklearn.datasets import load_digits\n", "\n", "digits = load_digits()\n", "digits_split_filenames = persist_cv_splits(digits.data, digits.target,\n", "                                           name='digits', random_state=42)\n", "digits_split_filenames" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "ls -lh digits*" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Each of the persisted CV splits can then be loaded back again using memmapping:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "X_train, y_train, X_test, y_test = joblib.load(\n", "    'digits_cv_002.pkl', mmap_mode='r+')" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "X_train" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "y_train" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Parallel Model Selection and Grid Search" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's leverage IPython.parallel and the memory mapping features of joblib to write a custom grid search utility that runs on a cluster in a memory-efficient manner.\n", "\n", "Assume that we want to reproduce the grid search from the previous session:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import numpy as np\n", "from pprint import pprint\n", "\n", "svc_params = {\n", "    'C': np.logspace(-1, 2, 4),\n", "    'gamma': np.logspace(-4, 0, 5),\n", "}\n", "pprint(svc_params)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`GridSearchCV` internally uses the following `ParameterGrid` utility iterator class to build the possible combinations of parameters:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from sklearn.grid_search import ParameterGrid\n", "\n", "list(ParameterGrid(svc_params))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's write a function to load the data from a CV split file and compute the validation score for a given parameter set and model:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def compute_evaluation(cv_split_filename, model, params):\n", "    \"\"\"Function executed by
a worker to evaluate a model on a CV split\"\"\"\n", "    # All module imports should be executed in the worker namespace\n", "    from sklearn.externals import joblib\n", "\n", "    X_train, y_train, X_validation, y_validation = joblib.load(\n", "        cv_split_filename, mmap_mode='c')\n", "    \n", "    model.set_params(**params)\n", "    model.fit(X_train, y_train)\n", "    validation_score = model.score(X_validation, y_validation)\n", "    return validation_score" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "def grid_search(lb_view, model, cv_split_filenames, param_grid):\n", "    \"\"\"Launch all grid search evaluation tasks.\"\"\"\n", "    all_tasks = []\n", "    all_parameters = list(ParameterGrid(param_grid))\n", "    \n", "    for i, params in enumerate(all_parameters):\n", "        task_for_params = []\n", "        \n", "        for j, cv_split_filename in enumerate(cv_split_filenames):\n", "            t = lb_view.apply(\n", "                compute_evaluation, cv_split_filename, model, params)\n", "            task_for_params.append(t)\n", "        \n", "        all_tasks.append(task_for_params)\n", "    \n", "    return all_parameters, all_tasks" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's try it on the digits dataset that we split previously into memmap-able files:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from sklearn.svm import SVC\n", "from IPython.parallel import Client\n", "\n", "client = Client()\n", "lb_view = client.load_balanced_view()\n", "model = SVC()\n", "svc_params = {\n", "    'C': np.logspace(-1, 2, 4),\n", "    'gamma': np.logspace(-4, 0, 5),\n", "}\n", "\n", "all_parameters, all_tasks = grid_search(\n", "    lb_view, model, digits_split_filenames, svc_params)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `grid_search` function uses the asynchronous API of the `LoadBalancedView`; we can hence monitor the progress:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import time\n", "time.sleep(5)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "def progress(tasks):\n", "    return np.mean([task.ready() for task_group in tasks\n", "                    for task in task_group])" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "print(\"Tasks completed: {0}%\".format(100 * progress(all_tasks)))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Even better, we can introspect the completed tasks to find the best parameter set so far:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def find_bests(all_parameters, all_tasks, n_top=5):\n", "    \"\"\"Compute the mean score of the completed tasks\"\"\"\n", "    mean_scores = []\n", "    \n", "    for param, task_group in zip(all_parameters, all_tasks):\n", "        scores = [t.get() for t in task_group if t.ready()]\n", "        if len(scores) == 0:\n", "            continue\n", "        mean_scores.append((np.mean(scores), param))\n", "    \n", "    return sorted(mean_scores, reverse=True, key=lambda x: x[0])[:n_top]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "from pprint import pprint\n", "\n", "print(\"Tasks completed: {0}%\".format(100 * progress(all_tasks)))\n", "pprint(find_bests(all_parameters, all_tasks))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed":
false, "input": [ "[t.wait() for tasks in all_tasks for t in tasks]\n", "print(\"Tasks completed: {0}%\".format(100 * progress(all_tasks)))\n", "pprint(find_bests(all_parameters, all_tasks))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "Optimization Trick: Truncated Randomized Search" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is often wasteful to search all the possible combinations of parameters as done previously, especially if the number of parameters is large (e.g. more than 3).\n", "\n", "To speed up the discovery of good parameters combinations, it is often faster to randomized the search order and allocate a budget of evaluations, e.g. 10 or 100 combinations.\n", "\n", "See [this JMLR paper by James Bergstra](http://jmlr.csail.mit.edu/papers/v13/bergstra12a.html) for an empirical analysis of the problem. The interested reader should also have a look at [hyperopt](https://github.com/jaberg/hyperopt) that further refines this parameter search method using meta-optimizers.\n", "\n", "Randomized Parameter Search has just been implemented in the master branch of scikit-learn be part of the 0.14 release." ] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "A More Complete Parallel Model Selection and Assessment Example" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%matplotlib inline\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "\n", "# Some nice default configuration for plots\n", "plt.rcParams['figure.figsize'] = 10, 7.5\n", "plt.rcParams['axes.grid'] = True\n", "plt.gray();" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "lb_view = client.load_balanced_view()\n", "model = SVC()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "import sys, imp\n", "from collections import OrderedDict\n", "sys.path.append('..')\n", "import model_selection, mmap_utils\n", "imp.reload(model_selection), imp.reload(mmap_utils)\n", "\n", "lb_view.abort()\n", "\n", "svc_params = OrderedDict([\n", " ('gamma', np.logspace(-4, 0, 5)),\n", " ('C', np.logspace(-1, 2, 4)),\n", "])\n", "\n", "search = model_selection.RandomizedGridSeach(lb_view)\n", "search.launch_for_splits(model, svc_params, digits_split_filenames)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "time.sleep(5)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "print(search.report())" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "time.sleep(5)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "print(search.report())\n", "search.boxplot_parameters(display_train=False)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "#search.abort()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Distributing the Computation on EC2 Spot Instances with StarCluster" ] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "Installation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To provision a cheap transient compute cluster on Amazon EC2, the first step is to 
register on EC2 with a credit card and export your EC2 credentials as environment variables. For instance under Linux / OSX:\n", "\n", "    [laptop]% export AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXXXXXXXXX\n", "    [laptop]% export AWS_SECRET_ACCESS_KEY=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n", "\n", "You can put those exports in your `~/.bashrc` to automatically get those credentials loaded in new shell sessions.\n", "\n", "Then proceed to the installation of StarCluster itself:\n", "\n", "    [laptop]% pip install StarCluster" ] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "Configuration" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's run the help command for the first time and create a template configuration file:\n", "\n", "    [laptop]% starcluster help\n", "    StarCluster - (http://star.mit.edu/cluster)\n", "    Software Tools for Academics and Researchers (STAR)\n", "    Please submit bug reports to starcluster@mit.edu\n", "    \n", "    cli.py:87 - ERROR - config file /home/user/.starcluster/config does not exist\n", "    \n", "    Options:\n", "    --------\n", "    [1] Show the StarCluster config template\n", "    [2] Write config template to /home/user/.starcluster/config\n", "    [q] Quit\n", "    \n", "    Please enter your selection:\n", "    2\n", "\n", "and create a password-less ssh key that will be dedicated to this transient cluster:\n", "\n", "    [laptop]% starcluster createkey mykey -o ~/.ssh/mykey.rsa\n", "\n", "You can now edit the file `/home/user/.starcluster/config` and replace its content with the following sample configuration:\n", "\n", "    [global]\n", "    DEFAULT_TEMPLATE=iptemplate\n", "    REFRESH_INTERVAL=5\n", "    \n", "    [key mykey]\n", "    KEY_LOCATION=~/.ssh/mykey.rsa\n", "    \n", "    [plugin ipcluster]\n", "    SETUP_CLASS = starcluster.plugins.ipcluster.IPCluster\n", "    ENABLE_NOTEBOOK = True\n", "    NOTEBOOK_PASSWD = aaaa\n", "    \n", "    [plugin ipclusterstop]\n", "    SETUP_CLASS = starcluster.plugins.ipcluster.IPClusterStop\n", "    \n", "    [plugin ipclusterrestart]\n", "    SETUP_CLASS = starcluster.plugins.ipcluster.IPClusterRestartEngines\n", "    \n", "    [plugin pypackages]\n", "    setup_class = starcluster.plugins.pypkginstaller.PyPkgInstaller\n", "    packages = scikit-learn, psutil\n", "    \n", "    # Base configuration for IPython.parallel cluster\n", "    [cluster iptemplate]\n", "    KEYNAME = mykey\n", "    CLUSTER_SIZE = 1\n", "    CLUSTER_USER = ipuser\n", "    CLUSTER_SHELL = bash\n", "    REGION = us-east-1\n", "    NODE_IMAGE_ID = ami-5b3fb632 # REGION and NODE_IMAGE_ID go in pair\n", "    NODE_INSTANCE_TYPE = c1.xlarge # 8 CPUs\n", "    DISABLE_QUEUE = True # We don't need SGE, faster cluster startup\n", "    PLUGINS = pypackages, ipcluster" ] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "Launching a Cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start a new cluster using the `iptemplate` section of the `~/.starcluster/config` file:\n", "\n", "    [laptop]% starcluster start -c iptemplate -s 3 -b 0.5 mycluster\n", "    \n", "- The `-s` option makes it possible to select the number of EC2 instances to start.\n", "\n", "- The `-b` option makes it possible to provision non-master instances on the Spot Instance market.\n", "\n", "- To also provision the master node on the Spot Instance market you can further add the `--force-spot-master` flag to the previous commandline.\n", "\n", "- Provisioning Spot Instances is typically up to 5x cheaper than regular instances for largish instance types such as `c1.xlarge`, but you run the risk of having your instances shut down if the price
goes up. Also provisioning new instances on the Spot market can be slower: often a couple of minutes instead of 30s for On Demand instances.\n", "\n", "- You can access the price history of spot instances of a specific region with:\n", "\n", "    [laptop]% starcluster -r us-west-1 spothistory c1.xlarge\n", "    StarCluster - (http://star.mit.edu/cluster) (v. 0.9999)\n", "    Software Tools for Academics and Researchers (STAR)\n", "    Please submit bug reports to starcluster@mit.edu\n", "\n", "    >>> Current price: $0.11\n", "    >>> Max price: $0.75\n", "    >>> Average price: $0.13\n", "\n", "Connect to the master node via ssh:\n", "\n", "    [laptop]% starcluster sshmaster -A -u ipuser\n", "\n", "- The `-A` flag makes it possible to use your local ssh agent to manage your keys, which makes it possible to `git clone` / `git push` github repositories from the master node as you would from your local folder.\n", "\n", "- The StarCluster AMI comes with `tmux` installed by default.\n", "\n", "It is possible to ssh into other cluster nodes from the master using local DNS aliases such as:\n", "\n", "    [myuser@master]% ssh node001" ] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "Dynamically Resizing the Cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When using the `LoadBalancedView` API of `IPython.parallel.Client`, it is possible to dynamically grow the cluster to shorten the processing of a queue of tasks without having to restart from scratch.\n", "\n", "This can be achieved using the `addnode` command, for instance to add 3 more nodes with a $0.50 bid price on the Spot Instance market:\n", "\n", "    [laptop]% starcluster addnode -s 3 -b 0.5 mycluster\n", "\n", "Each node will automatically run the `IPCluster` plugin and register new `IPEngine` processes with the existing `IPController` process running on the master.\n", "\n", "It is also possible to terminate individual running nodes of the cluster with the `removenode` command, but this will kill any tasks running on that node and IPython.parallel will **not** restart the failed tasks automatically." ] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "Terminating a Cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once you are done with your computation, don't forget to shut down the whole cluster and its EBS volume so as to only pay for the resources you actually used.\n", "\n", "Before doing so, don't forget to back up any result files you would like to keep, either by pushing them to the S3 storage service (recommended for large files that you would want to reuse on EC2 later) or by fetching them locally using the `starcluster get` command.\n", "\n", "The cluster shutdown itself can be achieved with a single command:\n", "\n", "    [laptop]% starcluster terminate mycluster\n", "\n", "Alternatively, you can also keep your data by preserving the EBS volume attached to the master node, replacing the `terminate` command with the `stop` command:\n", "\n", "    [laptop]% starcluster stop mycluster\n", "\n", "You can then later restart the same cluster with the `start` command to automatically remount the EBS volume." ] } ], "metadata": {} } ] }