# Note: Please start IPython parallel before running this

In [1]:
import numpy as np
import tables
import vcf

 rpy2.rinterface.initr()

 rpy2.rinterface.initr()

 rpy2.rinterface.initr()

 rpy2.rinterface.initr()

 rpy2.rinterface.initr()

 rpy2.rinterface.initr()

 rpy2.rinterface.initr()

 rpy2.rinterface.initr()

 rpy2.rinterface.initr()


In [2]:
max_pos = 41956556 # This should not be manually input, but we will indulge
chrom = '3L' # ditto
vcf_fname = '../raw/total-3L.vcf.gz'
block_size = 250000
out_dir = '../raw'
out_hdf5_fname = '../raw/example.h5'

In [3]:
import dask
import dask.multiprocessing
from dask import delayed
#from multiprocessing.pool import ThreadPool
dask.set_options(get=dask.multiprocessing.get)



In [4]:
@delayed(pure=True)
def get_pos_dp(out_dir, fname, block_size, block):
 vcf_file = vcf.Reader(filename=vcf_fname)
 start = block * block_size
 temp_block_fname = '%s/block_%06d.h5' % (out_dir, block)
 store = tables.open_file(temp_block_fname, 'w')
 group = store.create_group('/', 'DATA', 'Temporary data')
 positions = []
 annotations = []
 #We are assuming that we have memory for loading
 # that should be fine if careful with granularity
 for rec in vcf_file.fetch(chrom, start, start + block_size):
 #Note that fetch(x, 0, 10) does 1 to 10, compare with Python range
 positions.append(rec.POS)
 genomic_region = rec.INFO['ANN'][0].split('|')[1]
 annotations.append(genomic_region.encode('ascii'))
 pos_array = np.array(positions, dtype=np.uint32)
 ann_array = np.array(annotations, dtype=np.dtype('a'))
 
 # annotations could be compressed with enum
 store.create_array(group, 'positions', pos_array, 'Positions')
 store.create_array(group, 'annotations', ann_array, 'Annotations')
 store.close()
 return temp_block_fname, len(pos_array)

In [5]:
class PositionType(tables.IsDescription):
 name = tables.StringCol(32)
 bit = tables.Int32Col()
 

@delayed(pure=True)
def consolidate_blocks(out_hdf5_name, block_meta):
 def get_pos_annotations(str_ann):
 ann_pos = str_ann.decode('ascii')
 return ann_pos.split('&')
 store = tables.open_file(out_hdf5_name, 'w')
 group = store.create_group('/', 'DATA', 'All data')

 #lets find all genomic position types and encode them for compression
 position_types = set ()
 for block_fname, num_snp in block_meta:
 block_hdf5 = tables.open_file(block_fname, 'r')
 annotations = block_hdf5.root.DATA.annotations.read()
 for ann_pos in annotations:
 for ann in get_pos_annotations(ann_pos):
 position_types.add(ann)
 block_hdf5.close()
 position_type_table = store.create_table(group, 'position_type', PositionType, 'Key to solve position type')
 position_row = position_type_table.row
 name_to_bit = {}
 for bit_pos, position_type in enumerate(position_types):
 #position_row['name'] = position_type
 #position_row['bit'] = bit_pos
 #position_row.append()
 name_to_bit[position_type] = bit_pos
 
 total_size = sum([size for name, size in block_meta])

 
 positions = np.empty(total_size, dtype=np.uint32)
 #empty - talk about it...
 snp_type = np.empty(total_size, dtype=np.uint32)
 curr_pos = 0
 for block_fname, num_snp in block_meta:
 block_hdf5 = tables.open_file(block_fname, 'r')
 for pos, annotations in zip(block_hdf5.root.DATA.positions,
 block_hdf5.root.DATA.annotations):
 positions[curr_pos] = pos
 encode_type = 0
 for position_type in get_pos_annotations(annotations):
 encode_type += 2**name_to_bit[position_type]
 snp_type[curr_pos] = encode_type
 curr_pos += 1
 
 block_hdf5.close()
 #remember to delete
 store.create_array(group, 'positions', positions, 'SNP positions')
 store.create_array(group, 'snp_type', snp_type, 'Type per SNP')
 store.close()
 return block_meta

In [6]:
blocks = range(1 + max_pos // block_size)
block_meta = []
for block in blocks:
 block_meta.append(get_pos_dp(out_dir, vcf_fname, block_size, block))
consolidate = consolidate_blocks(out_hdf5_fname, block_meta)

In [7]:
block_run = consolidate.compute()

In [8]:
#import pickle
#
#pickle_w = open('block_run.pickle', 'wb')
#pickle.dump(block_run, pickle_w)
#pickle_w.close()

In [9]:
#import pickle

#pickle_f = open('block_run.pickle', 'rb')
#block_run = pickle.load(pickle_f)
#pickle_f.close()
#consolidate_blocks(out_hdf5_fname, block_run)

[('../raw/block_000000.h5', 3437),
 ('../raw/block_000001.h5', 3620),
 ('../raw/block_000002.h5', 2979),
 ('../raw/block_000003.h5', 6952),
 ('../raw/block_000004.h5', 3909),
 ('../raw/block_000005.h5', 5112),
 ('../raw/block_000006.h5', 10220),
 ('../raw/block_000007.h5', 15493),
 ('../raw/block_000008.h5', 22648),
 ('../raw/block_000009.h5', 17521),
 ('../raw/block_000010.h5', 17297),
 ('../raw/block_000011.h5', 21994),
 ('../raw/block_000012.h5', 20922),
 ('../raw/block_000013.h5', 22902),
 ('../raw/block_000014.h5', 24039),
 ('../raw/block_000015.h5', 20740),
 ('../raw/block_000016.h5', 16778),
 ('../raw/block_000017.h5', 2229),
 ('../raw/block_000018.h5', 277),
 ('../raw/block_000019.h5', 10580),
 ('../raw/block_000020.h5', 20375),
 ('../raw/block_000021.h5', 25542),
 ('../raw/block_000022.h5', 25291),
 ('../raw/block_000023.h5', 24665),
 ('../raw/block_000024.h5', 20060),
 ('../raw/block_000025.h5', 22722),
 ('../raw/block_000026.h5', 31329),
 ('../raw/block_000027.h5', 27807),
 

In [10]:
#If we have 20 CPUS, why not 20 processes with equal number of blocks --> SNP density.