#!/usr/bin/env python # # SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 from __future__ import division, print_function import argparse import re import os import sys from typing import Any, Dict, List, Optional from zlib import crc32 from shutil import copyfile, rmtree from struct import Struct import array import binascii import csv import distutils.dir_util import struct import zlib from builtins import bytes, int, range from io import open __version__ = 'v1.0-dev' # To locate the starting position of the parameter partition parameter_pattern = re.compile(b'\xFC\xFC[\x03|\x02|\x01]') # To locate the starting position of the mfg_nvs.bin partition ## 0xAA 0x50 is the default magic code of partition bin (generated by gen_esp32part.py) ## 0x01 0x02 is the type and subtype of mfg_nvs.bin partition (type: data, subtype: nvs) ## first .{4} is the offset of mfg_nvs.bin partition, sencod .{4} is the size of mfg_nvs.bin partition mfg_nvs_pattern = re.compile(b'\xAA\x50\x01\x02.{4}.{4}mfg_nvs') sec_size = 4096 min_firmware_size = (1024 * 1024) para_partition_size = (4 * 1024) # manufacturing nvs partition mfg_directory = 'mfg_nvs' mfg_csv_filename = 'mfg_nvs.csv' mfg_bin_filename = 'mfg_nvs.bin' def ESP_LOGI(x): print('\033[32m{}\033[0m'.format(x)) def ESP_LOGE(x): print('\033[31m{}\033[0m'.format(x)) def arg_auto_int(x): r = int(x, 0) return r if r >= 0 else -1 def at_read_records(format, f): record_struct = Struct(format) chunks = f.read(record_struct.size) return (record_struct.unpack(chunks)) def at_write_records(records, format, f): record_struct = Struct(format) x = record_struct.pack(*records) f.write(x) def at_parameter_assign_int(arg, fixed_len, l, lidx): if arg != None: l[lidx] = arg & (256 ** fixed_len - 1) if arg >= 0 else -1 def at_parameter_assign_str(arg, fixed_len, l, lidx): if arg != None: larg = list(arg) larg = larg + ['\x00'] * (fixed_len - len(larg)) arg_tmp = [x.encode() for x in larg] l[lidx : (lidx+fixed_len)] = arg_tmp[0 : fixed_len] def modify_bin(esp, args): print(args) if not os.path.exists(args.input): ESP_LOGE('File does not exist: {}'.format(args.input)) sys.exit(2) fsize = os.path.getsize(args.input) if (fsize != para_partition_size) and ((fsize % min_firmware_size) or (fsize / min_firmware_size > 16)): ESP_LOGE('Invalid file size: {}'.format(fsize)) sys.exit(2) copyfile(args.input, args.output) with open(args.input, 'rb') as fp: data = fp.read() if re.search(mfg_nvs_pattern, data): return modify_param_bin_in_nvs(esp, args) else: return modify_param_bin_in_partition(esp, args) # ----------------------------------------------------------------------------------------- # """ The following part is used to generate mfg_nvs.csv The lightweight version of https://github.com/espressif/esp-idf/tree/master/components/nvs_flash/nvs_partition_tool """ class NVS_Constants: class ConstantError(AttributeError): pass def __init__(self) -> None: self.page_size = 4096 self.entry_size = 32 self.item_type = { 0x01: 'u8', 0x11: 'i8', 0x02: 'u16', 0x12: 'i16', 0x04: 'u32', 0x14: 'i32', 0x08: 'u64', 0x18: 'i64', 0x21: 'string', 0x41: 'blob', 0x42: 'blob_data', 0x48: 'blob_index', } self.page_status = { 0xFFFFFFFF: 'Empty', 0xFFFFFFFE: 'Active', 0xFFFFFFFC: 'Full', 0xFFFFFFF8: 'Erasing', 0x00000000: 'Corrupted', } self.entry_status = { 0b11: 'Empty', 0b10: 'Written', 0b00: 'Erased', } def __setattr__(self, key: str, val: Any) -> None: if self.__dict__.get(key, None) is None: self.__dict__[key] = val else: raise NVS_Constants.ConstantError('Cannot change a constant!') nvs_const = NVS_Constants() class NotAlignedError(ValueError): pass class NVS_Partition: def __init__(self, raw_data: bytearray): if len(raw_data) % nvs_const.page_size != 0: raise NotAlignedError( f'Given partition data is not aligned to page size ({len(raw_data)} % {nvs_const.page_size} = {len(raw_data)%nvs_const.page_size})' ) # Divide partition into pages self.pages = [] for i in range(0, len(raw_data), nvs_const.page_size): self.pages.append(NVS_Page(raw_data[i: i + nvs_const.page_size], i)) class NVS_Page: def __init__(self, page_data: bytearray, address: int): if len(page_data) != nvs_const.page_size: raise NotAlignedError( f'Size of given page does not match page size ({len(page_data)} != {nvs_const.page_size})' ) # Initialize class self.is_empty = ( page_data[0: nvs_const.entry_size] == bytearray({0xFF}) * nvs_const.entry_size ) self.start_address = address self.raw_header = page_data[0: nvs_const.entry_size] self.raw_entry_state_bitmap = page_data[ nvs_const.entry_size: 2 * nvs_const.entry_size ] self.entries = [] # Load header self.header: Dict[str, Any] = { 'status': nvs_const.page_status.get( int.from_bytes(page_data[0:4], byteorder='little'), 'Invalid' ), 'page_index': int.from_bytes(page_data[4:8], byteorder='little'), 'version': 256 - page_data[8], 'crc': { 'original': int.from_bytes(page_data[28:32], byteorder='little'), 'computed': crc32(page_data[4:28], 0xFFFFFFFF), }, } # Load entry state bitmap entry_states = [] for c in self.raw_entry_state_bitmap: for index in range(0, 8, 2): entry_states.append( nvs_const.entry_status.get((c >> index) & 3, 'Invalid') ) entry_states = entry_states[:-2] # Load entries i = 2 while i < int( nvs_const.page_size / nvs_const.entry_size ): # Loop through every entry span = page_data[(i * nvs_const.entry_size) + 2] if span in [0xFF, 0]: # 'Default' span length to prevent span overflow span = 1 # Load an entry entry = NVS_Entry( i - 2, page_data[i * nvs_const.entry_size: (i + 1) * nvs_const.entry_size], entry_states[i - 2], ) self.entries.append(entry) # Load all children entries if span != 1: for span_idx in range(1, span): page_addr = i + span_idx entry_idx = page_addr - 2 if page_addr * nvs_const.entry_size >= nvs_const.page_size: break child_entry = NVS_Entry( entry_idx, page_data[ page_addr * nvs_const.entry_size: (page_addr + 1) * nvs_const.entry_size ], entry_states[entry_idx], ) entry.child_assign(child_entry) i += span class NVS_Entry: def __init__(self, index: int, entry_data: bytearray, entry_state: str): if len(entry_data) != nvs_const.entry_size: raise NotAlignedError( f'Given entry is not aligned to entry size ({len(entry_data)} % {nvs_const.entry_size} = {len(entry_data)%nvs_const.entry_size})' ) def item_convert(i_type: int, data: bytearray) -> Dict: byte_size_mask = 0x0F number_sign_mask = 0xF0 fixed_entry_length_threshold = ( 0x20 # Fixed length entry type number is always smaller than this ) if i_type in nvs_const.item_type: # Deal with non variable length entries if i_type < fixed_entry_length_threshold: size = i_type & byte_size_mask num = int.from_bytes( data[:size], byteorder='little', signed=bool(i_type & number_sign_mask), ) return {'value': num} # Deal with variable length entries if nvs_const.item_type[i_type] in ['string', 'blob_data', 'blob']: size = int.from_bytes(data[:2], byteorder='little') crc = int.from_bytes(data[4:8], byteorder='little') return {'value': [size, crc], 'size': size, 'crc': crc} if nvs_const.item_type[i_type] == 'blob_index': size = int.from_bytes(data[:4], byteorder='little') chunk_count = data[4] chunk_start = data[5] return { 'value': [size, chunk_count, chunk_start], 'size': size, 'chunk_count': chunk_count, 'chunk_start': chunk_start, } return {'value': None} def key_decode(data: bytearray) -> Optional[str]: decoded = '' for n in data.rstrip(b'\x00'): char = chr(n) if char.isascii(): decoded += char else: return None return decoded self.raw = entry_data self.state = entry_state self.is_empty = self.raw == bytearray({0xFF}) * nvs_const.entry_size self.index = index namespace = self.raw[0] entry_type = self.raw[1] span = self.raw[2] chunk_index = self.raw[3] crc = self.raw[4:8] key = self.raw[8:24] data = self.raw[24:32] raw_without_crc = self.raw[:4] + self.raw[8:32] self.metadata: Dict[str, Any] = { 'namespace': namespace, 'type': nvs_const.item_type.get(entry_type, f'0x{entry_type:02x}'), 'span': span, 'chunk_index': chunk_index, 'crc': { 'original': int.from_bytes(crc, byteorder='little'), 'computed': crc32(raw_without_crc, 0xFFFFFFFF), 'data_original': int.from_bytes(data[-4:], byteorder='little'), 'data_computed': 0, }, } self.children: List['NVS_Entry'] = [] self.key = key_decode(key) if self.key is None: self.data = None else: self.data = item_convert(entry_type, data) def child_assign(self, entry: 'NVS_Entry') -> None: if not isinstance(entry, type(self)): raise ValueError('You can assign only NVS_Entry') self.children.append(entry) def dump_key_value_pairs(nvs_partition: NVS_Partition, fp) -> None: # Get namespace list ns = {} for page in nvs_partition.pages: for entry in page.entries: if entry.state == 'Written' and entry.metadata['namespace'] == 0: ns[entry.data['value']] = entry.key fp.write('key,type,encoding,value\n') last_ns = '' last_key = '' findex = 0 for page in nvs_partition.pages: for entry in page.entries: if ( entry.state == 'Written' and entry.metadata['namespace'] != 0 ): # Ignore non-written entries chunk_index = '' data = '' if entry.metadata['type'] not in [ 'string', 'blob_data', 'blob_index', 'blob', ]: # Non-variable length entry data = entry.data['value'] elif entry.metadata['type'] == 'blob_index': continue else: # Variable length entries tmp = b'' for e in entry.children: # Merge all children entries tmp += bytes(e.raw) tmp = tmp[: entry.data['size']] # Discard padding if entry.metadata['type'] == 'blob_data': if entry.metadata['chunk_index'] >= 128: # Get real chunk index chunk_index = f'[{entry.metadata["chunk_index"] - 128}]' else: chunk_index = f'[{entry.metadata["chunk_index"]}]' # data = str(tmp) data = tmp if entry.metadata['namespace'] not in ns: continue else: # print(ns[entry.metadata['namespace']] + ':'+ entry.key + '(' + entry.metadata['type'] + ')' + f'{chunk_index} = {data}') now_ns = ns[entry.metadata['namespace']] if last_ns != now_ns: last_ns = now_ns fp.write(now_ns + ',namespace,,\n') if chunk_index == '': if entry.metadata['type'] == 'string': fp.write(entry.key + ',data,' + entry.metadata['type'] + ',' + '"' + str(data.decode('utf-8').rstrip('\x00')) + '"' +'\n') else: fp.write(entry.key + ',data,' + entry.metadata['type'] + ',' + str(data) + '\n') else: # blob data now_key = entry.key if last_key != now_key: findex += 1 last_key = now_key dup_key = False else: dup_key = True last_filename = 'v' + str(findex) + '.txt' with open(os.path.join(mfg_directory, last_filename), 'a+') as ftxt: ftxt.write(str(data.decode('utf-8'))) if not dup_key: fp.write(entry.key + ',file,binary,' + os.path.abspath(os.path.join(mfg_directory, last_filename)) + '\n') # ----------------------------------------------------------------------------------------- # # ----------------------------------------------------------------------------------------- # """ The following part is used to generate mfg_nvs.bin The lightweight version of https://github.com/espressif/esp-idf/blob/17451f1fb3d/components/nvs_flash/nvs_partition_generator """ VERSION1_PRINT = 'V1 - Multipage Blob Support Disabled' VERSION2_PRINT = 'V2 - Multipage Blob Support Enabled' class Page(object): # Item type codes U8 = 0x01 I8 = 0x11 U16 = 0x02 I16 = 0x12 U32 = 0x04 I32 = 0x14 U64 = 0x08 I64 = 0x18 SZ = 0x21 BLOB = 0x41 BLOB_DATA = 0x42 BLOB_IDX = 0x48 # Few Page constants HEADER_SIZE = 32 BITMAPARRAY_OFFSET = 32 BITMAPARRAY_SIZE_IN_BYTES = 32 FIRST_ENTRY_OFFSET = 64 SINGLE_ENTRY_SIZE = 32 CHUNK_ANY = 0xFF ACTIVE = 0xFFFFFFFE FULL = 0xFFFFFFFC VERSION1 = 0xFF VERSION2 = 0xFE PAGE_PARAMS = { 'max_size': 4096, 'max_blob_size': {VERSION1: 1984, VERSION2: 4000}, 'max_entries': 126 } def __init__(self, page_num, version, is_rsrv_page=False): self.entry_num = 0 self.bitmap_array = array.array('B') self.version = version self.page_buf = bytearray(b'\xff') * Page.PAGE_PARAMS['max_size'] if not is_rsrv_page: self.bitmap_array = self.create_bitmap_array() self.set_header(page_num, version) def set_header(self, page_num, version): # set page state to active page_header = bytearray(b'\xff') * 32 page_state_active_seq = Page.ACTIVE struct.pack_into('<I', page_header, 0, page_state_active_seq) # set page sequence number struct.pack_into('<I', page_header, 4, page_num) # set version if version == Page.VERSION2: page_header[8] = Page.VERSION2 elif version == Page.VERSION1: page_header[8] = Page.VERSION1 # set header's CRC crc_data = bytes(page_header[4:28]) crc = zlib.crc32(crc_data, 0xFFFFFFFF) struct.pack_into('<I', page_header, 28, crc & 0xFFFFFFFF) self.page_buf[0:len(page_header)] = page_header def create_bitmap_array(self): bitarray = array.array('B') charsize = 32 # bitmaparray has 256 bits, hence 32 bytes fill = 255 # Fill all 8 bits with 1's bitarray.extend((fill,) * charsize) return bitarray def write_bitmaparray(self): bitnum = self.entry_num * 2 byte_idx = bitnum // 8 # Find byte index in the array bit_offset = bitnum & 7 # Find bit offset in given byte index mask = ~(1 << bit_offset) self.bitmap_array[byte_idx] &= mask start_idx = Page.BITMAPARRAY_OFFSET end_idx = Page.BITMAPARRAY_OFFSET + Page.BITMAPARRAY_SIZE_IN_BYTES self.page_buf[start_idx:end_idx] = self.bitmap_array def write_entry_to_buf(self, data, entrycount,nvs_obj): encr_data = bytearray() if nvs_obj.encrypt: encr_data_ret = self.encrypt_data(data, entrycount,nvs_obj) encr_data[0:len(encr_data_ret)] = encr_data_ret data = encr_data data_offset = Page.FIRST_ENTRY_OFFSET + (Page.SINGLE_ENTRY_SIZE * self.entry_num) start_idx = data_offset end_idx = data_offset + len(data) self.page_buf[start_idx:end_idx] = data # Set bitmap array for entries in current page for i in range(0, entrycount): self.write_bitmaparray() self.entry_num += 1 def set_crc_header(self, entry_struct): crc_data = bytearray(b'28') crc_data[0:4] = entry_struct[0:4] crc_data[4:28] = entry_struct[8:32] crc_data = bytes(crc_data) crc = zlib.crc32(crc_data, 0xFFFFFFFF) struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF) return entry_struct def write_varlen_binary_data(self, entry_struct, ns_index, key, data, data_size, total_entry_count, encoding, nvs_obj): chunk_start = 0 chunk_count = 0 chunk_index = Page.CHUNK_ANY offset = 0 remaining_size = data_size tailroom = None while True: chunk_size = 0 # Get the size available in current page tailroom = (Page.PAGE_PARAMS['max_entries'] - self.entry_num - 1) * Page.SINGLE_ENTRY_SIZE assert tailroom >= 0, 'Page overflow!!' # Split the binary data into two and store a chunk of available size onto curr page if tailroom < remaining_size: chunk_size = tailroom else: chunk_size = remaining_size remaining_size = remaining_size - chunk_size # Change type of data to BLOB_DATA entry_struct[1] = Page.BLOB_DATA # Calculate no. of entries data chunk will require datachunk_rounded_size = (chunk_size + 31) & ~31 datachunk_entry_count = datachunk_rounded_size // 32 datachunk_total_entry_count = datachunk_entry_count + 1 # +1 for the entry header # Set Span entry_struct[2] = datachunk_total_entry_count # Update the chunkIndex chunk_index = chunk_start + chunk_count entry_struct[3] = chunk_index # Set data chunk data_chunk = data[offset:offset + chunk_size] # Compute CRC of data chunk struct.pack_into('<H', entry_struct, 24, chunk_size) if type(data) != bytes: data_chunk = bytes(data_chunk, encoding='utf8') crc = zlib.crc32(data_chunk, 0xFFFFFFFF) struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF) # compute crc of entry header entry_struct = self.set_crc_header(entry_struct) # write entry header self.write_entry_to_buf(entry_struct, 1,nvs_obj) # write actual data self.write_entry_to_buf(data_chunk, datachunk_entry_count,nvs_obj) chunk_count = chunk_count + 1 if remaining_size or (tailroom - chunk_size) < Page.SINGLE_ENTRY_SIZE: nvs_obj.create_new_page() self = nvs_obj.cur_page offset = offset + chunk_size # All chunks are stored, now store the index if not remaining_size: # Initialise data field to 0xff data_array = bytearray(b'\xff') * 8 entry_struct[24:32] = data_array # change type of data to BLOB_IDX entry_struct[1] = Page.BLOB_IDX # Set Span entry_struct[2] = 1 # Update the chunkIndex chunk_index = Page.CHUNK_ANY entry_struct[3] = chunk_index struct.pack_into('<I', entry_struct, 24, data_size) entry_struct[28] = chunk_count entry_struct[29] = chunk_start # compute crc of entry header entry_struct = self.set_crc_header(entry_struct) # write last entry self.write_entry_to_buf(entry_struct, 1,nvs_obj) break return entry_struct def write_single_page_entry(self, entry_struct, data, datalen, data_entry_count, nvs_obj): # compute CRC of data struct.pack_into('<H', entry_struct, 24, datalen) if type(data) != bytes: data = bytes(data, encoding='utf8') crc = zlib.crc32(data, 0xFFFFFFFF) struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF) # compute crc of entry header entry_struct = self.set_crc_header(entry_struct) # write entry header self.write_entry_to_buf(entry_struct, 1, nvs_obj) # write actual data self.write_entry_to_buf(data, data_entry_count, nvs_obj) """ Low-level function to write variable length data into page buffer. Data should be formatted according to encoding specified. """ def write_varlen_data(self, key, data, encoding, ns_index,nvs_obj): # Set size of data datalen = len(data) max_blob_size = Page.PAGE_PARAMS['max_blob_size'][self.version] # V2 blob size limit only applies to strings blob_limit_applies = self.version == Page.VERSION1 or encoding == 'string' if blob_limit_applies and datalen > max_blob_size: raise InputError(' Input File: Size (%d) exceeds max allowed length `%s` bytes for key `%s`.' % (datalen, max_blob_size, key)) # Calculate no. of entries data will require rounded_size = (datalen + 31) & ~31 data_entry_count = rounded_size // 32 total_entry_count = data_entry_count + 1 # +1 for the entry header # Check if page is already full and new page is needed to be created right away if self.entry_num >= Page.PAGE_PARAMS['max_entries']: raise PageFullError() elif (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS['max_entries']: if not (self.version == Page.VERSION2 and encoding in ['hex2bin', 'binary', 'base64']): raise PageFullError() # Entry header entry_struct = bytearray(b'\xff') * 32 # Set Namespace Index entry_struct[0] = ns_index # Set Span if self.version == Page.VERSION2: if encoding == 'string': entry_struct[2] = data_entry_count + 1 # Set Chunk Index chunk_index = Page.CHUNK_ANY entry_struct[3] = chunk_index else: entry_struct[2] = data_entry_count + 1 # set key key_array = b'\x00' * 16 entry_struct[8:24] = key_array entry_struct[8:8 + len(key)] = key.encode() # set Type if encoding == 'string': entry_struct[1] = Page.SZ elif encoding in ['hex2bin', 'binary', 'base64']: entry_struct[1] = Page.BLOB if self.version == Page.VERSION2 and (encoding in ['hex2bin', 'binary', 'base64']): entry_struct = self.write_varlen_binary_data(entry_struct,ns_index,key,data, datalen,total_entry_count, encoding, nvs_obj) else: self.write_single_page_entry(entry_struct, data, datalen, data_entry_count, nvs_obj) """ Low-level function to write data of primitive type into page buffer. """ def write_primitive_data(self, key, data, encoding, ns_index,nvs_obj): # Check if entry exceeds max number of entries allowed per page if self.entry_num >= Page.PAGE_PARAMS['max_entries']: raise PageFullError() entry_struct = bytearray(b'\xff') * 32 entry_struct[0] = ns_index # namespace index entry_struct[2] = 0x01 # Span chunk_index = Page.CHUNK_ANY entry_struct[3] = chunk_index # write key key_array = b'\x00' * 16 entry_struct[8:24] = key_array entry_struct[8:8 + len(key)] = key.encode() if encoding == 'u8': entry_struct[1] = Page.U8 struct.pack_into('<B', entry_struct, 24, data) elif encoding == 'i8': entry_struct[1] = Page.I8 struct.pack_into('<b', entry_struct, 24, data) elif encoding == 'u16': entry_struct[1] = Page.U16 struct.pack_into('<H', entry_struct, 24, data) elif encoding == 'i16': entry_struct[1] = Page.I16 struct.pack_into('<h', entry_struct, 24, data) elif encoding == 'u32': entry_struct[1] = Page.U32 struct.pack_into('<I', entry_struct, 24, data) elif encoding == 'i32': entry_struct[1] = Page.I32 struct.pack_into('<i', entry_struct, 24, data) elif encoding == 'u64': entry_struct[1] = Page.U64 struct.pack_into('<Q', entry_struct, 24, data) elif encoding == 'i64': entry_struct[1] = Page.I64 struct.pack_into('<q', entry_struct, 24, data) # Compute CRC crc_data = bytearray(b'28') crc_data[0:4] = entry_struct[0:4] crc_data[4:28] = entry_struct[8:32] crc_data = bytes(crc_data) crc = zlib.crc32(crc_data, 0xFFFFFFFF) struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF) # write to file self.write_entry_to_buf(entry_struct, 1,nvs_obj) """ Get page buffer data of a given page """ def get_data(self): return self.page_buf """ NVS class encapsulates all NVS specific operations to create a binary with given key-value pairs. Binary can later be flashed onto device via a flashing utility. """ class NVS(object): def __init__(self, fout, input_size, version, encrypt=False, key_input=None): self.size = input_size self.encrypt = encrypt self.encr_key = None self.namespace_idx = 0 self.page_num = -1 self.pages = [] self.version = version self.fout = fout if self.encrypt: self.encr_key = key_input self.cur_page = self.create_new_page(version) def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): if exc_type is None and exc_value is None: # Create pages for remaining available size while True: try: self.create_new_page() except InsufficientSizeError: self.size = None # Creating the last reserved page self.create_new_page(is_rsrv_page=True) break result = self.get_binary_data() self.fout.write(result) def create_new_page(self, version=None, is_rsrv_page=False): # Set previous page state to FULL before creating new page if self.pages: curr_page_state = struct.unpack('<I', self.cur_page.page_buf[0:4])[0] if curr_page_state == Page.ACTIVE: page_state_full_seq = Page.FULL struct.pack_into('<I', self.cur_page.page_buf, 0, page_state_full_seq) # Set version for NVS binary generated version = self.version # Update available size as each page is created if self.size == 0: raise InsufficientSizeError('Error: Size parameter is less than the size of data in csv.Please increase size.') if not is_rsrv_page: self.size = self.size - Page.PAGE_PARAMS['max_size'] self.page_num += 1 # Set version for each page and page header new_page = Page(self.page_num, version, is_rsrv_page) self.pages.append(new_page) self.cur_page = new_page return new_page """ Write namespace entry and subsequently increase namespace count so that all upcoming entries will be mapped to a new namespace. """ def write_namespace(self, key): self.namespace_idx += 1 try: self.cur_page.write_primitive_data(key, self.namespace_idx, 'u8', 0,self) except PageFullError: new_page = self.create_new_page() new_page.write_primitive_data(key, self.namespace_idx, 'u8', 0,self) """ Write key-value pair. Function accepts value in the form of ascii character and converts it into appropriate format before calling Page class's functions to write entry into NVS format. Function handles PageFullError and creates a new page and re-invokes the function on a new page. We don't have to guard re-invocation with try-except since no entry can span multiple pages. """ def write_entry(self, key, value, encoding): # Encoding-specific handling if encoding == 'hex2bin': value = value.strip() if len(value) % 2 != 0: raise InputError('%s: Invalid data length. Should be multiple of 2.' % key) value = binascii.a2b_hex(value) elif encoding == 'base64': value = binascii.a2b_base64(value) elif encoding == 'string': if type(value) == bytes: value = value.decode() value += '\0' encoding = encoding.lower() varlen_encodings = {'string', 'binary', 'hex2bin', 'base64'} primitive_encodings = {'u8', 'i8', 'u16', 'i16', 'u32', 'i32', 'u64', 'i64'} if encoding in varlen_encodings: try: self.cur_page.write_varlen_data(key, value, encoding, self.namespace_idx,self) except PageFullError: new_page = self.create_new_page() new_page.write_varlen_data(key, value, encoding, self.namespace_idx,self) elif encoding in primitive_encodings: try: self.cur_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self) except PageFullError: new_page = self.create_new_page() new_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self) else: raise InputError('%s: Unsupported encoding' % encoding) """ Return accumulated data of all pages """ def get_binary_data(self): data = bytearray() for page in self.pages: data += page.get_data() return data class PageFullError(RuntimeError): """ Represents error when current page doesn't have sufficient entries left to accommodate current request """ def __init__(self): super(PageFullError, self).__init__() class InputError(RuntimeError): """ Represents error on the input """ def __init__(self, e): print('\nError:') super(InputError, self).__init__(e) class InsufficientSizeError(RuntimeError): """ Represents error when NVS Partition size given is insufficient to accomodate the data in the given csv file """ def __init__(self, e): super(InsufficientSizeError, self).__init__(e) def nvs_open(result_obj, input_size, version=None, is_encrypt=False, key=None): """ Wrapper to create and NVS class object. This object can later be used to set key-value pairs :param result_obj: File/Stream object to dump resultant binary. If data is to be dumped into memory, one way is to use BytesIO object :param input_size: Size of Partition :return: NVS class instance """ return NVS(result_obj, input_size, version, encrypt=is_encrypt, key_input=key) def write_entry(nvs_instance, key, datatype, encoding, value): """ Wrapper to set key-value pair in NVS format :param nvs_instance: Instance of an NVS class returned by nvs_open() :param key: Key of the data :param datatype: Data type. Valid values are "file", "data" and "namespace" :param encoding: Data encoding. Valid values are "u8", "i8", "u16", "i16", "u32", "i32", "u64", "i64", "string", "binary", "hex2bin" and "base64" :param value: Data value in ascii encoded string format for "data" datatype and filepath for "file" datatype :return: None """ if datatype == 'file': abs_file_path = value if os.path.isabs(value) is False: script_dir = os.getcwd() abs_file_path = os.path.join(script_dir, value) with open(abs_file_path, 'rb') as f: value = f.read() if datatype == 'namespace': nvs_instance.write_namespace(key) else: nvs_instance.write_entry(key, value, encoding) def nvs_close(nvs_instance): """ Wrapper to finish writing to NVS and write data to file/stream object provided to nvs_open method :param nvs_instance: Instance of NVS class returned by nvs_open() :return: None """ nvs_instance.__exit__(None, None, None) def check_size(size): try: input_size = size if input_size % 4096 != 0: sys.exit('Size of partition must be multiple of 4096') # Update size as a page needs to be reserved of size 4KB input_size = input_size - Page.PAGE_PARAMS['max_size'] if input_size < (2 * Page.PAGE_PARAMS['max_size']): sys.exit('Minimum NVS partition size needed is 0x3000 bytes.') return input_size except Exception as e: print(e) sys.exit(0) def set_target_filepath(outdir, filepath): ''' Set target file path: <outdir>/<filepath> :param outdir: Target output dir to store files :param filepath: Path of target file ''' bin_ext = '.bin' # Expand if tilde(~) provided in path outdir = os.path.expanduser(outdir) if filepath: key_file_name, ext = os.path.splitext(filepath) if not ext: filepath = key_file_name + bin_ext elif bin_ext not in ext: sys.exit('Error: `%s`. Only `%s` extension allowed.' % (filepath, bin_ext)) # Create dir if does not exist if not (os.path.isdir(outdir)): distutils.dir_util.mkpath(outdir) filedir, filename = os.path.split(filepath) filedir = os.path.join(outdir,filedir,'') if filedir and not os.path.isdir(filedir): distutils.dir_util.mkpath(filedir) if os.path.isabs(filepath): if not outdir == os.getcwd(): print('\nWarning: `%s` \n\t==> absolute path given so outdir is ignored for this file.' % filepath) # Set to empty as outdir is ignored here outdir = '' # Set full path - outdir + filename filepath = os.path.join(outdir, '') + filepath return outdir, filepath def generate(infile, outfile, size): ''' Generate NVS Partition :param args: Command line arguments given :param is_encr_enabled: Encryption enabled/disabled :param encr_key: Key to encrypt NVS partition ''' is_dir_new = False bin_ext = '.bin' input_size = check_size(size) outdir = os.getcwd() # Check if key file has .bin extension filename, ext = os.path.splitext(outfile) if bin_ext not in ext: sys.exit('Error: `%s`. Only `.bin` extension allowed.' % outfile) outdir, outfile = set_target_filepath(outdir, outfile) with open(infile, 'rt', encoding='utf8') as input_file,\ open(outfile, 'wb') as output_file,\ nvs_open(output_file, input_size, Page.VERSION2, False, None) as nvs_obj: # Comments are skipped reader = csv.DictReader(filter(lambda row: row[0] != '#',input_file), delimiter=',') for row in reader: try: max_key_len = 15 if len(row['key']) > max_key_len: raise InputError('Length of key `%s` should be <= 15 characters.' % row['key']) write_entry(nvs_obj, row['key'], row['type'], row['encoding'], row['value']) except InputError as e: print(e) filedir, filename = os.path.split(outfile) if filename: print('\nWarning: NVS binary not created...') os.remove(outfile) if is_dir_new and not filedir == os.getcwd(): print('\nWarning: Output dir not created...') os.rmdir(filedir) sys.exit(-2) print('Created NVS binary: ===>', outfile) # ----------------------------------------------------------------------------------------- # NVS_KEY_TYPE = { 'D': ',', # digit 'S': ',string,', # string 'B': ',binary,', # binary } def at_update_param(key, type, value, data): if value is None: return data lines = data.split('\n') for i in range(len(lines)): line = lines[i].strip() if line.startswith(key) and 'namespace' not in line: parts = line.split(NVS_KEY_TYPE[type]) if type == 'S': parts[-1] = '"' + str(value) + '"' else: parts[-1] = str(value) lines[i] = NVS_KEY_TYPE[type].join(parts) data = '\n'.join(lines) return data def at_update_mfg_parameters(args, data): # string parameters data = at_update_param('module_name', 'S', args.module_name, data) data = at_update_param('country_code', 'S', args.country_code, data) for i in range(31): # gatts config data = at_update_param('cfg{}'.format(i), 'S', getattr(args, 'gatts_cfg{}'.format(i)), data) # int parameters data = at_update_param('max_tx_power', 'D', args.tx_power, data) data = at_update_param('uart_port', 'D', args.uart_num, data) data = at_update_param('start_channel', 'D', args.start_channel, data) data = at_update_param('channel_num', 'D', args.channel_number, data) data = at_update_param('uart_baudrate', 'D', args.baud, data) data = at_update_param('uart_tx_pin', 'D', args.tx_pin, data) data = at_update_param('uart_rx_pin', 'D', args.rx_pin, data) data = at_update_param('uart_cts_pin', 'D', args.cts_pin, data) data = at_update_param('uart_rts_pin', 'D', args.rts_pin, data) # binary file parameters data = at_update_param('server_ca', 'B', args.server_ca, data) data = at_update_param('server_cert', 'B', args.server_cert, data) data = at_update_param('server_key', 'B', args.server_key, data) data = at_update_param('client_ca.0', 'B', args.client_ca0, data) data = at_update_param('client_ca.1', 'B', args.client_ca1, data) data = at_update_param('client_cert.0', 'B', args.client_cert0, data) data = at_update_param('client_cert.1', 'B', args.client_cert1, data) data = at_update_param('client_key.0', 'B', args.client_key0, data) data = at_update_param('client_key.1', 'B', args.client_key1, data) data = at_update_param('mqtt_ca', 'B', args.mqtt_ca, data) data = at_update_param('mqtt_cert', 'B', args.mqtt_cert, data) data = at_update_param('mqtt_key', 'B', args.mqtt_key, data) data = at_update_param('wpa2_ca', 'B', args.wpa2_ca, data) data = at_update_param('wpa2_cert', 'B', args.wpa2_cert, data) data = at_update_param('wpa2_key', 'B', args.wpa2_key, data) return data def modify_param_bin_in_nvs(esp, args): """ A typic format of esp-at parameter binary is in nvs partition, and these parameters support to configure: <tpower>, <uart_x>, <schan>, <nchan>, <country code>, <uart baud>, <tx_pin>, <tx_pin>, <cts>, <rts>, <module name> """ print('Modify the binary firmware where the parameters are stored in manufacturing nvs...') if args.parameter_offset: param_addr = args.parameter_offset else: with open(args.output, 'rb') as fp: data = fp.read() try: param_addr = re.search(mfg_nvs_pattern, data).span()[0] except Exception as e: ESP_LOGE('Can not find valid entry of parameter partition, please check firmware: {}'.format(args.input)) sys.exit(2) # read the offset and size parameter of mfg_nvs.bin by param_addr parameter with open(args.output, 'rb') as fp: param_format = '<HBBII' fp.seek(param_addr, 0) raw_at_parameter = at_read_records(param_format, fp) list_at_parameter = list(raw_at_parameter) mfg_nvs_addr = list_at_parameter[3] mfg_nvs_size = list_at_parameter[4] ESP_LOGI('mfg_nvs.bin address: {} size: {}'.format(hex(mfg_nvs_addr), hex(mfg_nvs_size))) # create work directory if os.path.exists(mfg_directory): rmtree(mfg_directory) os.mkdir(mfg_directory) mfg_nvs_csv = os.path.abspath(os.path.join(mfg_directory, mfg_csv_filename)) mfg_nvs_bin = os.path.abspath(os.path.join(mfg_directory, mfg_bin_filename)) # read the content of mfg_nvs.bin with open(args.output, 'rb') as fp: fp.seek(mfg_nvs_addr, 0) data = fp.read(mfg_nvs_size) # dump mfg_nvs.bin to mfg_nvs.csv nvs = NVS_Partition(bytearray(data)) with open(mfg_nvs_csv, 'w+') as fp: dump_key_value_pairs(nvs, fp) # update mfg_nvs.csv with new parameters with open(mfg_nvs_csv, 'r+') as fp: data = fp.read() data = at_update_mfg_parameters(args, data) fp.seek(0) fp.truncate(0) fp.write(data) print('Updated NVS CSV: ===>', mfg_nvs_csv) # generate new mfg_nvs.bin from mfg_nvs.csv generate(mfg_nvs_csv, mfg_nvs_bin, mfg_nvs_size) # re-combine target.bin with new mfg_nvs.bin with open(args.output, 'rb+') as fp, open(mfg_nvs_bin, 'rb') as fbin: mfg_nvs_data = fbin.read() fp.seek(mfg_nvs_addr, 0) fp.write(mfg_nvs_data) ESP_LOGI('New esp-at firmware successfully generated! ----> {}'.format(os.path.abspath(args.output))) return def modify_param_bin_in_partition(esp, args): """ A typic format of esp-at parameter binary is in hard-coding partition (4KB size) and the format is like the following: ┌─────────────────┬─────────┬────────┬────────┬────────┬───────┬───────┬──────────────────┬─────────────────┐ │ 2B │ 1B │ 1B │ 1B │ 1B │ 1B │ 1B │ 4B │ 4B │ ├─────────────────┼─────────┼────────┼────────┼────────┼───────┼───────┼──────────────────┼─────────────────┤ │ magic code │ version │ rsvd │ tpower │ uart_x │ schan │ nchan │ country code │ uart baud │ ├────────┬────────┼─────────┼────────┼────────┼────────┼───────┴───────┼──────────────────┴─────────────────┤ │ tx pin │ rx pin │ cts │ rts │ txctrl │ rxctrl │ rsvd │ platform (0-7) │ ├────────┴────────┴─────────┴────────┴────────┴────────┴───────────────┴────────────────────────────────────┤ │ platform (8-23) │ ├──────────────────────────────────────────────────────────────────────┬────────────────────────────────────┤ │ platform (24-31) │ module name (0-7) │ ├──────────────────────────────────────────────────────────────────────┴────────────────────────────────────┤ │ module name (8-23) │ ├──────────────────────────────────────────────────────────────────────┬────────────────────────────────────┤ │ module name (24-31) │ padded with 0xFF │ ├──────────────────────────────────────────────────────────────────────┴────────────────────────────────────┤ │ padded with 0xFF bytes up to 4KB │ └───────────────────────────────────────────────────────────────────────────────────────────────────────────┘ For the sake of generality, these parameters support to configure: <tpower>, <uart_x>, <schan>, <nchan>, <country code>, <uart baud>, <tx_pin>, <tx_pin>, <cts>, <rts>, <txctrl>, <rxctrl>, <platform>, <module name> For the sake of compatibility, these parameters do not support to configure: <magic code>, <version>, <rsvd> """ print('Modify the binary firmware where the parameters are stored in partitions...') # find out the parameter start address # prefer to use input configuation if args.parameter_offset: param_addr = args.parameter_offset else: with open(args.output, 'rb') as fp: data = fp.read() try: param_addr = re.search(parameter_pattern, data).span()[0] except Exception as e: ESP_LOGE('Can not find valid entry of parameter partition, please check firmware: {}'.format(args.input)) sys.exit(2) if param_addr % sec_size != 0: ESP_LOGE("Found wrong entry of parameter partition: {}, please manually specify \"--parameter_offset\" parameter!".format(hex(param_addr))) sys.exit(2) else: ESP_LOGI('factory parameter entry address: {}'.format(hex(param_addr))) # modify parameter with open(args.output, 'rb+') as fp: param_format = '<HBBBbBB 4c i bbbbbbH 32c 32c' # valid 88 bytes and 4008 padding bytes fp.seek(param_addr, 0) raw_at_parameter = at_read_records(param_format, fp) print('raw parameters: {}\r\n'.format(raw_at_parameter)) list_at_parameter = list(raw_at_parameter) """ <_magic_code>, <_version>, <_rsvd>, <tpower>, <uart_x>, <schan>, <nchan>, <country code>, <uart baud>, <tx_pin>, <tx_pin>, <cts>, <rts>, <txctrl>, <rxctrl>, <_rsvd>, <platform>, <module name> """ at_parameter_assign_int(args.tx_power, 1, list_at_parameter, 3) at_parameter_assign_int(args.uart_num, 1, list_at_parameter, 4) at_parameter_assign_int(args.start_channel, 1, list_at_parameter, 5) at_parameter_assign_int(args.channel_number, 1, list_at_parameter, 6) at_parameter_assign_str(args.country_code, 4, list_at_parameter, 7) at_parameter_assign_int(args.baud, 4, list_at_parameter, 11) at_parameter_assign_int(args.tx_pin, 1, list_at_parameter, 12) at_parameter_assign_int(args.rx_pin, 1, list_at_parameter, 13) at_parameter_assign_int(args.cts_pin, 1, list_at_parameter, 14) at_parameter_assign_int(args.rts_pin, 1, list_at_parameter, 15) at_parameter_assign_int(args.tx_control_pin, 1, list_at_parameter, 16) at_parameter_assign_int(args.rx_control_pin, 1, list_at_parameter, 17) at_parameter_assign_str(args.platform, 32, list_at_parameter, 19) at_parameter_assign_str(args.module_name, 32, list_at_parameter, 51) new_at_parameter = tuple(list_at_parameter) fp.seek(param_addr, 0) at_write_records(new_at_parameter, param_format, fp) fp.seek(param_addr, 0) raw_at_parameter = at_read_records(param_format, fp) print('new parameters: {}\r\n'.format(raw_at_parameter)) ESP_LOGI('New esp-at firmware successfully generated! ----> {}'.format(os.path.abspath(args.output))) def generate_bin(esp, args): print('TODOs: ESP-AT will add this feature in v2.4.0.0+') def version(esp, args): print(__version__) def main(argv=None, esp=None): parser = argparse.ArgumentParser(description='at.py {} - ESP-AT Utility'.format(__version__), prog='at.py') subparsers = parser.add_subparsers( dest='operation', help='Run at.py {command} -h for additional help') parser_modify_bin = subparsers.add_parser( 'modify_bin', help='Modify the parameter configuration of esp-at factory firmware (1MB/2MB/4MB/.. size) according to the parameter configuration') parser_generate_bin = subparsers.add_parser( 'generate_bin', help='TODOs: ESP-AT will add this feature in v2.4.0.0+') subparsers.add_parser( 'version', help='Print at.py version') parser_modify_bin.add_argument('--platform', '-pf', help='ESP chip series', type=lambda c: c[0:32], choices=['PLATFORM_ESP32', 'PLATFORM_ESP8266', 'PLATFORM_ESP32S2', 'PLATFORM_ESP32C3']) parser_modify_bin.add_argument('--module_name', '-mn', help='ESP module name', type=lambda c: c[0:32]) parser_modify_bin.add_argument('--tx_power', '-tp', help='Initial RF Tx power of Wi-Fi, the unit is 0.25 dBm', type=int, choices=range(40, 85)) parser_modify_bin.add_argument('--uart_num', '-un', help='Initial UART number for communication with host MCU, receive AT commands and response', type=int, choices=range(0, 3)) parser_modify_bin.add_argument('--start_channel', '-sc', help='Initial Wi-Fi start channel', type=int, choices=range(1, 15)) parser_modify_bin.add_argument('--channel_number', '-cn', help='Total Wi-Fi channel number', type=int, choices=range(1, 15)) parser_modify_bin.add_argument('--country_code', '-cc', help='Initial Wi-Fi country code', type=lambda c: c[0:4]) parser_modify_bin.add_argument('--baud', '-b', help='Initial UART baudrate of AT firmware, for communication with host MCU', type=arg_auto_int) parser_modify_bin.add_argument('--tx_pin', '-tx', help='GPIO pin of ESP-AT uart tx, ESP-AT uses this tx_pin to send data to host MCU', type=arg_auto_int) parser_modify_bin.add_argument('--rx_pin', '-rx', help='GPIO pin of ESP-AT uart rx, ESP-AT uses this rx_pin to receive data from host MCU', type=arg_auto_int) parser_modify_bin.add_argument('--cts_pin', '-cts', help='GPIO pin of ESP-AT uart cts, used for hardware flow control', type=arg_auto_int) parser_modify_bin.add_argument('--rts_pin', '-rts', help='GPIO pin of ESP-AT uart rts, used for hardware flow control', type=arg_auto_int) parser_modify_bin.add_argument('--tx_control_pin', '-txctrl', help='See the Figure 1-10b (ESP8266EX UART SWAP) in <ESP8266 Hardware Design Guidelines> for more details.', type=arg_auto_int) parser_modify_bin.add_argument('--rx_control_pin', '-rxctrl', help='See the Figure 1-10c (ESP8266EX UART SWAP) in <ESP8266 Hardware Design Guidelines> for more details.', type=arg_auto_int) # server ca parser_modify_bin.add_argument('--server_ca', '-sca', help='Specify the new file path for the CA certificate of server side to update. This will update the file located at esp-at/components/customized_partitions/raw_data/server_ca/server_ca.crt with the contents of your new file.', type=str) # server crt parser_modify_bin.add_argument('--server_cert', '-scrt', help='Specify the new file path for the server certificate to update. This will update the file located at esp-at/components/customized_partitions/raw_data/server_cert/server_cert.crt with the contents of your new file.', type=str) # server key parser_modify_bin.add_argument('--server_key', '-skey', help='Specify the new file path for the server private key to update. This will update the file located at esp-at/components/customized_partitions/raw_data/server_key/server.key with the contents of your new file.', type=str) # client ca parser_modify_bin.add_argument('--client_ca0', '-cca0', help='Specify the new file path for the first CA certificate of client side to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_ca/client_ca_00.crt with the contents of your new file.', type=str) parser_modify_bin.add_argument('--client_ca1', '-cca1', help='Specify the new file path for the second CA certificate of client side to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_ca/client_ca_01.crt with the contents of your new file.', type=str) # client crt parser_modify_bin.add_argument('--client_cert0', '-ccrt0', help='Specify the new file path for the first client certificate to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_cert/client_cert_00.crt with the contents of your new file.', type=str) parser_modify_bin.add_argument('--client_cert1', '-ccrt1', help='Specify the new file path for the second client certificate to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_cert/client_cert_01.crt with the contents of your new file.', type=str) # client key parser_modify_bin.add_argument('--client_key0', '-ckey0', help='Specify the new file path for the first client private key to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_key/client_key_00.crt with the contents of your new file.', type=str) parser_modify_bin.add_argument('--client_key1', '-ckey1', help='Specify the new file path for the second client private key to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_key/client_key_01.crt with the contents of your new file.', type=str) # mqtt ca parser_modify_bin.add_argument('--mqtt_ca', '-mqca', help='Specify the new file path for the CA certificate of MQTT client to update. This will update the file located at esp-at/components/customized_partitions/raw_data/mqtt_ca/mqtt_ca.crt with the contents of your new file.', type=str) # mqtt crt parser_modify_bin.add_argument('--mqtt_cert', '-mqcrt', help='Specify the new file path for the MQTT certificate to update. This will update the file located at esp-at/components/customized_partitions/raw_data/mqtt_cert/mqtt_client.crt with the contents of your new file.', type=str) # mqtt key parser_modify_bin.add_argument('--mqtt_key', '-mqkey', help='Specify the new file path for the MQTT private key to update. This will update the file located at esp-at/components/customized_partitions/raw_data/mqtt_key/mqtt_client.key with the contents of your new file.', type=str) # wpa2 ca parser_modify_bin.add_argument('--wpa2_ca', '-wpa2ca', help='Specify the new file path for the CA certificate of WPA2 enterprise client to update. This will update the file located at esp-at/components/customized_partitions/raw_data/wpa2_ca/wpa2_ca.pem with the contents of your new file.', type=str) # wpa2 crt parser_modify_bin.add_argument('--wpa2_cert', '-wpa2crt', help='Specify the new file path for the WPA2 certificate to update. This will update the file located at esp-at/components/customized_partitions/raw_data/wpa2_cert/wpa2_client.crt with the contents of your new file.', type=str) # wpa2 key parser_modify_bin.add_argument('--wpa2_key', '-wpa2key', help='Specify the new file path for the WPA2 private key to update. This will update the file located at esp-at/components/customized_partitions/raw_data/wpa2_key/wpa2_client.key with the contents of your new file.', type=str) # gatts config for i in range(31): parser_modify_bin.add_argument('--gatts_cfg{}'.format(i), '-cfg{}'.format(i), help='Specify the nth configuration of GATTS to update. This will update the index={} line of esp-at/components/customized_partitions/raw_data/ble_data/gatts_data.csv file'.format(i), type=str) parser_modify_bin.add_argument('--parameter_offset', '-os', help='Offset of parameter partition in AT firmware. If this parameter is set, the input file will be parsed directly according to the parameter instead of automatically matching the parameter partition header.', type=arg_auto_int) parser_modify_bin.add_argument('--input', '-in', help='Input filename of AT firmware or parameter partition', metavar='filename', type=str, required=True) parser_modify_bin.add_argument('--output', '-o', help='Output filename of AT firmware or parameter partition', metavar='filename', type=str, default='target.bin') for operation in subparsers.choices.keys(): assert operation in globals(), '{} should be a module function'.format(operation) args = parser.parse_args(argv) if args.operation is None: parser.print_help() sys.exit(1) operation_func = globals()[args.operation] try: operation_func(esp, args) finally: # do final cleanup pass class FatalError(RuntimeError): """ Wrapper class for runtime errors that aren't caused by internal bugs, but by ESP-AT responses or input content. """ def __init__(self, message): RuntimeError.__init__(self, message) @staticmethod def WithResult(message, result): """ Return a fatal error object that appends the hex values of 'result' as a string formatted argument. """ message += ' (result was {})'.format(hexify(result)) return FatalError(message) def _main(): try: main() except FatalError as e: ESP_LOGE('A fatal error occurred: {}'.format(e)) sys.exit(2) except Exception as e: ESP_LOGE('A system error occurred: {}'.format(e)) if __name__ == '__main__': _main()