""" FLV video parser. Documentation: - FLV File format: http://osflash.org/flv - libavformat from ffmpeg project - flashticle: Python project to read Flash (SWF and FLV with AMF metadata) http://undefined.org/python/#flashticle Author: Victor Stinner Creation date: 4 november 2006 """ from hachoir.parser import Parser from hachoir.field import (FieldSet, UInt8, UInt24, UInt32, NullBits, NullBytes, Bit, Bits, String, RawBytes, Enum) from hachoir.core.endian import BIG_ENDIAN from hachoir.parser.audio.mpeg_audio import Frame from hachoir.parser.video.amf import AMFObject from hachoir.core.tools import createDict SAMPLING_RATE = { 0: (5512, "5.5 kHz"), 1: (11025, "11 kHz"), 2: (22050, "22.1 kHz"), 3: (44100, "44.1 kHz"), } SAMPLING_RATE_VALUE = createDict(SAMPLING_RATE, 0) SAMPLING_RATE_TEXT = createDict(SAMPLING_RATE, 1) AUDIO_CODEC_MP3 = 2 AUDIO_CODEC_NAME = { 0: "Uncompressed", 1: "ADPCM", 2: "MP3", 5: "Nellymoser 8kHz mono", 6: "Nellymoser", } VIDEO_CODEC_NAME = { 2: "Sorensen H.263", 3: "Screen video", 4: "On2 VP6", } FRAME_TYPE = { 1: "keyframe", 2: "inter frame", 3: "disposable inter frame", } class Header(FieldSet): def createFields(self): yield String(self, "signature", 3, "FLV format signature", charset="ASCII") yield UInt8(self, "version") yield NullBits(self, "reserved[]", 5) yield Bit(self, "type_flags_audio") yield NullBits(self, "reserved[]", 1) yield Bit(self, "type_flags_video") yield UInt32(self, "data_offset") def parseAudio(parent, size): yield Enum(Bits(parent, "codec", 4, "Audio codec"), AUDIO_CODEC_NAME) yield Enum(Bits(parent, "sampling_rate", 2, "Sampling rate"), SAMPLING_RATE_TEXT) yield Bit(parent, "is_16bit", "16-bit or 8-bit per sample") yield Bit(parent, "is_stereo", "Stereo or mono channel") size -= 1 if 0 < size: if parent["codec"].value == AUDIO_CODEC_MP3: yield Frame(parent, "music_data", size=size * 8) else: yield RawBytes(parent, "music_data", size) def parseVideo(parent, size): yield Enum(Bits(parent, "frame_type", 4, "Frame type"), FRAME_TYPE) yield Enum(Bits(parent, "codec", 4, "Video codec"), VIDEO_CODEC_NAME) if 1 < size: yield RawBytes(parent, "data", size - 1) def parseAMF(parent, size): while parent.current_size < parent.size: yield AMFObject(parent, "entry[]") class Chunk(FieldSet): tag_info = { 8: ("audio[]", parseAudio, ""), 9: ("video[]", parseVideo, ""), 18: ("metadata", parseAMF, ""), } def __init__(self, *args, **kw): FieldSet.__init__(self, *args, **kw) self._size = (11 + self["size"].value) * 8 tag = self["tag"].value if tag in self.tag_info: self._name, self.parser, self._description = self.tag_info[tag] else: self.parser = None def createFields(self): yield UInt8(self, "tag") yield UInt24(self, "size", "Content size") yield UInt24(self, "timestamp", "Timestamp in millisecond") yield NullBytes(self, "reserved", 4) size = self["size"].value if size: if self.parser: yield from self.parser(self, size) else: yield RawBytes(self, "content", size) def getSampleRate(self): try: return SAMPLING_RATE_VALUE[self["sampling_rate"].value] except LookupError: return None class FlvFile(Parser): PARSER_TAGS = { "id": "flv", "category": "video", "file_ext": ("flv",), "mime": ("video/x-flv",), "min_size": 9 * 4, "magic": ( # Signature, version=1, flags=5 (video+audio), header size=9 (b"FLV\1\x05\0\0\0\x09", 0), # Signature, version=1, flags=5 (video), header size=9 (b"FLV\1\x01\0\0\0\x09", 0), ), "description": "Macromedia Flash video" } endian = BIG_ENDIAN def validate(self): if self.stream.readBytes(0, 3) != b"FLV": return "Wrong file signature" if self["header/data_offset"].value != 9: return "Unknown data offset in main header" return True def createFields(self): yield Header(self, "header") yield UInt32(self, "prev_size[]", "Size of previous chunk") while not self.eof: yield Chunk(self, "chunk[]") yield UInt32(self, "prev_size[]", "Size of previous chunk") def createDescription(self): return "Macromedia Flash video version %s" % self["header/version"].value