{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "#!pwd" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/cellar/users/btsui/anaconda3/lib/python3.6/site-packages/IPython/core/interactiveshell.py:2785: DtypeWarning: Columns (5,6,25,26) have mixed types. Specify dtype option on import or set low_memory=False.\n", " interactivity=interactivity, compiler=compiler, result=result)\n" ] } ], "source": [ "import os\n", "os.chdir('/cellar/users/btsui/Project/METAMAP/notebook/RapMapTest/Pipelines/RNAseq/merge')\n", "\n", "import sys\n", "sys.path+=['../']\n", "\n", "import pandas as pd\n", "import numpy as np\n", "from tqdm import tqdm\n", "from multiprocessing import Pool\n", "import param\n", "#from IPython.utils import io\n", "import gc\n", "\n", "sra_dump_df=pd.read_csv(param.full_meta_dir)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### check if data is in SRA" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "n SRR: 841085\n" ] } ], "source": [ "allProcessedFnames=pd.Series(os.listdir(param.count_out_dir),dtype=np.str)\n", "\n", "postfix_m=allProcessedFnames.str.contains('.abundance.tsv.gz$')\n", "processedRnaseqSrr=allProcessedFnames[postfix_m].str.split('.').str[0].unique()\n", "\n", "print ('n SRR: ', len(processedRnaseqSrr))\n", "\n", "sra_dump_df['Skymap_TranscriptCount_Processed']=sra_dump_df['Run'].isin(processedRnaseqSrr)\n", "\n", "m_processed=sra_dump_df['Skymap_TranscriptCount_Processed']" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "outputFolder='/nrnb/users/btsui/Data/all_seq/rnaseq_merged_chunks/'" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "31488" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "os.system('find -size {} 0 -print0 |xargs -0 rm --'.format(outputFolder))" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "processedFnames=pd.Series(os.listdir(outputFolder))" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "refreshDb=False\n", "if refreshDb: \n", " os.system('rm -r {}'.format(outputFolder))" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "#processedFnames" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### select the species" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import itertools\n", "import gc\n", "\n", "itertools" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Homo_sapiens est_counts\n", "# of samples to merge: 304908\n", "n chunks: 109\n", "n chunks remaining: 0\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r", "0it [00:00, ?it/s]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Homo_sapiens tpm\n", "# of samples to merge: 304908\n", "n chunks: 109\n", "n chunks remaining: 0\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n", "0it 
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Homo_sapiens est_counts\n", "# of samples to merge: 304908\n", "n chunks: 109\n", "n chunks remaining: 0\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r", "0it [00:00, ?it/s]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Homo_sapiens tpm\n", "# of samples to merge: 304908\n", "n chunks: 109\n", "n chunks remaining: 0\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n", "0it [00:00, ?it/s]\u001b[A\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Mus_musculus est_counts\n", "# of samples to merge: 489541\n", "n chunks: 110\n", "n chunks remaining: 1\n" ] } ], "source": [ "faBaseDir = '/cellar/users/btsui/Data/ensembl/release/cdna/'\n", "\n", "nProcess = 3\n", "\n", "# est_counts are integers; tpm needs floats\n", "metricToDtypeDict = {'est_counts': np.uint32, 'tpm': np.float32}\n", "\n", "from itertools import product\n", "\n", "specieMetricPairs = list(product(param.supporting_species, metricToDtypeDict.keys()))\n", "\n", "ignoreLastNDigitsForChunking = 5  # chunk ID = run accession with the last 5 digits dropped\n", "for selectedSpecies, countMetric in specieMetricPairs:\n", "\n", "    # identify the processed samples for this species\n", "    print(selectedSpecies, countMetric)\n", "    m_specie = sra_dump_df['new_ScientificName'] == selectedSpecies\n", "    m = m_processed & m_specie\n", "    sra_dump_df_sub = sra_dump_df[m]\n", "    sra_dump_df_in = sra_dump_df_sub\n", "    nTotal = sra_dump_df_sub.shape[0]\n", "    print('# of samples to merge:', nTotal)\n", "    inSrrs = sra_dump_df_in['Run'].unique()\n", "    # dtype for the metric in consideration\n", "    myDtype = metricToDtypeDict[countMetric]\n", "\n", "    # parallelize on the chunk level: each accession prefix is a chunk ID\n", "    inSrrS = pd.Series(inSrrs)\n", "    groupRunDf = pd.DataFrame({'Prefix': inSrrS.str[:-ignoreLastNDigitsForChunking], 'Run': inSrrS})\n", "\n", "    print('n chunks:', groupRunDf['Prefix'].nunique())\n", "\n", "    # for each chunk, export to the folder directly after merging, no message passing;\n", "    # a chunk counts as processed if its output file for this species/metric already exists\n", "    processedFnamesSub = processedFnames[processedFnames.str.contains(\n", "        '{}.transcript.{}'.format(selectedSpecies, countMetric), regex=False)]\n", "    groupRunDf['Processed'] = groupRunDf['Prefix'].isin(processedFnamesSub.str.split('.').str[-2].unique())\n", "\n", "    def parseOne(inSrr):\n", "        # load one run's abundance table and keep only the selected metric\n", "        abundanceDir = param.count_out_dir + '{}.abundance.tsv.gz'.format(inSrr)\n", "        tmpDf = pd.read_csv(abundanceDir, sep='\\t')\n", "        return tmpDf.set_index('target_id')[countMetric].astype(myDtype)\n", "\n", "    g = groupRunDf.groupby('Prefix')['Run']\n", "    def mergeChunk(Prefix):\n", "        outputFname = '{}.transcript.{}.{}.pickle'.format(selectedSpecies, countMetric, Prefix)\n", "        # `in` on a Series checks the index, so compare against .values\n", "        if outputFname not in processedFnames.values:\n", "            outputDir = outputFolder + outputFname\n", "            try:\n", "                subRunS = g.get_group(Prefix)\n", "                myL = list(map(parseOne, subRunS))\n", "                mergedDf = pd.concat(myL, axis=1, keys=subRunS.values, copy=False, sort=False)\n", "                mergedDf.to_pickle(outputDir)\n", "                del mergedDf, myL\n", "                gc.collect()\n", "            except Exception as e:\n", "                print('Failed: {} ({})'.format(Prefix, e))\n", "\n", "    # merge only the chunks that are not processed yet\n", "    m = ~groupRunDf['Processed']\n", "    Prefixes = groupRunDf['Prefix'][m].unique()\n", "    print('n chunks remaining: {}'.format(len(Prefixes)))\n", "    with Pool(nProcess) as p:\n", "        # imap (rather than map) yields results as chunks finish, so tqdm shows real progress\n", "        r = list(tqdm(p.imap(mergeChunk, Prefixes), total=len(Prefixes)))" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "len(Prefixes)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!ls -laSh /nrnb/users/btsui/Data/all_seq/rnaseq_merged_chunks/ | wc -l" ] },
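{ "cell_type": "markdown", "metadata": {}, "source": [ "Each chunk lands on disk as a transcript-by-run DataFrame pickled as `{species}.transcript.{metric}.{prefix}.pickle`. A minimal sketch of the consumer side (the prefix `SRR12` below is hypothetical):" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# sketch: load one merged chunk and inspect it (the chunk name is hypothetical)\n", "chunkDir = outputFolder + 'Homo_sapiens.transcript.tpm.SRR12.pickle'\n", "if os.path.exists(chunkDir):\n", "    chunkDf = pd.read_pickle(chunkDir)  # rows: target_id, columns: run accessions\n", "    print(chunkDf.shape)\n", "    # individual runs can then be pulled out by column, e.g. chunkDf[['SRR1234567']]" ] },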
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# spot-check the number of runs in one chunk, e.g. the ERR17 prefix\n", "groupRunDf[groupRunDf.Prefix=='ERR17'].shape" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "groupRunDf" ] },
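{ "cell_type": "markdown", "metadata": {}, "source": [ "The chunks for one species/metric pair can be stitched back into a single matrix by concatenating along the run (column) axis. A sketch, assuming the naming scheme above; note the full human matrix (~300k runs) will not fit in modest RAM:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# sketch: reassemble all Homo_sapiens tpm chunks into one transcript-by-run matrix\n", "import glob\n", "chunkDirs = glob.glob(outputFolder + 'Homo_sapiens.transcript.tpm.*.pickle')\n", "if chunkDirs:\n", "    fullDf = pd.concat((pd.read_pickle(d) for d in chunkDirs), axis=1, copy=False)\n", "    print(fullDf.shape)" ] },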
\n", " def parseOne(inSrr):\n", " abundanceDir=param.count_out_dir+'{}.abundance.tsv.gz'.format(inSrr)\n", " tmpDf=pd.read_csv(abundanceDir,sep='\\t')\n", " tmpDf2=tmpDf.set_index('target_id')[countMetric].astype(myDtype)\n", " return tmpDf2\n", " \n", " #do the merge\n", " inSrrS=pd.Series(inSrrs)\n", " #each prefix is a Chunk ID\n", " groupRunDf=pd.DataFrame({'Prefix':inSrrS.str[:-ignoreLastNDigitsForChunking],'Run':inSrrS})\n", " print (\"n chunks:\",groupRunDf['Prefix'].nunique())\n", " \"\"\"\n", " For each chunk, export to the folder directly after merging, no message passing\n", " \"\"\"\n", " \n", " for Prefix,subRunS in groupRunDf.groupby('Prefix')['Run']:\n", " with Pool(nProcess) as p:\n", " myL=list(tqdm( p.imap(parseOne,subRunS),total=len(subRunS)))\n", " #3 mins per chunk\n", " mergedDf=pd.concat(myL,axis=1,keys=subRunS.values,copy=False,sort=False)\n", " outputDir='/nrnb/users/btsui/Data/all_seq/rnaseq_merged_chunks/{}.transcript.{}.{}.pickle'.format(\n", " selectedSpecies,countMetric,Prefix)\n", " mergedDf.to_pickle(outputDir)\n", " del (mergedDf, myL)\n", " gc.collect()\n", " \"\"\"" ] } ], "metadata": { "kernelspec": { "display_name": "Python [default]", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }