{ "metadata": { "name": "", "signature": "sha256:1e905ee58d7350d06d531e9bfad1e8d2e8967052c3b55ee4214e605e9d6112a4" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Example with nested parallelism\n", "============================\n", "\n", "This example shows how to benefit from existing parallelism (and optimizations like warm restart)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from mempamal.datasets import iris\n", "import numpy as np\n", "# iris dataset as usual but with linear regression (Why not! :p).\n", "X, y = iris.get_data()\n", "# adding i.i.d noise to X -> 4 \"real\" features + 10,000 noise features\n", "# provide a sufficient workload to see the interest of nested parallelism\n", "X = np.hstack((X, np.random.randn(X.shape[0], 10000)))" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 1 }, { "cell_type": "code", "collapsed": false, "input": [ "from sklearn.linear_model import ElasticNetCV\n", "from sklearn.cross_validation import StratifiedKFold\n", "from sklearn.pipeline import Pipeline\n", "from sklearn.metrics import r2_score" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 2 }, { "cell_type": "markdown", "metadata": {}, "source": [ "We use the ``ElasticNetCV`` wich provide warm restart and nested parallism (provided by ``joblib``). So, we don't need a inner cross-validation loop since the estimator already provide it. in this example, the grid of parameters contains 500 tuples." ] }, { "cell_type": "code", "collapsed": false, "input": [ "l1_ratios = np.linspace(0.1,1,10).tolist()\n", "s = ElasticNetCV(l1_ratio=l1_ratios, n_alphas=50, n_jobs=-1)\n", "est = Pipeline([(\"enet\", s)])" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 3 }, { "cell_type": "code", "collapsed": false, "input": [ "from mempamal.configuration import JSONify_estimator, JSONify_cv, build_dataset\n", "from mempamal.workflow import create_wf, save_wf" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 4 }, { "cell_type": "markdown", "metadata": {}, "source": [ "We jsonify the estimator and the cross-validation configuration.\n", "We build the dataset in the current directory. \n", "It's create a ``dataset.joblib`` file. \n", "Then we create the workflow in our internal format (``create_wf``). \n", "With ``verbose=True``, it prints the commands on ``stdout``.\n", "And finally, we output the workflow (``save_wf``) in the soma-workflow format \n", "and write it to ``workflow.json`` (need soma-workflow)." ] }, { "cell_type": "code", "collapsed": false, "input": [ "method_conf = JSONify_estimator(est, out=\"./est.json\")\n", "cv_conf = JSONify_cv(StratifiedKFold, cv_kwargs={\"n_folds\": 5},\n", " score_func=r2_score, \n", " stratified=True,\n", " out=\"./cv.json\")\n", "dataset = build_dataset(X, y, method_conf, cv_conf, outputdir=\".\")\n", "wfi = create_wf(dataset['folds'], cv_conf, method_conf, \".\",\n", " verbose=True)\n", "wf = save_wf(wfi, \"./workflow.json\", mode=\"soma-workflow\")" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_0.pkl 0\n", "python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_1.pkl 1\n", "python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_2.pkl 2\n", "python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_3.pkl 3\n", "python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_4.pkl 4\n", "python mempamal/scripts/outer_reducer.py ./final_res.pkl ./red_res_{outer}.pkl\n" ] } ], "prompt_number": 5 }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we create a ``WorkflowController`` and we set the number of processors to 1 in order to let the nested parallelism using the ressources.\n", "We submit the workflow and we wait for workflow completion, then we read the final results." ] }, { "cell_type": "code", "collapsed": false, "input": [ "from soma_workflow.client import WorkflowController\n", "\n", "import time\n", "import json\n", "import sklearn.externals.joblib as joblib\n", "\n", "controller = WorkflowController()\n", "# limit the scheduler to 1 task (for the nested parallelism)\n", "old_nproc = controller.scheduler_config.get_proc_nb()\n", "controller.scheduler_config.set_proc_nb(1)\n", "wf_id = controller.submit_workflow(workflow=wf, name=\"fourth example\")\n", "\n", "while controller.workflow_status(wf_id) != 'workflow_done':\n", " time.sleep(2)\n", "# reset the scheduler policy\n", "controller.scheduler_config.set_proc_nb(old_nproc)\n", "\n", "print(joblib.load('./final_res.pkl'))" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "light mode\n", "{'std': 0.01165793880264987, 'raw': array([ 0.88502578, 0.88896717, 0.89306213, 0.88935599, 0.86066079]), 'median': 0.88896717072541787, 'mean': 0.88341437212120044}" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n" ] } ], "prompt_number": 6 } ], "metadata": {} } ] }