{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "#!ls -lah /cellar/users/btsui/all_seq_snp/Homo_sapiens_all_merged_snp.pickle.gz" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "#!ls -lath /cellar/users/btsui/all_seq_snp/ | head" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "import re\n", "import os\n", "import math\n", "from multiprocessing import Pool\n", "from tqdm import tqdm\n", "## init\n", "#/nrnb/users/btsui/Data/tcga_extracted_lgg_snp/\n", "mySpecie='Homo_sapiens'\n", "#mySpecie='Homo_sapiens'\n", "outMergedDir='/cellar/users/btsui/all_seq_snp/'+mySpecie+'_all_merged_snp.pickle'\n", "\n", "#outMergedDir='/cellar/users/btsui/all_seq_snp/'+mySpecie+'_all_merged_snp.TCGA.pickle'\n", "\n", "##change this dir to point to the updated csv\n", "#full_meta_dir=\"/cellar/users/btsui/Project/METAMAP/notebook/Parsing/sra_dump.csv\"\n", "inSrrDir='/nrnb/users/btsui/Data/all_seq/snp/'\n", "tmp_dir='/nrnb/users/btsui/Data/all_seq/tmp_chunks/'\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "#!ls -alh /nrnb/users/btsui/Data/tcga_extracted_lgg_snp/ |wc -l" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/cellar/users/btsui/anaconda3/lib/python3.6/site-packages/IPython/core/interactiveshell.py:2785: DtypeWarning: Columns (0) have mixed types. Specify dtype option on import or set low_memory=False.\n", " interactivity=interactivity, compiler=compiler, result=result)\n" ] }, { "data": { "text/plain": [ "0" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "tmpBedDf=pd.read_csv('/data/cellardata/users/btsui/dbsnp/snp_beds/'+mySpecie+'.bed',header=None,sep='\\t')\n", "unique_chroms=tmpBedDf[0].astype(np.str).unique()\n", "\n", "### start merging one by one \n", "if os.path.exists(tmp_dir):\n", " os.system('rm -r '+tmp_dir)\n", "os.system('mkdir -p '+tmp_dir)\n", "\n" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "snp_fname_postfix='.txt.snp.gz'" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 5.35 s, sys: 312 ms, total: 5.66 s\n", "Wall time: 3min 8s\n" ] } ], "source": [ "%%time\n", "os.chdir(tmp_dir)\n", "#identify non empty files\n", "os.system('ls -la '+inSrrDir+' > ls_out.txt ')\n", "ls_df=pd.read_csv('ls_out.txt',sep='\\s+',header=None,names=np.arange(9)).iloc[1:]\n", "#ls_df=\n", "size_S=ls_df[4]\n", "m4=size_S.astype(np.int)>1000\n", "m5=ls_df[8].str.contains(snp_fname_postfix+'$')\n", "non_empty_files=ls_df[m4&m5][8].str.split('/').str[-1].str.split('.').str[0].values\n" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "# of files merging 253056\n" ] } ], "source": [ "print ('# of files merging',len(non_empty_files))" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "from pandas.api.types import CategoricalDtype" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "chrom_type = CategoricalDtype(categories=unique_chroms,ordered=True)\n", 
"Run_db_type=CategoricalDtype(categories=['DRR','ERR','SRR'],ordered=True)\n", "myBases=['A','C','G','T']\n", "myBases_type=CategoricalDtype(categories=myBases,ordered=True)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "\n", "\n", "def parseSrr(inSrr):\n", " #print inSrr\n", " fname=inSrrDir+inSrr+snp_fname_postfix\n", " tmpDf_all=pd.read_csv(fname,sep='\\s+',low_memory=False,\n", " header=None,names=np.arange(50),index_col=None,error_bad_lines=False)\n", " myCols=['Chr','Pos','Ref','rd_all','','A','C','G','T','N']\n", " tmpDf=tmpDf_all.iloc[:,:len(myCols)]\n", " tmpDf.columns=myCols\n", " tmpDf2=tmpDf.set_index(['Chr','Pos'])\n", " \n", " myL=[]\n", " for base in myBases:\n", " splitL=tmpDf2[base].str.split(':',expand=True)\n", " ### extract the read count and base quality\n", " tmpDf5=splitL[[1,3]].astype(np.float)\n", " tmpDf5.columns=['ReadDepth','AverageBaseQuality']\n", " myL.append(tmpDf5)\n", " tmpDf6=pd.concat(myL,keys=myBases,axis=0,names=['base'])\n", " tmpDf6.columns.name='features'\n", " mergedDf=tmpDf6.astype(np.uint16)\n", " non_zero_df=mergedDf[mergedDf['ReadDepth']>0]\n", " tmpDf7=non_zero_df.reset_index()\n", " Run_digits=re.search('[DES]RR(\\d+)', inSrr)\n", " Run_Db=re.search('([DES]RR)\\d+', inSrr)\n", " tmpDf7['Run_digits']=Run_digits.group(1)\n", " tmpDf7['Run_db']=Run_Db.group(1)\n", " ###convert the datatypes\n", " tmpDf7['Pos']=tmpDf7['Pos'].astype(np.uint32) \n", " tmpDf7['Run_digits']=tmpDf7['Run_digits'].astype(np.uint64)\n", " \n", " tmpDf7['Chr']=tmpDf7['Chr'].astype(np.str).astype(dtype=chrom_type)\n", " tmpDf7['Run_db']=tmpDf7['Run_db'].astype(np.str).astype(dtype=Run_db_type)\n", " tmpDf7['base']=tmpDf7['base'].astype(dtype=myBases_type)\n", " myG=['Run_db','Run_digits',u'Chr', u'Pos',u'base']\n", " tmpDf7=tmpDf7.drop_duplicates(myG)\n", " srr_pickle_df=tmpDf7.set_index(myG).sort_index()\n", " return srr_pickle_df\n" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "\"\"\"\n", "given: srr id \n", "return: the merged file\n", "\"\"\"\n", "\n", "### identify files to be merged\n", "fnames=pd.Series(os.listdir(inSrrDir))\n", "snpFnames=fnames[fnames.str.contains(snp_fname_postfix+'$')]\n", "srrsWithData=snpFnames.str.split('.').str[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### configure for loading data" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "n files to merge: 253056\n" ] } ], "source": [ "m3=srrsWithData.isin(non_empty_files)\n", "toMergeSrrs=srrsWithData[m3].values\n", "print ('n files to merge: ',len(toMergeSrrs) )\n", "TEST=False\n", "if TEST:\n", " toRunSrrs=toMergeSrrs[:10]\n", " chunkSize=5\n", " nThread=1\n", "else:\n", " toRunSrrs=toMergeSrrs\n", " chunkSize=100\n", " nThread=64" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### merge each chunkSize amount of VCFs among into different pickle chunks" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "import tqdm" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "\r", " 0%| | 0/2531 [00:00