#
# Ogg parser
# Author Julien Muchembled
# Created: 10 june 2006
#

from hachoir.parser import Parser
from hachoir.field import (Field, FieldSet, createOrphanField,
                           NullBits, Bit, Bits, Enum, Fragment,
                           MissingField, ParserError,
                           UInt8, UInt16, UInt24, UInt32, UInt64,
                           RawBytes, String, PascalString32, NullBytes)
from hachoir.stream import FragmentedStream, InputStreamError
from hachoir.core.endian import LITTLE_ENDIAN, BIG_ENDIAN
from hachoir.core.tools import humanDurationNanosec
from hachoir.core.text_handler import textHandler, hexadecimal

# Upper bound (in bytes) of the region searched for the last page.
MAX_FILESIZE = 1000 * 1024 * 1024


class XiphInt(Field):
    """
    Positive integer with variable size. Values bigger than 254 are stored as
    (255, 255, ..., rest): value is the sum of all bytes.

    Example: 1000 is stored as (255, 255, 255, 235), total = 255*3+235 = 1000
    """

    def __init__(self, parent, name, max_size=None, description=None):
        """
        Read bytes from the parent stream until one differs from 0xff.

        max_size, when given, is the maximum field size in bits; reading
        stops once that many bits have been consumed even if the last byte
        was 0xff.
        """
        Field.__init__(self, parent, name, size=0, description=description)
        value = 0
        addr = self.absolute_address
        while max_size is None or self._size < max_size:
            byte = parent.stream.readBits(addr, 8, LITTLE_ENDIAN)
            value += byte
            self._size += 8
            if byte != 0xff:
                break
            addr += 8
        # The value is fully known at construction time.
        self.createValue = lambda: value


class Lacing(FieldSet):
    """Sequence of XiphInt sizes filling the whole field set."""

    def createFields(self):
        size = self.size
        while size:
            # Each integer may use at most the bits that are left.
            field = XiphInt(self, 'size[]', size)
            yield field
            size -= field.size


def parseVorbisComment(parent):
    """
    Yield the fields of a Vorbis comment block: vendor string, entry count,
    the entries themselves and, if one byte is left over, a framing flag.
    """
    yield PascalString32(parent, 'vendor', charset="UTF-8")
    yield UInt32(parent, 'count')
    for _ in range(parent["count"].value):
        yield PascalString32(parent, 'metadata[]', charset="UTF-8")
    if parent.current_size != parent.size:
        yield UInt8(parent, "framing_flag")


PIXEL_FORMATS = {
    0: "4:2:0",
    2: "4:2:2",
    3: "4:4:4",
}


def formatTimeUnit(field):
    """Display handler: field value multiplied by 100, shown as nanoseconds."""
    # presumably the raw value is expressed in 100 ns units -- TODO confirm
    return humanDurationNanosec(field.value * 100)


def parseVideoHeader(parent):
    """Yield the fields of a "video" stream header chunk."""
    yield NullBytes(parent, "padding[]", 2)
    yield String(parent, "fourcc", 4)
    yield UInt32(parent, "size")
    yield textHandler(UInt64(parent, "time_unit", "Frame duration"),
                      formatTimeUnit)
    yield UInt64(parent, "sample_per_unit")
    yield UInt32(parent, "default_len")
    yield UInt32(parent, "buffer_size")
    yield UInt16(parent, "bits_per_sample")
    yield NullBytes(parent, "padding[]", 2)
    yield UInt32(parent, "width")
    yield UInt32(parent, "height")
    yield NullBytes(parent, "padding[]", 4)


def parseTheoraHeader(parent):
    """Yield the fields of a Theora identification header chunk."""
    yield UInt8(parent, "version_major")
    yield UInt8(parent, "version_minor")
    yield UInt8(parent, "version_revision")
    yield UInt16(parent, "width", "Width*16 in pixel")
    yield UInt16(parent, "height", "Height*16 in pixel")

    yield UInt24(parent, "frame_width")
    yield UInt24(parent, "frame_height")
    yield UInt8(parent, "offset_x")
    yield UInt8(parent, "offset_y")

    yield UInt32(parent, "fps_num", "Frame per second numerator")
    yield UInt32(parent, "fps_den", "Frame per second denominator")
    yield UInt24(parent, "aspect_ratio_num", "Aspect ratio numerator")
    yield UInt24(parent, "aspect_ratio_den", "Aspect ratio denominator")

    yield UInt8(parent, "color_space")
    yield UInt24(parent, "target_bitrate")
    # The four bit fields below add up to 16 bits.
    yield Bits(parent, "quality", 6)
    yield Bits(parent, "gp_shift", 5)
    yield Enum(Bits(parent, "pixel_format", 2), PIXEL_FORMATS)
    yield Bits(parent, "spare_config", 3)


def parseVorbisHeader(parent):
    """Yield the fields of a Vorbis identification header chunk."""
    yield UInt32(parent, "vorbis_version")
    yield UInt8(parent, "audio_channels")
    yield UInt32(parent, "audio_sample_rate")
    yield UInt32(parent, "bitrate_maximum")
    yield UInt32(parent, "bitrate_nominal")
    yield UInt32(parent, "bitrate_minimum")
    yield Bits(parent, "blocksize_0", 4)
    yield Bits(parent, "blocksize_1", 4)
    yield UInt8(parent, "framing_flag")


class Chunk(FieldSet):
    """
    One packet segment of a page.

    When the chunk is at least 7 bytes long it starts with a one byte type
    and a six byte codec name; a known (codec, type) pair selects a dedicated
    header parser and renames the chunk accordingly.
    """

    # codec name -> {packet type -> (field name, field generator)}
    tag_info = {
        "vorbis": {
            3: ("comment", parseVorbisComment),
            1: ("vorbis_hdr", parseVorbisHeader),
        }, "theora": {
            128: ("theora_hdr", parseTheoraHeader),
            129: ("comment", parseVorbisComment),
        }, "video\0": {
            1: ("video_hdr", parseVideoHeader),
        },
    }

    def __init__(self, *args, **kw):
        FieldSet.__init__(self, *args, **kw)
        if 7 * 8 <= self.size:
            try:
                # The right-hand side is evaluated first, so on KeyError
                # neither the name nor the parser has been modified.
                self._name, self.parser = self.tag_info[
                    self["codec"].value][self["type"].value]
                if self._name == "theora_hdr":
                    self.endian = BIG_ENDIAN
            except KeyError:
                self.parser = None
        else:
            # Too short to hold the type + codec prefix.
            self.parser = None

    def createFields(self):
        if 7 * 8 <= self.size:
            yield UInt8(self, 'type')
            yield String(self, 'codec', 6)
        if self.parser:
            yield from self.parser(self)
        else:
            # Unknown or too short chunk: expose the remaining bytes as-is.
            size = (self.size - self.current_size) // 8
            if size:
                yield RawBytes(self, "raw", size)


class Packets:
    """
    Iterable over the packet sizes (in bits) of one logical stream, walking
    the chain of Segments fragments starting at `first`.
    """

    def __init__(self, first):
        self.first = first

    def __iter__(self):
        fragment = self.first
        size = None
        while fragment is not None:
            page = fragment.parent
            continued_packet = page["continued_packet"].value
            for segment_size in page.segment_size:
                if continued_packet:
                    # The first segment of this page extends the packet begun
                    # on the previous page. `size` is still None when the very
                    # first page of the stream is flagged as a continuation:
                    # treat that as the start of a packet instead of raising
                    # TypeError.
                    size = (size or 0) + segment_size
                    continued_packet = False
                else:
                    if size:
                        yield size * 8
                    size = segment_size
            # Fragment links are exposed through the `next` property;
            # `__next__` was an automatic Python 2 -> 3 conversion artifact.
            fragment = fragment.next
        if size:
            yield size * 8


class Segments(Fragment):
    """
    Payload of one page, linked with the payloads of the other pages that
    carry the same serial number.
    """

    def __init__(self, parent, *args, **kw):
        Fragment.__init__(self, parent, *args, **kw)
        if parent['last_page'].value:
            next_link = None
        else:
            # Resolved lazily: the following page may not be parsed yet.
            next_link = self.createNext
        # The first Segments seen for a serial becomes the head of its chain.
        self.setLinks(parent.parent.streams.setdefault(
            parent['serial'].value, self), next_link)

    def _createInputStream(self, **args):
        if self.first is self:
            # Only the head of the chain exposes the whole logical stream.
            return FragmentedStream(self, packets=Packets(self),
                                    tags=[("id", "ogg_stream")], **args)
        return Fragment._createInputStream(self, **args)

    def _getData(self):
        return self

    def createNext(self):
        """
        Return the same-named field of the next page belonging to the same
        stream, or None when no further page exists.
        """
        parent = self.parent
        index = parent.index
        parent = parent.parent
        first = self.first
        try:
            while True:
                index += 1
                candidate = parent[index][self.name]
                if candidate.first is first:
                    return candidate
        except MissingField:
            pass

    def createFields(self):
        for segment_size in self.parent.segment_size:
            if segment_size:
                yield Chunk(self, "chunk[]", size=segment_size * 8)


