''' Awesome people who have directly contributed to the project: Jon Palmer - Bug finder & advice on project direction Mahmut Uludag - Bug finder Help: print pybam.wat Github: http://github.com/JohnLonginotto/pybam This code was written by John Longinotto, a PhD student of the Pospisilik Lab at the Max Planck Institute of Immunbiology & Epigenetics, Freiburg. My PhD is funded by the Deutsches Epigenom Programm (DEEP), and the Max Planck IMPRS Program. I study Adipose Biology and Circadian Rhythm in mice, although it seems these days I spend most of my time at the computer :-) ''' import os import sys import zlib import time import tempfile import subprocess from array import array from struct import unpack CtoPy = { 'A':'"' } wat = ''' Main class: pybam.read Github: http://github.com/JohnLonginotto/pybam [ Dynamic Parser Example ] for alignment in pybam.read('/my/data.bam'): print alignment.sam_seq [ Static Parser Example ] for seq,mapq in pybam.read('/my/data.bam',['sam_seq','sam_mapq']): print seq print mapq [ Mixed Parser Example ] my_bam = pybam.read('/my/data.bam',['sam_seq','sam_mapq']) print my_bam._static_parser_code for seq,mapq in my_bam: if seq.startswith('ACGT') and mapq > 10: print my_bam.sam [ Custom Decompressor (from file path) Example ] my_bam = pybam.read('/my/data.bam.lzma',decompressor='lzma --decompress --stdout /my/data.bam.lzma') [ Custom Decompressor (from file object) Example ] my_bam = pybam.read(sys.stdin,decompressor='lzma --decompress --stdout') # data given to lzma via stdin [ Force Internal bgzip Decompressor ] my_bam = pybam.read('/my/data.bam',decompressor='internal') [ Parse Words (hah) ]''' wat += '\n'+''.join([('\n===============================================================================================\n\n ' if code is 'file_alignments_read' or code is 'sam' else ' ')+(code+' ').ljust(25,'-')+description+'\n' for code,description in sorted(parse_codes.items())]) + '\n' class read: ''' [ Dynamic Parser Example ] for alignment in pybam.read('/my/data.bam'): print alignment.sam_seq [ Static Parser Example ] for seq,mapq in pybam.read('/my/data.bam',['sam_seq','sam_mapq']): print seq print mapq [ Mixed Parser Example ] my_bam = pybam.read('/my/data.bam',['sam_seq','sam_mapq']) print my_bam._static_parser_code for seq,mapq in my_bam: if seq.startswith('ACGT') and mapq > 10: print my_bam.sam [ Custom Decompressor (from file path) Example ] my_bam = pybam.read('/my/data.bam.lzma',decompressor='lzma --decompress --stdout /my/data.bam.lzma') [ Custom Decompressor (from file object) Example ] my_bam = pybam.read(sys.stdin,decompressor='lzma --decompress --stdout') # data given to lzma via stdin [ Force Internal bgzip Decompressor ] my_bam = pybam.read('/my/data.bam',decompressor='internal') "print pybam.wat" in the python terminal to see the possible parsable values, or visit http://github.com/JohnLonginotto/pybam for the latest info. ''' def __init__(self,f,fields=False,decompressor=False): self.file_bytes_read = 0 self.file_chromosomes = [] self.file_alignments_read = 0 self.file_chromosome_lengths = {} if fields is not False: if type(fields) is not list or len(fields) is 0: raise PybamError('\n\nFields for the static parser must be provided as a non-empty list. You gave a ' + str(type(fields)) + '\n') else: for field in fields: if field.startswith('sam') or field.startswith('bam'): if field not in parse_codes.keys(): raise PybamError('\n\nStatic parser field "' + str(field) + '" from fields ' + str(fields) + ' is not known to this version of pybam!\nPrint "pybam.wat" to see available field names with explinations.\n') else: raise PybamError('\n\nStatic parser field "' + str(field) + '" from fields ' + str(fields) + ' does not start with "sam" or "bam" and thus is not an avaliable field for the static parsing.\nPrint "pybam.wat" in interactive python to see available field names with explinations.\n') if decompressor: if type(decompressor) is str: if decompressor is not 'internal' and '{}' not in decompressor: raise PybamError('\n\nWhen a custom decompressor is used and the input file is a string, the decompressor string must contain at least one occurence of "{}" to be substituted with a filepath by pybam.\n') else: raise PybamError('\n\nUser-supplied decompressor must be a string that when run on the command line decompresses a named file (or stdin), to stdout:\ne.g. "lzma --decompress --stdout {}" if pybam is provided a path as input file, where {} is substituted for that path.\nor just "lzma --decompress --stdout" if pybam is provided a file object instead of a file path, as data from that file object will be piped via stdin to the decompression program.\n') ## First we make a generator that will return chunks of uncompressed data, regardless of how we choose to decompress: def generator(): DEVNULL = open(os.devnull, 'wb') # First we need to figure out what sort of file we have - whether it's gzip compressed, uncompressed, or something else entirely! if type(f) is str: try: self._file = open(f,'rb') except: raise PybamError('\n\nCould not open "' + str(self._file.name) + '" for reading!\n') try: magic = os.read(self._file.fileno(),4) except: raise PybamError('\n\nCould not read from "' + str(self._file.name) + '"!\n') elif type(f) is file: self._file = f try: magic = os.read(self._file.fileno(),4) except: raise PybamError('\n\nCould not read from "' + str(self._file.name) + '"!\n') else: raise PybamError('\n\nInput file was not a string or a file object. It was: "' + str(f) + '"\n') self.file_name = os.path.basename(os.path.realpath(self._file.name)) self.file_directory = os.path.dirname(os.path.realpath(self._file.name)) if magic == 'BAM\1': # The user has passed us already unzipped BAM data! Job done :) data = 'BAM\1' + self._file.read(35536) self.file_bytes_read += len(data) self.file_decompressor = 'None' while data: yield data data = self._file.read(35536) self.file_bytes_read += len(data) self._file.close() DEVNULL.close() raise StopIteration elif magic == "\x1f\x8b\x08\x04": # The user has passed us compressed gzip/bgzip data, which is typical for a BAM file # use custom decompressor if provided: if decompressor is not False and decompressor is not 'internal': if type(f) is str: self._subprocess = subprocess.Popen( decompressor.replace('{}',f), shell=True, stdout=subprocess.PIPE, stderr=DEVNULL) else: self._subprocess = subprocess.Popen('{ printf "'+magic+'"; cat; } | ' + decompressor, stdin=self._file, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL) self.file_decompressor = decompressor data = self._subprocess.stdout.read(35536) self.file_bytes_read += len(data) while data: yield data data = self._subprocess.stdout.read(35536) self.file_bytes_read += len(data) self._file.close() DEVNULL.close() raise StopIteration # else look for pigz or gzip: else: try: self._subprocess = subprocess.Popen(["pigz"],stdin=DEVNULL,stdout=DEVNULL,stderr=DEVNULL) if self._subprocess.returncode is None: self._subprocess.kill() use = 'pigz' except OSError: try: self._subprocess = subprocess.Popen(["gzip"],stdin=DEVNULL,stdout=DEVNULL,stderr=DEVNULL) if self._subprocess.returncode is None: self._subprocess.kill() use = 'gzip' except OSError: use = 'internal' if use is not 'internal' and decompressor is not 'internal': if type(f) is str: self._subprocess = subprocess.Popen([ use , '--decompress','--stdout', f ], stdout=subprocess.PIPE, stderr=DEVNULL) else: self._subprocess = subprocess.Popen('{ printf "'+magic+'"; cat; } | ' + use + ' --decompress --stdout', stdin=f, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL) time.sleep(1) if self._subprocess.poll() == None: data = self._subprocess.stdout.read(35536) self.file_decompressor = use self.file_bytes_read += len(data) while data: yield data data = self._subprocess.stdout.read(35536) self.file_bytes_read += len(data) self._file.close() DEVNULL.close() raise StopIteration # Python's gzip module can't read from a stream that doesn't support seek(), and the zlib module cannot read the bgzip format without a lot of help: self.file_decompressor = 'internal' raw_data = magic + self._file.read(65536) self.file_bytes_read = len(raw_data) internal_cache = [] blocks_left_to_grab = 50 bs = 0 checkpoint = 0 decompress = zlib.decompress while raw_data: if len(raw_data) - bs < 35536: raw_data = raw_data[bs:] + self._file.read(65536) self.file_bytes_read += len(raw_data) - bs bs = 0 magic = raw_data[bs:bs+4] if not magic: break # a child's heart if magic != "\x1f\x8b\x08\x04": raise PybamError('\n\nThe input file is not in a format I understand. First four bytes: ' + repr(magic) + '\n') try: more_bs = bs + unpack(" sam.\n\nThese two headers should always be the same, but apparently they are not:\nThe ASCII header looks like: ' + self.file_header + '\nWhile the binary header has the following chromosomes: ' + self.file_chromosomes + '\n') ## Variable parsing: def new_entry(header_cache): cache = header_cache # we keep a small cache of X bytes of decompressed BAM data, to smoothen out disk access. p = 0 # where the next alignment/entry starts in the cache while True: try: while len(cache) < p + 4: cache = cache[p:] + next(self._generator); p = 0 # Grab enough bytes to parse blocksize self.sam_block_size = unpack('> 4 , cigar_codes[cig & 0b1111] ) for cig in array('I', self.bam[self._end_of_qname : self._end_of_cigar ])] @property def sam_cigar_string(self): return ''.join( [ str(cig >> 4) + cigar_codes[cig & 0b1111] for cig in array('I', self.bam[self._end_of_qname : self._end_of_cigar ])]) @property def sam_seq(self): return ''.join( [ dna_codes[dna >> 4] + dna_codes[dna & 0b1111] for dna in array('B', self.bam[self._end_of_cigar : self._end_of_seq ])])[:self.sam_l_seq] # As DNA is 4 bits packed 2-per-byte, there might be a trailing '0000', so we can either @property def sam_qual(self): return ''.join( [ chr(ord(quality) + 33) for quality in self.bam[self._end_of_seq : self._end_of_qual ]]) @property def sam_tags_list(self): result = [] offset = self._end_of_qual while offset != len(self.bam): tag_name = self.bam[offset:offset+2] tag_type = self.bam[offset+2] if tag_type == 'Z': offset_end = self.bam.index('\x00',offset+3)+1 tag_data = self.bam[offset+3:offset_end-1] elif tag_type in CtoPy: offset_end = offset+3+py4py[tag_type] tag_data = unpack(CtoPy[tag_type],self.bam[offset+3:offset_end])[0] elif tag_type == 'B': offset_end = offset+8+(unpack('