{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Introduction\n", "\n", "In this notebook, we use the concurrent.futures module to parallelize the evaluation of a function with multithreading.\n", "\n", "We also treat the common case where the executable we want to wrap reads and writes input/output files, and show how this affects parallelization.\n", "\n", "If you are using Python >= 3.2, this module is part of the standard library.\n", "\n", "Otherwise, you can install the backport using one of these:\n", "\n", "$ pip install futures --user\n", "\n", "\\# sudo apt-get install python-concurrent.futures\n", "\n", "$ conda install futures\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from __future__ import print_function\n", "import openturns as ot\n", "import openturns.coupling_tools as otct\n", "import concurrent.futures\n", "import math as m\n", "import tempfile\n", "import shutil" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Multithreaded function decorator\n", "\n", "This decorator lets a regular Python function take advantage of multithreading through concurrent.futures: each point of the input sample is submitted to a thread pool.\n", "\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def multithread(f):\n", "    def inner(X):\n", "        size = len(X)\n", "        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:\n", "            # submit one task per input point and remember its index\n", "            future_to_y = {executor.submit(f, X[i]): i for i in range(size)}\n", "            Y = [[]]*size\n", "            for future in concurrent.futures.as_completed(future_to_y):\n", "                i = future_to_y[future]\n", "                x = X[i]\n", "                if future.exception() is not None:\n", "                    print('%s generated an exception: %s' % (str(x), future.exception()))\n", "                # result() re-raises the exception, if any\n", "                Y[i] = future.result()\n", "        return Y\n", "    return inner\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "The decorated function is replaced by its multithreaded counterpart, which takes a sample and returns the list of outputs." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[0.11056650323300793], [-0.7599636278619369], [-0.10410872208238846], [0.8245660033257063], [-0.05753438937971085], [-0.0428861679170567], [-0.6622663701610637], [0.09296920661207787], [0.6430689107595504], [0.44647079261717804]]\n" ] } ], "source": [ "@multithread\n", "def my_func(X):\n", "    x0, x1, x2 = X\n", "    y = m.sin(x0)*m.cos(x1)*m.exp(x2)\n", "    return [y]\n", "X = ot.Normal(3).getSample(10)\n", "Y = my_func(X)\n", "print(Y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## File-isolation decorator\n", "Sometimes you need to wrap an executable that reads and writes input/output files.\n", "\n", "A standard way to make this kind of wrapper safe for multithreading\n", "is to run each evaluation in its own temporary directory, so that concurrent runs do not overwrite each other's files.\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def isolate(files):\n", "    def wrap(f):\n", "        def inner(*args, **kwargs):\n", "            tmpdir = tempfile.mkdtemp()\n", "            try:\n", "                for filex in files:\n", "                    shutil.copy(filex, tmpdir)\n", "                kwargs['cwd'] = tmpdir\n", "                out = f(*args, **kwargs)\n", "            finally:\n", "                # remove the temporary directory even if f raises\n", "                shutil.rmtree(tmpdir)\n", "            return out\n", "        return inner\n", "    return wrap" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's say our executable reads its input values from input.txt and writes its result to output.txt.\n", "\n", "We can create a template file input.txt.in in which our wrapper will replace the tokens with the values of X.\n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [], "source": [ "with open('input.txt.in', 'w') as f:\n", " f.write('x0=@x0@;x1=@x1@;x2=@x2@')\n", "with open('executable.py', 'w') as f:\n", " f.write('exec(open(\"./input.txt\").read())\\n')\n", " f.write('from math import *\\n')\n", " f.write('y = cos(x0) * sin(x1) * exp(x2)\\n')\n", " f.write('with 
open(\"output.txt\", \"w\") as f:\\n')\n", " f.write(' f.write(\"y=\"+str(y))\\n')\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Both the script and the template input file must therefore be copied to the temporary directory; we list them in the isolate decorator." ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": true }, "outputs": [], "source": [ "@multithread\n", "@isolate(['executable.py', 'input.txt.in'])\n", "def my_func(X, cwd='.'):\n", "    tokens = ['@x0@', '@x1@', '@x2@']\n", "    # fill the template with the values of X\n", "    otct.replace(cwd+'/'+'input.txt.in', cwd+'/'+'input.txt', tokens, X)\n", "    # run the executable inside the isolated directory\n", "    err = otct.execute('python executable.py', cwd=cwd)\n", "    # read the result back from the output file\n", "    y = otct.get_value(cwd+'/'+'output.txt', token='y=')\n", "    return [y]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The decorated function can then be used in OpenTURNS as the sample evaluation of a PythonFunction:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "probabilityEstimate=1.375000e-02 varianceEstimate=1.904381e-05 standard deviation=4.36e-03 coefficient of variation=3.17e-01 confidenceLength(0.95)=1.71e-02 outerSampling=100 blockSize=8\n" ] } ], "source": [ "model = ot.PythonFunction(3, 1, func_sample=my_func)\n", "vect = ot.RandomVector(ot.Normal(3))\n", "composite = ot.CompositeRandomVector(model, vect)\n", "event = ot.ThresholdEvent(composite, ot.Less(), -3.0)\n", "experiment = ot.MonteCarloExperiment()\n", "algo = ot.ProbabilitySimulationAlgorithm(event, experiment)\n", "algo.setMaximumOuterSampling(100)\n", "algo.setBlockSize(8)\n", "algo.run()\n", "print(algo.getResult())\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", 
"pygments_lexer": "ipython2", "version": "2.7.3" } }, "nbformat": 4, "nbformat_minor": 0 }