{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# R-based metabolomics workflow by Kultima lab " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook aims to show, through a series of examples, how to set up a metabolomics workflow using the Chronos REST API. As benchmark case we use a R-based pipeline by the Kultima lab. The aim of this pipeline is to: \n", "1. Remove contaminants present in blank samples;\n", "2. Remove batch specific features;\n", "3. Transform the intensities to the log2 base scale; \n", "4. Perform variable selection based on coefficients of variation (CV) on metabolites across replicates." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The code snippets in this notebook use Python to consume the Chronos REST API, in order to set up a Direct Acyclic Graph (DAG), which defines the workflow. Each node in the DAG represents a microservice that performs a specific task. Once the DAG is properly setup, Chronos will figure out the dependencies between the various microservices, running them in the correct order, and keeping them alive only for the time they are needed. Furthermore, independent microservices will be run in parallel.\n", "\n", "Chronos REST API calls define nodes in the DAG. REST calls are performed through HTTP requests on some well-defined URLs, which contain arguments in JSON format. The Chronos REST API is documented in this [page](https://mesos.github.io/chronos/docs/api.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequisites\n", "\n", "There are some prerequisites to fulfill in order to successfully run the examples in this page." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Please run the following snippet and insert your MANTL control node URL" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "control=input()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Please run the following code snippet and insert your MANTL admin password" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import getpass\n", "password=getpass.getpass()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Please run the following code snippet to get some input data for the workflow" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import urllib.request\n", "urllib.request.urlretrieve(\n", " \"https://raw.githubusercontent.com/phnmnl/workflow-demo/master/data/inputdata_workshop.xls\", # mdownload URL\n", " \"inputdata_workshop.xls\" # local path\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Please run the following code snippet to setup Python requests package" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import requests\n", "from requests.packages.urllib3.exceptions import InsecureRequestWarning\n", "requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # suppress warnings" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Blank filter - Contaminants Removal\n", "\n", "When perfoming any mass spectrometry (MS) study, it happens that you get plastic or other contaminent within your samples. When you furher analyze your samples with MS, these contaminant will be recorded together with the metabolites. To be able to detect and filter these out you often add blank samples (samples with only DMSO in them) in between the normal samples in the runorder.\n", "\n", "In this step we aim to remove the contaminants detected in the blanks, from the rest of our samples. The theory behind it is to remove everything that has an intensity of X in the blanks compared to the samples. For example, everything that, in the blanks, has an intensity of 1% or higher of the intensities in the other samples.\n", "\n", "The input data in the prerequisites will be used here as input. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Blank filter - Contaminant Removal\n", "\n", "When performing any mass spectrometry (MS) study, plastic and other contaminants inevitably end up in your samples. When you further analyze your samples with MS, these contaminants are recorded together with the metabolites. To be able to detect and filter them out, blank samples (samples containing only DMSO) are often added in between the normal samples in the run order.\n", "\n", "In this step we aim to remove the contaminants detected in the blanks from the rest of our samples. The idea is to remove every feature whose intensity in the blanks exceeds a given fraction of its intensity in the samples; for example, every feature whose intensity in the blanks is 1% or more of its intensity in the other samples.\n", "\n", "The data file downloaded in the prerequisites is used as input here. Hence, we need to mount the Jupyter working directory (/mnt/container-volumes/jupyter) as a volume in the Docker container. Please go through the following code snippet, and use the [Chronos REST API documentation](https://mesos.github.io/chronos/docs/api.html) to figure out the meaning of the JSON data that is sent with the HTTP request. Once you are done with that, please run the snippet and check the Chronos interface (which you can access through the MANTL UI)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [], "source": [ "url=\"https://admin:\"+password+\"@\"+control+\"/chronos/scheduler/iso8601\"\n", "json=\"\"\"\n", "{ \n", " \"schedule\" : \"R0/2030-01-01T12:00:00Z/PT1H\", \n", " \"cpus\": \"0.25\",\n", " \"mem\": \"128\", \n", " \"epsilon\" : \"PT10M\", \n", " \"name\" : \"blank-filter\",\n", " \"container\": {\n", " \"type\": \"DOCKER\",\n", " \"image\": \"farmbio/blankfilter\",\n", " \"volumes\": [{\n", " \"hostPath\": \"/mnt/container-volumes/jupyter\",\n", " \"containerPath\": \"/data\",\n", " \"mode\": \"RW\"\n", " }]\n", " },\n", " \"command\" : \"Rscript BlankFilter.r /data/inputdata_workshop.xls /data/output_BlankFilter.xls\",\n", " \"owner\" : \"user@example.com\"\n", "}\n", "\"\"\"\n", "response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)\n", "print(\"HTTP response code: \" + str(response.status_code))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "N.B. You can ignore any warning about unverified HTTP requests. Response code 204 means that the REST call succeeded." ] },
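{ "cell_type": "markdown", "metadata": {}, "source": [ "If a job definition contains a mistake, you can POST a corrected definition with the same name, or remove the job entirely through the documented `DELETE /scheduler/job/<jobName>` endpoint. The snippet below (an addition for this notebook, shown for the *blank filter* job) is left commented out so that it is not run by accident." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Remove a job via the documented DELETE /scheduler/job/<jobName> endpoint.\n", "# Uncomment the last two lines only if you really want to delete the job.\n", "url=\"https://admin:\"+password+\"@\"+control+\"/chronos/scheduler/job/blank-filter\"\n", "#response=requests.delete(url, verify=False)\n", "#print(\"HTTP response code: \" + str(response.status_code))" ] },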
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "url=\"https://admin:\"+password+\"@\"+control+\"/chronos/scheduler/dependency\"\n", "json=\"\"\"\n", "{ \n", " \"parents\" : [\"blank-filter\"],\n", " \"cpus\": \"0.25\",\n", " \"mem\": \"128\", \n", " \"epsilon\" : \"PT10M\", \n", " \"name\" : \"batchfeature-removal\",\n", " \"container\": {\n", " \"type\": \"DOCKER\",\n", " \"image\": \"farmbio/batchfeatureremoval\",\n", " \"volumes\": [{\n", " \"hostPath\": \"/mnt/container-volumes/jupyter\",\n", " \"containerPath\": \"/data\",\n", " \"mode\": \"RW\"\n", " }]\n", " },\n", " \"command\" : \"Rscript BatchfeatureRemoval.r /data/output_BlankFilter.xls /data/output_BatchfeatureRemoval.xls\",\n", " \"owner\" : \"user@example.com\"\n", "}\n", "\"\"\"\n", "response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)\n", "print(\"HTTP response code: \" + str(response.status_code))" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "## Step 3: log2transformation - Transforming the data to the log2 base scale\n", "\n", "In this step we map features intensities to the the log2 scale. Additionally, missing values are imputed by zeros.\n", "\n", "Please go through the following code snippet, run it and check the Chronos interface." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "url=\"https://admin:\"+password+\"@\"+control+\"/chronos/scheduler/dependency\"\n", "json=\"\"\"\n", "{ \n", " \"parents\" : [\"batchfeature-removal\"],\n", " \"cpus\": \"0.25\",\n", " \"mem\": \"128\", \n", " \"epsilon\" : \"PT10M\", \n", " \"name\" : \"log2-transformation\",\n", " \"container\": {\n", " \"type\": \"DOCKER\",\n", " \"image\": \"farmbio/log2transformation\",\n", " \"volumes\": [{\n", " \"hostPath\": \"/mnt/container-volumes/jupyter\",\n", " \"containerPath\": \"/data\",\n", " \"mode\": \"RW\"\n", " }]\n", " },\n", " \"command\" : \"Rscript log2transformation.r /data/output_BatchfeatureRemoval.xls /data/output_log2transformation.xls\",\n", " \"owner\" : \"user@example.com\"\n", "}\n", "\"\"\"\n", "response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)\n", "print(\"HTTP response code: \" + str(response.status_code))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 4: Splitter - Splits data according to sample names\n", "\n", "In order to parallelize microservices, we need to split the data into the subsets that will be processed simultaneously. In this step we divide the data basing on the first five letters of the sample names.\n", "\n", "Please go through the following code snippet, run it and check the Chronos interface." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "url=\"https://admin:\"+password+\"@\"+control+\"/chronos/scheduler/dependency\"\n", "json=\"\"\"\n", "{ \n", " \"parents\" : [\"log2-transformation\"],\n", " \"cpus\": \"0.25\",\n", " \"mem\": \"128\", \n", " \"epsilon\" : \"PT10M\", \n", " \"name\" : \"splitter\",\n", " \"container\": {\n", " \"type\": \"DOCKER\",\n", " \"image\": \"farmbio/splitter\",\n", " \"volumes\": [{\n", " \"hostPath\": \"/mnt/container-volumes/jupyter\",\n", " \"containerPath\": \"/data\",\n", " \"mode\": \"RW\"\n", " }]\n", " },\n", " \"command\" : \"Rscript Splitter.r /data/output_log2transformation.xls /data/output_splitter\",\n", " \"owner\" : \"user@example.com\"\n", "}\n", "\"\"\"\n", "response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)\n", "print(\"HTTP response code: \" + str(response.status_code))" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "## Step 5: CV - Calculate the coefficient of variation\n", "\n", "In this step we calculate the coefficient of variation for each feature present within the samples.\n", "\n", "In the previous step we divided the working set by sample. It is very convenient now to run multiple instances of the CV microservice in parallel, in order to save running time. In the code snippet we use the header of the *inputdata_workshop.xls* file to figure out the file names that will come out of the previous step. Then for each file we submit a new job to Chronos. \n", "\n", "Please go through the following code snippet, run it and check the Chronos interface." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import os\n", "os.mkdir(\"output_cv\") # Create a folder for CV output\n", "# Figure out samples from the header of the input file\n", "with open('inputdata_workshop.xls', 'r') as f: header = f.readline()\n", "samples = list(set(map(lambda s: s[1:-1][:5],header.split(\"\\t\"))))\n", "samples.remove('BLANK') # BLANK doesn't need to be processed\n", "# Create a microservice for each sample\n", "url=\"https://admin:\"+password+\"@\"+control+\"/chronos/scheduler/dependency\"\n", "for s in samples:\n", " json=\"\"\"\n", " { \n", " \"parents\" : [\"splitter\"],\n", " \"cpus\": \"0.25\",\n", " \"mem\": \"128\", \n", " \"epsilon\" : \"PT10M\", \n", " \"name\" : \"cv-%(jobname)s\",\n", " \"container\": {\n", " \"type\": \"DOCKER\",\n", " \"image\": \"farmbio/cv\",\n", " \"volumes\": [{\n", " \"hostPath\": \"/mnt/container-volumes/jupyter\",\n", " \"containerPath\": \"/data\",\n", " \"mode\": \"RW\"\n", " }]\n", " },\n", " \"command\" : \"Rscript CV.r /data/output_splitter/%(sample)s.xls /data/output_cv/%(sample)s_cv.xls\",\n", " \"owner\" : \"user@example.com\"\n", " }\n", " \"\"\" % {\"sample\" : s, \"jobname\": s.replace(\".\", \"_\")}\n", " response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)\n", " print(s + \" HTTP response code: \" + str(response.status_code))" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "## Step 6: Merger - Merges several files into one, columnwise\n", "\n", "In step 5 we processed many samples in parallel, and before proceding to the feature selection we need to merge them again in a single file. This step can be run only after all of the jobs that have been generated by the previous step are finished. 
{ "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "## Step 6: Merger - Merges several files into one, column-wise\n", "\n", "In step 5 we processed many samples in parallel, and before proceeding to the feature selection we need to merge them again into a single file. This step can be run only after all of the jobs that were generated by the previous step have finished. Hence, in the JSON we specify all the jobs from step 5 in the *parents* field.\n", "\n", "Please go through the following code snippet, run it and check the Chronos interface." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "url=\"https://admin:\"+password+\"@\"+control+\"/chronos/scheduler/dependency\"\n", "# Format job names from step 5 in JSON array format\n", "jobNames = \"[\" + \",\".join(map(lambda s: '\"cv-'+s.replace(\".\", \"_\")+'\"',samples)) + \"]\"\n", "json=\"\"\"\n", "{ \n", " \"parents\" : %(parents)s,\n", " \"cpus\": \"0.25\",\n", " \"mem\": \"128\", \n", " \"epsilon\" : \"PT10M\", \n", " \"name\" : \"merger\",\n", " \"container\": {\n", " \"type\": \"DOCKER\",\n", " \"image\": \"farmbio/merger\",\n", " \"volumes\": [{\n", " \"hostPath\": \"/mnt/container-volumes/jupyter\",\n", " \"containerPath\": \"/data\",\n", " \"mode\": \"RW\"\n", " }]\n", " },\n", " \"command\" : \"Rscript Merger.r /data/output_cv /data/output_Merger.xls\",\n", " \"owner\" : \"user@example.com\"\n", "}\n", "\"\"\" % {\"parents\" : jobNames}\n", "#print(json) # uncomment to inspect the JSON payload\n", "response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)\n", "print(\"HTTP response code: \" + str(response.status_code))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 7: FeatureSelection - Extract features with low CV\n", "\n", "In this final step we extract stable features, based on the median coefficient of variation computed in step 5. \n", "\n", "Please go through the following code snippet, run it and check the Chronos interface." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "url=\"https://admin:\"+password+\"@\"+control+\"/chronos/scheduler/dependency\"\n", "json=\"\"\"\n", "{ \n", " \"parents\" : [\"merger\"],\n", " \"cpus\": \"0.25\",\n", " \"mem\": \"128\", \n", " \"epsilon\" : \"PT10M\", \n", " \"name\" : \"feature-selection\",\n", " \"container\": {\n", " \"type\": \"DOCKER\",\n", " \"image\": \"farmbio/featureselection\",\n", " \"volumes\": [{\n", " \"hostPath\": \"/mnt/container-volumes/jupyter\",\n", " \"containerPath\": \"/data\",\n", " \"mode\": \"RW\"\n", " }]\n", " },\n", " \"command\" : \"Rscript FeatureSelection.r /data/output_log2transformation.xls /data/output_Merger.xls /data/output_FeatureSelection.xls\",\n", " \"owner\" : \"user@example.com\"\n", "}\n", "\"\"\"\n", "response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)\n", "print(\"HTTP response code: \" + str(response.status_code))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run the workflow\n", "\n", "Now the DAG that defines our workflow is set up, and you can force a run of the *blank filter* job, which is scheduled for 01/01/2030. This can be done via the REST API as well, as shown in the snippet below, but it is also convenient to do it through the Chronos UI." ] }
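, { "cell_type": "markdown", "metadata": {}, "source": [ "For completeness, this is how a manual run can be triggered through the documented `PUT /scheduler/job/<jobName>` endpoint (a snippet added for this notebook):" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Force a run of the blank-filter job via the documented PUT /scheduler/job/<jobName> endpoint\n", "url=\"https://admin:\"+password+\"@\"+control+\"/chronos/scheduler/job/blank-filter\"\n", "response=requests.put(url, verify=False)\n", "print(\"HTTP response code: \" + str(response.status_code))" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.1" } }, "nbformat": 4, "nbformat_minor": 0 }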