class OggPage(FieldSet):
    """
    One Ogg page: a 27 byte header, an optional lacing table and the
    segments described by that table.
    """

    def __init__(self, *args):
        FieldSet.__init__(self, *args)
        # 27 = 4 (capture pattern) + 1 (version) + 1 (flags) + 8 (granule
        # position) + 4 (serial) + 4 (page number) + 4 (checksum)
        # + 1 (lacing size)
        size = 27
        self.lacing_size = self['lacing_size'].value
        if self.lacing_size:
            size += self.lacing_size
            lacing = self['lacing']
            self.segment_size = [field.value for field in lacing]
            size += sum(self.segment_size)
        else:
            # Keep the attribute defined for pages without any segment so
            # that readers of `segment_size` never hit AttributeError.
            self.segment_size = []
        self._size = size * 8

    def createFields(self):
        yield String(self, 'capture_pattern', 4, charset="ASCII")
        if self['capture_pattern'].value != "OggS":
            self.warning(
                'Invalid signature. An Ogg page must start with "OggS".')
        yield UInt8(self, 'stream_structure_version')
        yield Bit(self, 'continued_packet')
        yield Bit(self, 'first_page')
        yield Bit(self, 'last_page')
        yield NullBits(self, 'unused', 5)
        yield UInt64(self, 'abs_granule_pos')
        yield textHandler(UInt32(self, 'serial'), hexadecimal)
        yield UInt32(self, 'page')
        yield textHandler(UInt32(self, 'checksum'), hexadecimal)
        yield UInt8(self, 'lacing_size')
        # self.lacing_size and self._size are set by __init__, which pauses
        # this generator right after the 'lacing_size' field.
        if self.lacing_size:
            yield Lacing(self, "lacing", size=self.lacing_size * 8)
            yield Segments(self, "segments",
                           size=self._size - self._current_size)

    def validate(self):
        """Return an error message, or an empty string if the page is valid."""
        if self['capture_pattern'].value != "OggS":
            return "Wrong signature"
        if self['stream_structure_version'].value != 0:
            return "Unknown structure version (%s)" % self['stream_structure_version'].value
        return ""


class OggFile(Parser):
    """Parser for a physical Ogg stream: a sequence of OggPage fields."""

    PARSER_TAGS = {
        "id": "ogg",
        "category": "container",
        "file_ext": ("ogg", "ogm"),
        "mime": (
            "application/ogg", "application/x-ogg",
            "audio/ogg", "audio/x-ogg",
            "video/ogg", "video/x-ogg",
            "video/theora", "video/x-theora",
        ),
        "magic": ((b'OggS', 0),),
        "subfile": "skip",
        "min_size": 28 * 8,
        "description": "Ogg multimedia container"
    }
    endian = LITTLE_ENDIAN

    def validate(self):
        """Return True if the file looks valid, else an error message."""
        if self.stream.readBytes(0, 4) != b'OggS':
            return "Invalid magic string"
        # Validate first 3 pages
        for index in range(3):
            try:
                page = self[index]
            except MissingField:
                # Fewer than 3 pages is fine if the whole file was parsed.
                if self.done:
                    return True
                return "Unable to get page #%u" % index
            except (InputStreamError, ParserError):
                return "Unable to create page #%u" % index
            err = page.validate()
            if err:
                return "Invalid page #%s: %s" % (index, err)
        return True

    def createMimeType(self):
        if "theora_hdr" in self["page[0]/segments"]:
            return "video/theora"
        elif "vorbis_hdr" in self["page[0]/segments"]:
            return "audio/vorbis"
        else:
            return "application/ogg"

    def createDescription(self):
        # Header chunks are children of the page's "segments" field, not of
        # the page itself: use the same path as createMimeType().
        if "theora_hdr" in self["page[0]/segments"]:
            return "Ogg/Theora video"
        elif "vorbis_hdr" in self["page[0]/segments"]:
            return "Ogg/Vorbis audio"
        else:
            return "Ogg multimedia container"

    def createFields(self):
        # serial number -> first Segments fragment of that logical stream
        self.streams = {}
        while not self.eof:
            yield OggPage(self, "page[]")

    def createLastPage(self):
        """
        Locate a page flagged as last page after the first page and return
        it as an orphan field, or None when no candidate is found.
        """
        start = self[0].size
        end = MAX_FILESIZE * 8
        if True:
            # FIXME: This doesn't work on all files (eg. some Ogg/Theora)
            # Byte after the version is the flag byte: 5 and 4 both have the
            # last_page bit set (5 also has continued_packet).
            offset = self.stream.searchBytes(b"OggS\0\5", start, end)
            if offset is None:
                offset = self.stream.searchBytes(b"OggS\0\4", start, end)
            if offset is None:
                return None
            return createOrphanField(self, offset, OggPage, "page")
        else:
            # Very slow version
            # NOTE(review): unreachable, kept for reference only.
            page = None
            while True:
                offset = self.stream.searchBytes(b"OggS\0", start, end)
                if offset is None:
                    break
                page = createOrphanField(self, offset, OggPage, "page")
                start += page.size
            return page

    def createContentSize(self):
        """Return the end address (in bits) of the last page, or None."""
        page = self.createLastPage()
        if page:
            return page.absolute_address + page.size
        else:
            return None


class OggStream(Parser):
    """Parser for one logical stream: one raw field per packet."""

    PARSER_TAGS = {
        "id": "ogg_stream",
        "category": "container",
        "subfile": "skip",
        "min_size": 7 * 8,
        "description": "Ogg logical stream"
    }
    endian = LITTLE_ENDIAN

    def validate(self):
        return False

    def createFields(self):
        # `packets` yields sizes in bits (see Packets.__iter__).
        for size in self.stream.packets:
            yield RawBytes(self, "packet[]", size // 8)