{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Introduction\n",
    "\n",
    "In this notebook, we will use the module concurrent.futures to benefit from multithreading to parallelize the evaluation of a function.\n",
    "\n",
    "We will also treat a common case where the executable code we want to wrap uses input/output files and how it affects parallelization.\n",
    "\n",
    "If you're using Python >= 3.2 this module is available by default\n",
    "\n",
    "else you might want to install it using one of these:\n",
    "\n",
    "$ pip install futures --user\n",
    "\n",
    "\\# sudo apt-get install python-concurrent.futures\n",
    "\n",
    "$ conda install futures\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from __future__ import print_function\n",
    "import openturns as ot\n",
    "import openturns.coupling_tools as otct\n", 
    "import concurrent.futures\n",
    "import math as m\n",
    "import tempfile\n",
    "import shutil"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Multithreaded function decorator\n",
    "\n",
    "This will allow one regular Python function to take advantage of multi-threading thanks to concurrent.futures.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def multithread(f):\n",
    "    def inner(X):\n",
    "        size = len(X)\n",
    "        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:\n",
    "            future_to_y = {executor.submit(f, X[i]): i for i in range(size)}\n",
    "        Y = [[]]*size\n",
    "        for future in concurrent.futures.as_completed(future_to_y):\n",
    "            i = future_to_y[future]\n",
    "            x = X[i]\n",
    "            if future.exception() is not None:\n",
    "                print('%s generated an exception: %s' % (str(x), future.exception()))\n",
    "            Y[i] = future.result()\n",
    "        return Y\n",
    "    return inner\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "The decorated function will be replaced by its multithreaded counterpart."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[[0.11056650323300793], [-0.7599636278619369], [-0.10410872208238846], [0.8245660033257063], [-0.05753438937971085], [-0.0428861679170567], [-0.6622663701610637], [0.09296920661207787], [0.6430689107595504], [0.44647079261717804]]\n"
     ]
    }
   ],
   "source": [
    "@multithread\n",
    "def my_func(X):\n",
    "    x0, x1, x2 = X\n",
    "    y = m.sin(x0)*m.cos(x1)*m.exp(x2)\n",
    "    return [y]\n",
    "X = ot.Normal(3).getSample(10)\n",
    "Y = my_func(X)\n",
    "print(Y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## File-isolation decorator\n",
    "Sometimes you need to wrap an executable that needs input/output files.\n",
    "\n",
    "A standard way of handling multithreading for this kind of wrapper\n",
    "is to isolate the executable in a temporary directory.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def isolate(files):\n",
    "    def wrap(f):\n",
    "        def inner(*args, **kwargs):\n",
    "            tmpdir = tempfile.mkdtemp()\n",
    "            for filex in files:\n",
    "                shutil.copy(filex, tmpdir)\n",
    "            kwargs['cwd'] = tmpdir\n",
    "            out = f(*args, **kwargs)\n",
    "            shutil.rmtree(tmpdir)\n",
    "            return out\n",
    "        return inner\n",
    "    return wrap"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's say our executable reads input values from input.txt and outputs results in output.txt.\n",
    "\n",
    "We can create a template file input.txt.in in which our wrapper will replace the values of X.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "with open('input.txt.in', 'w') as f:\n",
    "    f.write('x0=@x0@;x1=@x1@;x2=@x2@')\n",
    "with open('executable.py', 'w') as f:\n",
    "    f.write('exec(open(\"./input.txt\").read())\\n')\n",
    "    f.write('from math import *\\n')\n",
    "    f.write('y = cos(x0) * sin(x1) * exp(x2)\\n')\n",
    "    f.write('with open(\"output.txt\", \"w\") as f:\\n')\n",
    "    f.write('   f.write(\"y=\"+str(y))\\n')\n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "So we will have to copy the code and the template input file to the temporary directory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "@multithread\n",
    "@isolate(['executable.py', 'input.txt.in'])\n",
    "def my_func(X, cwd='.'):\n",
    "    tokens = ['@x0@', '@x1@', '@x2@']\n",
    "    otct.replace(cwd+'/'+'input.txt.in', cwd+'/'+'input.txt', tokens, X)\n",
    "    err = otct.execute('python executable.py', cwd=cwd)\n",
    "    y = otct.get_value(cwd+'/'+'output.txt', token='y=')\n",
    "    return [y]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Then you can use it in OpenTURNS:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "probabilityEstimate=1.375000e-02 varianceEstimate=1.904381e-05 standard deviation=4.36e-03 coefficient of variation=3.17e-01 confidenceLength(0.95)=1.71e-02 outerSampling=100 blockSize=8\n"
     ]
    }
   ],
   "source": [
    "model = ot.PythonFunction(3, 1, func_sample=my_func)\n",
    "vect = ot.RandomVector(ot.Normal(3))\n",
    "composite = ot.CompositeRandomVector(model, vect)\n",
    "event = ot.ThresholdEvent(composite, ot.Less(), -3.0)\n",
    "experiment = ot.MonteCarloExperiment()\n", 
    "algo = ot.ProbabilitySimulationAlgorithm(event, experiment)\n",
    "algo.setMaximumOuterSampling(100)\n",
    "algo.setBlockSize(8)\n",
    "algo.run()\n",
    "print(algo.getResult())\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}