import itertools import struct import warnings from contextlib import suppress from io import SEEK_CUR, SEEK_END, BytesIO # Kaitai Struct runtime version, in the format defined by PEP 440. # Used by our setup.cfg to set the version number in # packaging/distribution metadata. # Also used in Python code generated by older ksc versions (0.7 through 0.9) # to check that the imported runtime is compatible with the generated code. # Since ksc 0.10, the compatibility check instead uses the API_VERSION constant, # so that the version string does not need to be parsed at runtime # (see https://github.com/kaitai-io/kaitai_struct/issues/804). __version__ = "0.11" # Kaitai Struct runtime API version, as a tuple of ints. # Used in generated Python code (since ksc 0.10) to check that the imported # runtime is compatible with the generated code. API_VERSION = (0, 11) class KaitaiStruct: def __init__(self, io): self._io = io def __enter__(self): return self def __exit__(self, *args, **kwargs): self.close() def close(self): self._io.close() @classmethod def from_file(cls, filename): # NOTE: we cannot use the `with` statement because KaitaiStream needs a # file descriptor that remains open and takes care of closing it itself. # Therefore, we're suppressing Ruff's rule # [SIM115](https://docs.astral.sh/ruff/rules/open-file-with-context-handler/). # # We also won't use `pathlib` here (so we're suppressing # [PTH123](https://docs.astral.sh/ruff/rules/builtin-open/)), because # `filename` is traditionally expected to be a string. f = open(filename, "rb") # noqa: SIM115, PTH123 try: return cls(KaitaiStream(f)) except Exception: # close file descriptor, then reraise the exception f.close() raise @classmethod def from_bytes(cls, buf): return cls(KaitaiStream(BytesIO(buf))) @classmethod def from_io(cls, io): return cls(KaitaiStream(io)) class ReadWriteKaitaiStruct(KaitaiStruct): def __init__(self, io): super().__init__(io) self._dirty = True def _fetch_instances(self): raise NotImplementedError def _write(self, io=None): self._write__seq(io) self._fetch_instances() self._io.write_back_child_streams() def _write__seq(self, io): if io is not None: self._io = io if self._dirty: raise ConsistencyNotCheckedError def __setattr__(self, key, value): super_setattr = super().__setattr__ if ( not key.startswith("_") or key in {"_parent", "_root"} or key.startswith("_unnamed") ): # NOTE: `__setattr__()` parameters are positional-only, which the FBT003 # rule doesn't know, see https://github.com/astral-sh/ruff/issues/3247 super_setattr("_dirty", True) # noqa: FBT003 super_setattr(key, value) class KaitaiStream: def __init__(self, io): self._io = io self.bits_left = 0 self.bits = 0 self.bits_le = False self.bits_write_mode = False self.write_back_handler = None self.child_streams = [] # The size() method calls tell() and seek(), which will fail when called # on a non-seekable stream. Non-seekable streams are supported for # reading, so this is not fatal and we need to suppress these errors. # Writing to a non-seekable stream (i.e. without the `_size` attribute) # is currently not supported and will fail, but at this point we don't # know whether any writing will be attempted on this stream. # # Although I haven't actually seen a bare ValueError raised in this case # in practice, chances are some implementation may be doing it (see # for reference: # "Also, implementations may raise a ValueError (or # UnsupportedOperation) when operations they do not support are # called."). And I've seen ValueError raised at least in Python 2 when # calling read() on an unreadable stream. with suppress(OSError, ValueError): self._size = self.size() def __enter__(self): return self def __exit__(self, *args, **kwargs): self.close() def close(self): try: if self.bits_write_mode: self.write_align_to_byte() else: self.align_to_byte() finally: self._io.close() # region Stream positioning def is_eof(self): if not self.bits_write_mode and self.bits_left > 0: return False # NB: previously, we first tried if self._io.read(1) did in fact read 1 # byte from the stream (and then seeked 1 byte back if so), but given # that is_eof() may be called from both read and write contexts, it's # more universal not to use read() at all. See also # . return self._io.tell() >= self.size() def seek(self, n): if n < 0: msg = f"cannot seek to invalid position {n}" raise InvalidArgumentError(msg) if self.bits_write_mode: self.write_align_to_byte() else: self.align_to_byte() self._io.seek(n) def pos(self): return self._io.tell() + ( 1 if self.bits_write_mode and self.bits_left > 0 else 0 ) def size(self): # Python has no internal File object API function to get # current file / StringIO size, thus we use the following # trick. io = self._io # Remember our current position cur_pos = io.tell() # Seek to the end of the stream and remember the full length full_size = io.seek(0, SEEK_END) # Seek back to the current position io.seek(cur_pos) return full_size # endregion # region Structs for numeric types packer_s1 = struct.Struct("b") packer_s2be = struct.Struct(">h") packer_s4be = struct.Struct(">i") packer_s8be = struct.Struct(">q") packer_s2le = struct.Struct("H") packer_u4be = struct.Struct(">I") packer_u8be = struct.Struct(">Q") packer_u2le = struct.Struct("f") packer_f8be = struct.Struct(">d") packer_f4le = struct.Struct(" 0: # 1 bit => 1 byte # 8 bits => 1 byte # 9 bits => 2 bytes bytes_needed = ((bits_needed - 1) // 8) + 1 # `ceil(bits_needed / 8)` buf = self._read_bytes_not_aligned(bytes_needed) res = int.from_bytes(buf, "big") new_bits = res res = res >> self.bits_left | self.bits << bits_needed self.bits = new_bits # will be masked at the end of the function else: res = self.bits >> -bits_needed # shift unneeded bits out mask = (1 << self.bits_left) - 1 # `bits_left` is in range 0..7 self.bits &= mask return res def read_bits_int(self, n): """Read an unsigned `n`-bit integer in big-endian parsing direction. Deprecated and no longer used as of KSC 0.9. It is only available for backwards compatibility and will be removed in the future. KSC 0.9 and later uses `read_bits_int_be()` instead. """ warnings.warn( "read_bits_int() is deprecated since 0.9, use read_bits_int_be() instead", DeprecationWarning, stacklevel=2, ) return self.read_bits_int_be(n) def read_bits_int_le(self, n): """Read an unsigned `n`-bit integer in little-endian parsing direction.""" self.bits_write_mode = False res = 0 bits_needed = n - self.bits_left if bits_needed > 0: # 1 bit => 1 byte # 8 bits => 1 byte # 9 bits => 2 bytes bytes_needed = ((bits_needed - 1) // 8) + 1 # `ceil(bits_needed / 8)` buf = self._read_bytes_not_aligned(bytes_needed) res = int.from_bytes(buf, "little") new_bits = res >> bits_needed res = res << self.bits_left | self.bits self.bits = new_bits else: res = self.bits self.bits >>= n self.bits_left = -bits_needed % 8 mask = ( 1 << n ) - 1 # no problem with this in Python (arbitrary precision integers) res &= mask return res # endregion # region Byte arrays def read_bytes(self, n): self.align_to_byte() return self._read_bytes_not_aligned(n) def _read_bytes_not_aligned(self, n): if n < 0: msg = f"requested invalid {n} amount of bytes" raise InvalidArgumentError(msg) is_satisfiable = True # When a large number of bytes is requested, try to check first # that there is indeed enough data left in the stream. # This avoids reading large amounts of data only to notice afterwards # that it's not long enough. For smaller amounts of data, it's faster to # first read the data unconditionally and check the length afterwards. if ( n >= 8 * 1024 * 1024 # = 8 MiB and self._io.seekable() ): num_bytes_available = self.size() - self.pos() is_satisfiable = n <= num_bytes_available if is_satisfiable: r = self._io.read(n) num_bytes_available = len(r) is_satisfiable = n <= num_bytes_available if not is_satisfiable: # noinspection PyUnboundLocalVariable msg = f"requested {n} bytes, but only {num_bytes_available} bytes available" raise EndOfStreamError(msg, n, num_bytes_available) # noinspection PyUnboundLocalVariable return r def read_bytes_full(self): self.align_to_byte() return self._io.read() def read_bytes_term(self, term, include_term, consume_term, eos_error): self.align_to_byte() term_byte = KaitaiStream.byte_from_int(term) r = bytearray() while True: c = self._io.read(1) if not c: if eos_error: raise NoTerminatorFoundError(term_byte, 0) return bytes(r) if c == term_byte: if include_term: r += c if not consume_term: self._io.seek(-1, SEEK_CUR) return bytes(r) r += c def read_bytes_term_multi(self, term, include_term, consume_term, eos_error): self.align_to_byte() unit_size = len(term) r = bytearray() while True: c = self._io.read(unit_size) if len(c) < unit_size: if eos_error: raise NoTerminatorFoundError(term, len(c)) r += c return bytes(r) if c == term: if include_term: r += c if not consume_term: self._io.seek(-unit_size, SEEK_CUR) return bytes(r) r += c def ensure_fixed_contents(self, expected): """Check that the `expected` bytes follow in the stream. Deprecated and no longer used as of KSC 0.9. It is only available for backwards compatibility and will be removed in the future. KSC 0.9 and later explicitly raises `ValidationNotEqualError` from an `if` statement instead. """ warnings.warn( "ensure_fixed_contents() is deprecated since 0.9, explicitly raise " "ValidationNotEqualError from an `if` statement instead", DeprecationWarning, stacklevel=2, ) actual = self._io.read(len(expected)) if actual != expected: msg = ( f"unexpected fixed contents: got {actual!r}, " f"was waiting for {expected!r}" ) # NOTE: this method has always raised `Exception` directly and is now # unused and slated for removal, so there's no point in "fixing" it. raise Exception(msg) # noqa: TRY002 return actual @staticmethod def bytes_strip_right(data, pad_byte): return data.rstrip(KaitaiStream.byte_from_int(pad_byte)) @staticmethod def bytes_terminate(data, term, include_term): term_index = KaitaiStream.byte_array_index_of(data, term) if term_index == -1: return data[:] return data[: term_index + (1 if include_term else 0)] @staticmethod def bytes_terminate_multi(data, term, include_term): unit_size = len(term) search_index = data.find(term) while True: if search_index == -1: return data[:] mod = search_index % unit_size if mod == 0: return data[: search_index + (unit_size if include_term else 0)] search_index = data.find(term, search_index + (unit_size - mod)) # endregion # endregion # region Writing def _ensure_bytes_left_to_write(self, n, pos): try: full_size = self._size except AttributeError: msg = "writing to non-seekable streams is not supported" raise ValueError(msg) from None num_bytes_left = full_size - pos if n > num_bytes_left: msg = ( f"requested to write {n} bytes, but only " f"{num_bytes_left} bytes left in the stream" ) raise EndOfStreamError(msg, n, num_bytes_left) # region Integer numbers # region Signed def write_s1(self, v): self.write_bytes(KaitaiStream.packer_s1.pack(v)) # region Big-endian def write_s2be(self, v): self.write_bytes(KaitaiStream.packer_s2be.pack(v)) def write_s4be(self, v): self.write_bytes(KaitaiStream.packer_s4be.pack(v)) def write_s8be(self, v): self.write_bytes(KaitaiStream.packer_s8be.pack(v)) # endregion # region Little-endian def write_s2le(self, v): self.write_bytes(KaitaiStream.packer_s2le.pack(v)) def write_s4le(self, v): self.write_bytes(KaitaiStream.packer_s4le.pack(v)) def write_s8le(self, v): self.write_bytes(KaitaiStream.packer_s8le.pack(v)) # endregion # endregion # region Unsigned def write_u1(self, v): self.write_bytes(KaitaiStream.packer_u1.pack(v)) # region Big-endian def write_u2be(self, v): self.write_bytes(KaitaiStream.packer_u2be.pack(v)) def write_u4be(self, v): self.write_bytes(KaitaiStream.packer_u4be.pack(v)) def write_u8be(self, v): self.write_bytes(KaitaiStream.packer_u8be.pack(v)) # endregion # region Little-endian def write_u2le(self, v): self.write_bytes(KaitaiStream.packer_u2le.pack(v)) def write_u4le(self, v): self.write_bytes(KaitaiStream.packer_u4le.pack(v)) def write_u8le(self, v): self.write_bytes(KaitaiStream.packer_u8le.pack(v)) # endregion # endregion # endregion # region Floating point numbers # region Big-endian def write_f4be(self, v): self.write_bytes(KaitaiStream.packer_f4be.pack(v)) def write_f8be(self, v): self.write_bytes(KaitaiStream.packer_f8be.pack(v)) # endregion # region Little-endian def write_f4le(self, v): self.write_bytes(KaitaiStream.packer_f4le.pack(v)) def write_f8le(self, v): self.write_bytes(KaitaiStream.packer_f8le.pack(v)) # endregion # endregion # region Unaligned bit values def write_align_to_byte(self): if self.bits_left > 0: b = self.bits if not self.bits_le: b <<= 8 - self.bits_left # We clear the `bits_left` and `bits` fields using align_to_byte() # before writing the byte in the stream so that it happens even in # case the write fails. The reason is that if the write fails, it # would likely be a permanent issue that's not going to resolve # itself when retrying the operation with the same stream state, and # since seek() calls write_align_to_byte() at the beginning too, you # wouldn't be even able to seek anywhere without getting the same # exception again. So the stream could be in a broken state, # throwing the same exception over and over again even though you've # already processed it and you'd like to move on. And the only way # to get rid of it would be to call align_to_byte() externally # (given how it's currently implemented), but that's really just a # coincidence - that's a method intended for reading (not writing) # and it should never be necessary to call it from the outside (it's # more like an internal method now). # # So it seems more reasonable to deliver the exception once and let # the user application process it, but otherwise clear the bit # buffer to make the stream ready for further operations and to # avoid repeatedly delivering an exception for one past failed # operation. The rationale behind this is that it's not really a # failure of the "align to byte" operation, but the writing of some # bits to the stream that was requested earlier. self.align_to_byte() self._write_bytes_not_aligned(KaitaiStream.byte_from_int(b)) def write_bits_int_be(self, n, val): self.bits_le = False self.bits_write_mode = True mask = ( 1 << n ) - 1 # no problem with this in Python (arbitrary precision integers) val &= mask bits_to_write = self.bits_left + n bytes_needed = ((bits_to_write - 1) // 8) + 1 # `ceil(bits_to_write / 8)` # Unlike self._io.tell(), pos() respects the `bits_left` field (it # returns the stream position as if it were already aligned on a byte # boundary), which ensures that we report the same numbers of bytes here # as read_bits_int_*() methods would. self._ensure_bytes_left_to_write( bytes_needed - (1 if self.bits_left > 0 else 0), self.pos() ) bytes_to_write = bits_to_write // 8 self.bits_left = bits_to_write % 8 if bytes_to_write > 0: buf = bytearray(bytes_to_write) mask = (1 << self.bits_left) - 1 # `bits_left` is in range 0..7 new_bits = val & mask val = val >> self.bits_left | self.bits << (n - self.bits_left) self.bits = new_bits for i in range(bytes_to_write - 1, -1, -1): buf[i] = val & 0xFF val >>= 8 self._write_bytes_not_aligned(buf) else: self.bits = self.bits << n | val def write_bits_int_le(self, n, val): self.bits_le = True self.bits_write_mode = True bits_to_write = self.bits_left + n bytes_needed = ((bits_to_write - 1) // 8) + 1 # `ceil(bits_to_write / 8)` # Unlike self._io.tell(), pos() respects the `bits_left` field (it # returns the stream position as if it were already aligned on a byte # boundary), which ensures that we report the same numbers of bytes here # as read_bits_int_*() methods would. self._ensure_bytes_left_to_write( bytes_needed - (1 if self.bits_left > 0 else 0), self.pos() ) bytes_to_write = bits_to_write // 8 old_bits_left = self.bits_left self.bits_left = bits_to_write % 8 if bytes_to_write > 0: buf = bytearray(bytes_to_write) new_bits = val >> ( n - self.bits_left ) # no problem with this in Python (arbitrary precision integers) val = val << old_bits_left | self.bits self.bits = new_bits for i in range(bytes_to_write): buf[i] = val & 0xFF val >>= 8 self._write_bytes_not_aligned(buf) else: self.bits |= val << old_bits_left mask = (1 << self.bits_left) - 1 # `bits_left` is in range 0..7 self.bits &= mask # endregion # region Byte arrays def write_bytes(self, buf): self.write_align_to_byte() self._write_bytes_not_aligned(buf) def _write_bytes_not_aligned(self, buf): n = len(buf) self._ensure_bytes_left_to_write(n, self._io.tell()) self._io.write(buf) def write_bytes_limit(self, buf, size, term, pad_byte): n = len(buf) # Strictly speaking, this check is redundant because it is already # done in the corresponding _check() method in the generated code, but # it seems to make sense to include it here anyway so that this method # itself does something reasonable for every set of arguments. # # However, it should never happen when operated correctly (and in # this case, assigning inconsistent values to fields of a KS-generated # object is considered correct operation if the user application calls # the corresponding _check(), which we know would raise an error and # thus the code should not reach _write() and this method at all). So # it's by design that this throws ValueError, not any more specific # error, because it's not intended to be caught in user applications, # but avoided by calling all _check() methods correctly. if n > size: msg = f"writing {size} bytes, but {n} bytes were given" raise ValueError(msg) self.write_bytes(buf) if n < size: self.write_u1(term) self.write_bytes(KaitaiStream.byte_from_int(pad_byte) * (size - n - 1)) # endregion # endregion # region Byte array processing @staticmethod def process_xor_one(data, key): return bytes(v ^ key for v in data) @staticmethod def process_xor_many(data, key): return bytes(a ^ b for a, b in zip(data, itertools.cycle(key))) @staticmethod def process_rotate_left(data, amount, group_size): if group_size != 1: msg = f"unable to rotate group of {group_size} bytes yet" raise NotImplementedError(msg) anti_amount = -amount % (group_size * 8) r = bytearray(data) for i, byte in enumerate(r): r[i] = (byte << amount) & 0xFF | (byte >> anti_amount) return bytes(r) # endregion # region Misc runtime operations @staticmethod def int_from_byte(v): """Convert a byte array item to an integer. Deprecated and no longer used as of KSC 0.12. It is only available for backwards compatibility and will be removed in the future. This method only made sense to ensure compatibility with Python 2. In Python 3, it's simply an identity function. """ warnings.warn( "int_from_byte() is deprecated since 0.12 " "(in Python 3, it is an identity function)", DeprecationWarning, stacklevel=2, ) return v @staticmethod def byte_from_int(i): return KaitaiStream.packer_u1.pack(i) @staticmethod def byte_array_index(data, i): """Return the byte at index `i` in the byte array `data` as an integer. Deprecated and no longer used as of KSC 0.12. It is only available for backwards compatibility and will be removed in the future. This method only made sense to ensure compatibility with Python 2. In Python 3, `data[i]` should be used instead. """ warnings.warn( "byte_array_index(data, i) is deprecated since 0.12, use data[i] instead", DeprecationWarning, stacklevel=2, ) return data[i] @staticmethod def byte_array_min(b): """Return the minimum byte in the byte array `data` as an integer. Deprecated and no longer used as of KSC 0.12. It is only available for backwards compatibility and will be removed in the future. This method only made sense to ensure compatibility with Python 2. In Python 3, `min(b)` should be used instead. """ warnings.warn( "byte_array_min(b) is deprecated since 0.12, use min(b) instead", DeprecationWarning, stacklevel=2, ) return min(b) @staticmethod def byte_array_max(b): """Return the maximum byte in the byte array `data` as an integer. Deprecated and no longer used as of KSC 0.12. It is only available for backwards compatibility and will be removed in the future. This method only made sense to ensure compatibility with Python 2. In Python 3, `max(b)` should be used instead. """ warnings.warn( "byte_array_max(b) is deprecated since 0.12, use max(b) instead", DeprecationWarning, stacklevel=2, ) return max(b) @staticmethod def byte_array_index_of(data, b): return data.find(KaitaiStream.byte_from_int(b)) @staticmethod def resolve_enum(enum_obj, value): """Convert an integer to the given enum if possible, otherwise return it back. Enums in Python raise a `ValueError` exception when they encounter an unknown value. This method works around this issue so that attempting to convert an unknown integer value to an enum does not stop parsing by default. If it is desired to stop parsing on unknown enum values, `valid/in-enum: true` can be used in the .ksy specification on a specific `seq` or `instances` enum field. This adds validation that the field contains one of the known (defined) enum values, otherwise a `ValidationNotInEnumError` exception is raised. """ try: return enum_obj(value) except ValueError: return value # endregion def to_byte_array(self): pos = self.pos() self.seek(0) r = self.read_bytes_full() self.seek(pos) return r class WriteBackHandler: def __init__(self, pos, handler): self.pos = pos self.handler = handler def write_back(self, parent): parent.seek(self.pos) self.handler(parent) def add_child_stream(self, child): self.child_streams.append(child) def write_back_child_streams(self, parent=None): _pos = self.pos() for child in self.child_streams: child.write_back_child_streams(self) self.child_streams.clear() self.seek(_pos) if parent is not None: self._write_back(parent) def _write_back(self, parent): self.write_back_handler.write_back(parent) class KaitaiStructError(Exception): """Common ancestor for all errors originating from correct Kaitai Struct usage. Use this exception class in the `except` clause if you want to handle all parse errors and serialization errors. "Correct usage" refers to errors that indicate a problem with user input, not errors indicating incorrect usage, which are not meant to be caught but fixed in the application code. If available, the `src_path` attribute will contain the KSY source path pointing to the element where the error occurred. If it is not available, `src_path` will be `None`. """ def __init__(self, msg, src_path): super().__init__(("" if src_path is None else src_path + ": ") + msg) self.src_path = src_path class InvalidArgumentError(KaitaiStructError, ValueError): """Raised when an invalid argument is encountered during parsing or serialization. This is a subclass of `ValueError`, which is the standard built-in exception class for invalid arguments in Python. It is also a subclass of `KaitaiStructError`, as it indicates invalid user input and therefore represents a parsing or serialization error. """ def __init__(self, msg): super().__init__(msg, None) class EndOfStreamError(KaitaiStructError, EOFError): """Raised when attempting to read or write beyond end of stream. Provides the `bytes_needed` (number of bytes requested to read or write) and `bytes_available` (number of bytes remaining in the stream) attributes. """ def __init__(self, msg, bytes_needed, bytes_available): super().__init__(msg, None) self.bytes_needed = bytes_needed self.bytes_available = bytes_available class NoTerminatorFoundError(EndOfStreamError): """Raised when end of stream is reached before the required terminator is found. This is a subclass of `EndOfStreamError` because it is a special type of reaching the end of the stream prematurely - see https://github.com/kaitai-io/kaitai_struct_python_runtime/issues/41 for an explanation. If you want to tolerate a missing terminator, you can specify `eos-error: false` in the .ksy specification. Reaching the end of the stream will then be considered a valid end of the field, and this error will no longer be raised. The `term` attribute contains a `bytes` object with the searched terminator. """ def __init__(self, term, bytes_available): super().__init__( f"end of stream reached, but no terminator {term!r} found", len(term), bytes_available, ) self.term = term class UndecidedEndiannessError(KaitaiStructError): """Raised when the calculated default endianness cannot be determined. The default endianness should have been decided with a switch, but no case matches (although the endianness switch suggests that the result should be positive). """ def __init__(self, src_path): super().__init__("unable to decide on endianness for a type", src_path) class ValidationFailedError(KaitaiStructError): """Common ancestor for all validation failures. The `io` attribute stores the `KaitaiStream` object representing the I/O stream associated with the error. Validation exceptions raised from `_check()` methods do not have an I/O stream available, so their `io` will be `None`. """ def __init__(self, msg, io, src_path): super().__init__( ("" if io is None else f"at pos {io.pos()}: ") + "validation failed: " + msg, src_path, ) self.io = io class ValidationNotEqualError(ValidationFailedError): """Raised when validation specified using the `contents` or `valid/eq` keys fails. This exception is the result of a validation failure: `actual` should have been equal to `expected`, but it was not. The `expected` attribute contains the expected value specified in the `contents` or `valid/eq` key in .ksy specifications. Note that the syntax `valid/eq: ` has a shorthand form `valid: `. The `actual` attribute contains the actual value parsed from the stream. """ def __init__(self, expected, actual, io, src_path): super().__init__( f"not equal, expected {expected!r}, but got {actual!r}", io, src_path ) self.expected = expected self.actual = actual class ValidationLessThanError(ValidationFailedError): """Raised when validation specified using the `valid/min` key fails. This exception is the result of a validation failure: `actual` should have been greater than or equal to `min`, but it was not. The `min` attribute contains the minimum value specified in the `valid/min` key in .ksy specifications. The `actual` attribute contains the actual value parsed from the stream. """ def __init__(self, min_bound, actual, io, src_path): super().__init__( f"not in range, min {min_bound!r}, but got {actual!r}", io, src_path ) self.min = min_bound self.actual = actual class ValidationGreaterThanError(ValidationFailedError): """Raised when validation specified using the `valid/max` key fails. This exception is the result of a validation failure: `actual` should have been less than or equal to `max`, but it was not. The `max` attribute contains the maximum value specified in the `valid/max` key in .ksy specifications. The `actual` attribute contains the actual value parsed from the stream. """ def __init__(self, max_bound, actual, io, src_path): super().__init__( f"not in range, max {max_bound!r}, but got {actual!r}", io, src_path ) self.max = max_bound self.actual = actual class ValidationNotAnyOfError(ValidationFailedError): """Raised when validation specified using the `valid/any-of` key fails. This exception is the result of a validation failure: `actual` should have been any of the values listed in the `valid/any-of` key, but it was not. The `actual` attribute contains the actual value parsed from the stream. """ def __init__(self, actual, io, src_path): super().__init__(f"not any of the list, got {actual!r}", io, src_path) self.actual = actual class ValidationNotInEnumError(ValidationFailedError): """Raised when validation specified using the `valid/in-enum` key fails. This exception is the result of a validation failure: `actual` should have been a known value defined in the enum, but it was not. The `actual` attribute contains the actual value parsed from the stream. """ def __init__(self, actual, io, src_path): super().__init__(f"not in the enum, got {actual!r}", io, src_path) self.actual = actual class ValidationExprError(ValidationFailedError): """Raised when validation specified using the `valid/expr` key fails. This exception is the result of a validation failure: `actual` should have satisfied the validation expression in the `valid/expr` key, but it did not. The `actual` attribute contains the actual value parsed from the stream. """ def __init__(self, actual, io, src_path): super().__init__(f"not matching the expression, got {actual!r}", io, src_path) self.actual = actual class ConsistencyError(Exception): def __init__(self, attr_id, expected, actual): super().__init__( f"Check failed: {attr_id}, expected: {expected!r}, actual: {actual!r}" ) self.id = attr_id self.expected = expected self.actual = actual class ConsistencyNotCheckedError(Exception): """Raised when attempting to write an object with unchecked consistency.""" def __init__(self): super().__init__( "consistency not checked: _check() has not been called " "since the last modification of the object" )