#!/usr/bin/env python # # SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 from __future__ import division, print_function import argparse import re import os import sys from typing import Any, Dict, List, Optional from zlib import crc32 from shutil import copyfile, rmtree from struct import Struct import array import binascii import csv import distutils.dir_util import struct import zlib from builtins import bytes, int, range from io import open __version__ = 'v1.0-dev' # To locate the starting position of the parameter partition parameter_pattern = re.compile(b'\xFC\xFC[\x03|\x02|\x01]') # To locate the starting position of the mfg_nvs.bin partition ## 0xAA 0x50 is the default magic code of partition bin (generated by gen_esp32part.py) ## 0x01 0x02 is the type and subtype of mfg_nvs.bin partition (type: data, subtype: nvs) ## first .{4} is the offset of mfg_nvs.bin partition, sencod .{4} is the size of mfg_nvs.bin partition mfg_nvs_pattern = re.compile(b'\xAA\x50\x01\x02.{4}.{4}mfg_nvs') sec_size = 4096 min_firmware_size = (1024 * 1024) para_partition_size = (4 * 1024) # manufacturing nvs partition mfg_directory = 'mfg_nvs' mfg_csv_filename = 'mfg_nvs.csv' mfg_bin_filename = 'mfg_nvs.bin' def ESP_LOGI(x): print('\033[32m{}\033[0m'.format(x)) def ESP_LOGE(x): print('\033[31m{}\033[0m'.format(x)) def arg_auto_int(x): r = int(x, 0) return r if r >= 0 else -1 def at_read_records(format, f): record_struct = Struct(format) chunks = f.read(record_struct.size) return (record_struct.unpack(chunks)) def at_write_records(records, format, f): record_struct = Struct(format) x = record_struct.pack(*records) f.write(x) def at_parameter_assign_int(arg, fixed_len, l, lidx): if arg != None: l[lidx] = arg & (256 ** fixed_len - 1) if arg >= 0 else -1 def at_parameter_assign_str(arg, fixed_len, l, lidx): if arg != None: larg = list(arg) larg = larg + ['\x00'] * (fixed_len - len(larg)) arg_tmp = [x.encode() for x in larg] l[lidx : (lidx+fixed_len)] = arg_tmp[0 : fixed_len] def modify_bin(esp, args): print(args) if not os.path.exists(args.input): ESP_LOGE('File does not exist: {}'.format(args.input)) sys.exit(2) fsize = os.path.getsize(args.input) if (fsize != para_partition_size) and ((fsize % min_firmware_size) or (fsize / min_firmware_size > 16)): ESP_LOGE('Invalid file size: {}'.format(fsize)) sys.exit(2) copyfile(args.input, args.output) with open(args.input, 'rb') as fp: data = fp.read() if re.search(mfg_nvs_pattern, data): return modify_param_bin_in_nvs(esp, args) else: return modify_param_bin_in_partition(esp, args) # ----------------------------------------------------------------------------------------- # """ The following part is used to generate mfg_nvs.csv The lightweight version of https://github.com/espressif/esp-idf/tree/master/components/nvs_flash/nvs_partition_tool """ class NVS_Constants: class ConstantError(AttributeError): pass def __init__(self) -> None: self.page_size = 4096 self.entry_size = 32 self.item_type = { 0x01: 'u8', 0x11: 'i8', 0x02: 'u16', 0x12: 'i16', 0x04: 'u32', 0x14: 'i32', 0x08: 'u64', 0x18: 'i64', 0x21: 'string', 0x41: 'blob', 0x42: 'blob_data', 0x48: 'blob_index', } self.page_status = { 0xFFFFFFFF: 'Empty', 0xFFFFFFFE: 'Active', 0xFFFFFFFC: 'Full', 0xFFFFFFF8: 'Erasing', 0x00000000: 'Corrupted', } self.entry_status = { 0b11: 'Empty', 0b10: 'Written', 0b00: 'Erased', } def __setattr__(self, key: str, val: Any) -> None: if self.__dict__.get(key, None) is None: self.__dict__[key] = val else: raise NVS_Constants.ConstantError('Cannot change a constant!') nvs_const = NVS_Constants() class NotAlignedError(ValueError): pass class NVS_Partition: def __init__(self, raw_data: bytearray): if len(raw_data) % nvs_const.page_size != 0: raise NotAlignedError( f'Given partition data is not aligned to page size ({len(raw_data)} % {nvs_const.page_size} = {len(raw_data)%nvs_const.page_size})' ) # Divide partition into pages self.pages = [] for i in range(0, len(raw_data), nvs_const.page_size): self.pages.append(NVS_Page(raw_data[i: i + nvs_const.page_size], i)) class NVS_Page: def __init__(self, page_data: bytearray, address: int): if len(page_data) != nvs_const.page_size: raise NotAlignedError( f'Size of given page does not match page size ({len(page_data)} != {nvs_const.page_size})' ) # Initialize class self.is_empty = ( page_data[0: nvs_const.entry_size] == bytearray({0xFF}) * nvs_const.entry_size ) self.start_address = address self.raw_header = page_data[0: nvs_const.entry_size] self.raw_entry_state_bitmap = page_data[ nvs_const.entry_size: 2 * nvs_const.entry_size ] self.entries = [] # Load header self.header: Dict[str, Any] = { 'status': nvs_const.page_status.get( int.from_bytes(page_data[0:4], byteorder='little'), 'Invalid' ), 'page_index': int.from_bytes(page_data[4:8], byteorder='little'), 'version': 256 - page_data[8], 'crc': { 'original': int.from_bytes(page_data[28:32], byteorder='little'), 'computed': crc32(page_data[4:28], 0xFFFFFFFF), }, } # Load entry state bitmap entry_states = [] for c in self.raw_entry_state_bitmap: for index in range(0, 8, 2): entry_states.append( nvs_const.entry_status.get((c >> index) & 3, 'Invalid') ) entry_states = entry_states[:-2] # Load entries i = 2 while i < int( nvs_const.page_size / nvs_const.entry_size ): # Loop through every entry span = page_data[(i * nvs_const.entry_size) + 2] if span in [0xFF, 0]: # 'Default' span length to prevent span overflow span = 1 # Load an entry entry = NVS_Entry( i - 2, page_data[i * nvs_const.entry_size: (i + 1) * nvs_const.entry_size], entry_states[i - 2], ) self.entries.append(entry) # Load all children entries if span != 1: for span_idx in range(1, span): page_addr = i + span_idx entry_idx = page_addr - 2 if page_addr * nvs_const.entry_size >= nvs_const.page_size: break child_entry = NVS_Entry( entry_idx, page_data[ page_addr * nvs_const.entry_size: (page_addr + 1) * nvs_const.entry_size ], entry_states[entry_idx], ) entry.child_assign(child_entry) i += span class NVS_Entry: def __init__(self, index: int, entry_data: bytearray, entry_state: str): if len(entry_data) != nvs_const.entry_size: raise NotAlignedError( f'Given entry is not aligned to entry size ({len(entry_data)} % {nvs_const.entry_size} = {len(entry_data)%nvs_const.entry_size})' ) def item_convert(i_type: int, data: bytearray) -> Dict: byte_size_mask = 0x0F number_sign_mask = 0xF0 fixed_entry_length_threshold = ( 0x20 # Fixed length entry type number is always smaller than this ) if i_type in nvs_const.item_type: # Deal with non variable length entries if i_type < fixed_entry_length_threshold: size = i_type & byte_size_mask num = int.from_bytes( data[:size], byteorder='little', signed=bool(i_type & number_sign_mask), ) return {'value': num} # Deal with variable length entries if nvs_const.item_type[i_type] in ['string', 'blob_data', 'blob']: size = int.from_bytes(data[:2], byteorder='little') crc = int.from_bytes(data[4:8], byteorder='little') return {'value': [size, crc], 'size': size, 'crc': crc} if nvs_const.item_type[i_type] == 'blob_index': size = int.from_bytes(data[:4], byteorder='little') chunk_count = data[4] chunk_start = data[5] return { 'value': [size, chunk_count, chunk_start], 'size': size, 'chunk_count': chunk_count, 'chunk_start': chunk_start, } return {'value': None} def key_decode(data: bytearray) -> Optional[str]: decoded = '' for n in data.rstrip(b'\x00'): char = chr(n) if char.isascii(): decoded += char else: return None return decoded self.raw = entry_data self.state = entry_state self.is_empty = self.raw == bytearray({0xFF}) * nvs_const.entry_size self.index = index namespace = self.raw[0] entry_type = self.raw[1] span = self.raw[2] chunk_index = self.raw[3] crc = self.raw[4:8] key = self.raw[8:24] data = self.raw[24:32] raw_without_crc = self.raw[:4] + self.raw[8:32] self.metadata: Dict[str, Any] = { 'namespace': namespace, 'type': nvs_const.item_type.get(entry_type, f'0x{entry_type:02x}'), 'span': span, 'chunk_index': chunk_index, 'crc': { 'original': int.from_bytes(crc, byteorder='little'), 'computed': crc32(raw_without_crc, 0xFFFFFFFF), 'data_original': int.from_bytes(data[-4:], byteorder='little'), 'data_computed': 0, }, } self.children: List['NVS_Entry'] = [] self.key = key_decode(key) if self.key is None: self.data = None else: self.data = item_convert(entry_type, data) def child_assign(self, entry: 'NVS_Entry') -> None: if not isinstance(entry, type(self)): raise ValueError('You can assign only NVS_Entry') self.children.append(entry) def dump_key_value_pairs(nvs_partition: NVS_Partition, fp) -> None: # Get namespace list ns = {} for page in nvs_partition.pages: for entry in page.entries: if entry.state == 'Written' and entry.metadata['namespace'] == 0: ns[entry.data['value']] = entry.key fp.write('key,type,encoding,value\n') last_ns = '' last_key = '' findex = 0 for page in nvs_partition.pages: for entry in page.entries: if ( entry.state == 'Written' and entry.metadata['namespace'] != 0 ): # Ignore non-written entries chunk_index = '' data = '' if entry.metadata['type'] not in [ 'string', 'blob_data', 'blob_index', 'blob', ]: # Non-variable length entry data = entry.data['value'] elif entry.metadata['type'] == 'blob_index': continue else: # Variable length entries tmp = b'' for e in entry.children: # Merge all children entries tmp += bytes(e.raw) tmp = tmp[: entry.data['size']] # Discard padding if entry.metadata['type'] == 'blob_data': if entry.metadata['chunk_index'] >= 128: # Get real chunk index chunk_index = f'[{entry.metadata["chunk_index"] - 128}]' else: chunk_index = f'[{entry.metadata["chunk_index"]}]' # data = str(tmp) data = tmp if entry.metadata['namespace'] not in ns: continue else: # print(ns[entry.metadata['namespace']] + ':'+ entry.key + '(' + entry.metadata['type'] + ')' + f'{chunk_index} = {data}') now_ns = ns[entry.metadata['namespace']] if last_ns != now_ns: last_ns = now_ns fp.write(now_ns + ',namespace,,\n') if chunk_index == '': if entry.metadata['type'] == 'string': fp.write(entry.key + ',data,' + entry.metadata['type'] + ',' + '"' + str(data.decode('utf-8').rstrip('\x00')) + '"' +'\n') else: fp.write(entry.key + ',data,' + entry.metadata['type'] + ',' + str(data) + '\n') else: # blob data now_key = entry.key if last_key != now_key: findex += 1 last_key = now_key dup_key = False else: dup_key = True last_filename = 'v' + str(findex) + '.txt' with open(os.path.join(mfg_directory, last_filename), 'a+') as ftxt: ftxt.write(str(data.decode('utf-8'))) if not dup_key: fp.write(entry.key + ',file,binary,' + os.path.abspath(os.path.join(mfg_directory, last_filename)) + '\n') # ----------------------------------------------------------------------------------------- # # ----------------------------------------------------------------------------------------- # """ The following part is used to generate mfg_nvs.bin The lightweight version of https://github.com/espressif/esp-idf/blob/17451f1fb3d/components/nvs_flash/nvs_partition_generator """ VERSION1_PRINT = 'V1 - Multipage Blob Support Disabled' VERSION2_PRINT = 'V2 - Multipage Blob Support Enabled' class Page(object): # Item type codes U8 = 0x01 I8 = 0x11 U16 = 0x02 I16 = 0x12 U32 = 0x04 I32 = 0x14 U64 = 0x08 I64 = 0x18 SZ = 0x21 BLOB = 0x41 BLOB_DATA = 0x42 BLOB_IDX = 0x48 # Few Page constants HEADER_SIZE = 32 BITMAPARRAY_OFFSET = 32 BITMAPARRAY_SIZE_IN_BYTES = 32 FIRST_ENTRY_OFFSET = 64 SINGLE_ENTRY_SIZE = 32 CHUNK_ANY = 0xFF ACTIVE = 0xFFFFFFFE FULL = 0xFFFFFFFC VERSION1 = 0xFF VERSION2 = 0xFE PAGE_PARAMS = { 'max_size': 4096, 'max_blob_size': {VERSION1: 1984, VERSION2: 4000}, 'max_entries': 126 } def __init__(self, page_num, version, is_rsrv_page=False): self.entry_num = 0 self.bitmap_array = array.array('B') self.version = version self.page_buf = bytearray(b'\xff') * Page.PAGE_PARAMS['max_size'] if not is_rsrv_page: self.bitmap_array = self.create_bitmap_array() self.set_header(page_num, version) def set_header(self, page_num, version): # set page state to active page_header = bytearray(b'\xff') * 32 page_state_active_seq = Page.ACTIVE struct.pack_into('= 0, 'Page overflow!!' # Split the binary data into two and store a chunk of available size onto curr page if tailroom < remaining_size: chunk_size = tailroom else: chunk_size = remaining_size remaining_size = remaining_size - chunk_size # Change type of data to BLOB_DATA entry_struct[1] = Page.BLOB_DATA # Calculate no. of entries data chunk will require datachunk_rounded_size = (chunk_size + 31) & ~31 datachunk_entry_count = datachunk_rounded_size // 32 datachunk_total_entry_count = datachunk_entry_count + 1 # +1 for the entry header # Set Span entry_struct[2] = datachunk_total_entry_count # Update the chunkIndex chunk_index = chunk_start + chunk_count entry_struct[3] = chunk_index # Set data chunk data_chunk = data[offset:offset + chunk_size] # Compute CRC of data chunk struct.pack_into(' max_blob_size: raise InputError(' Input File: Size (%d) exceeds max allowed length `%s` bytes for key `%s`.' % (datalen, max_blob_size, key)) # Calculate no. of entries data will require rounded_size = (datalen + 31) & ~31 data_entry_count = rounded_size // 32 total_entry_count = data_entry_count + 1 # +1 for the entry header # Check if page is already full and new page is needed to be created right away if self.entry_num >= Page.PAGE_PARAMS['max_entries']: raise PageFullError() elif (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS['max_entries']: if not (self.version == Page.VERSION2 and encoding in ['hex2bin', 'binary', 'base64']): raise PageFullError() # Entry header entry_struct = bytearray(b'\xff') * 32 # Set Namespace Index entry_struct[0] = ns_index # Set Span if self.version == Page.VERSION2: if encoding == 'string': entry_struct[2] = data_entry_count + 1 # Set Chunk Index chunk_index = Page.CHUNK_ANY entry_struct[3] = chunk_index else: entry_struct[2] = data_entry_count + 1 # set key key_array = b'\x00' * 16 entry_struct[8:24] = key_array entry_struct[8:8 + len(key)] = key.encode() # set Type if encoding == 'string': entry_struct[1] = Page.SZ elif encoding in ['hex2bin', 'binary', 'base64']: entry_struct[1] = Page.BLOB if self.version == Page.VERSION2 and (encoding in ['hex2bin', 'binary', 'base64']): entry_struct = self.write_varlen_binary_data(entry_struct,ns_index,key,data, datalen,total_entry_count, encoding, nvs_obj) else: self.write_single_page_entry(entry_struct, data, datalen, data_entry_count, nvs_obj) """ Low-level function to write data of primitive type into page buffer. """ def write_primitive_data(self, key, data, encoding, ns_index,nvs_obj): # Check if entry exceeds max number of entries allowed per page if self.entry_num >= Page.PAGE_PARAMS['max_entries']: raise PageFullError() entry_struct = bytearray(b'\xff') * 32 entry_struct[0] = ns_index # namespace index entry_struct[2] = 0x01 # Span chunk_index = Page.CHUNK_ANY entry_struct[3] = chunk_index # write key key_array = b'\x00' * 16 entry_struct[8:24] = key_array entry_struct[8:8 + len(key)] = key.encode() if encoding == 'u8': entry_struct[1] = Page.U8 struct.pack_into('/ :param outdir: Target output dir to store files :param filepath: Path of target file ''' bin_ext = '.bin' # Expand if tilde(~) provided in path outdir = os.path.expanduser(outdir) if filepath: key_file_name, ext = os.path.splitext(filepath) if not ext: filepath = key_file_name + bin_ext elif bin_ext not in ext: sys.exit('Error: `%s`. Only `%s` extension allowed.' % (filepath, bin_ext)) # Create dir if does not exist if not (os.path.isdir(outdir)): distutils.dir_util.mkpath(outdir) filedir, filename = os.path.split(filepath) filedir = os.path.join(outdir,filedir,'') if filedir and not os.path.isdir(filedir): distutils.dir_util.mkpath(filedir) if os.path.isabs(filepath): if not outdir == os.getcwd(): print('\nWarning: `%s` \n\t==> absolute path given so outdir is ignored for this file.' % filepath) # Set to empty as outdir is ignored here outdir = '' # Set full path - outdir + filename filepath = os.path.join(outdir, '') + filepath return outdir, filepath def generate(infile, outfile, size): ''' Generate NVS Partition :param args: Command line arguments given :param is_encr_enabled: Encryption enabled/disabled :param encr_key: Key to encrypt NVS partition ''' is_dir_new = False bin_ext = '.bin' input_size = check_size(size) outdir = os.getcwd() # Check if key file has .bin extension filename, ext = os.path.splitext(outfile) if bin_ext not in ext: sys.exit('Error: `%s`. Only `.bin` extension allowed.' % outfile) outdir, outfile = set_target_filepath(outdir, outfile) with open(infile, 'rt', encoding='utf8') as input_file,\ open(outfile, 'wb') as output_file,\ nvs_open(output_file, input_size, Page.VERSION2, False, None) as nvs_obj: # Comments are skipped reader = csv.DictReader(filter(lambda row: row[0] != '#',input_file), delimiter=',') for row in reader: try: max_key_len = 15 if len(row['key']) > max_key_len: raise InputError('Length of key `%s` should be <= 15 characters.' % row['key']) write_entry(nvs_obj, row['key'], row['type'], row['encoding'], row['value']) except InputError as e: print(e) filedir, filename = os.path.split(outfile) if filename: print('\nWarning: NVS binary not created...') os.remove(outfile) if is_dir_new and not filedir == os.getcwd(): print('\nWarning: Output dir not created...') os.rmdir(filedir) sys.exit(-2) print('Created NVS binary: ===>', outfile) # ----------------------------------------------------------------------------------------- # NVS_KEY_TYPE = { 'D': ',', # digit 'S': ',string,', # string 'B': ',binary,', # binary } def at_update_param(key, type, value, data): if value is None: return data lines = data.split('\n') for i in range(len(lines)): line = lines[i].strip() if line.startswith(key) and 'namespace' not in line: parts = line.split(NVS_KEY_TYPE[type]) if type == 'S': parts[-1] = '"' + str(value) + '"' else: parts[-1] = str(value) lines[i] = NVS_KEY_TYPE[type].join(parts) data = '\n'.join(lines) return data def at_update_mfg_parameters(args, data): # string parameters data = at_update_param('module_name', 'S', args.module_name, data) data = at_update_param('country_code', 'S', args.country_code, data) for i in range(31): # gatts config data = at_update_param('cfg{}'.format(i), 'S', getattr(args, 'gatts_cfg{}'.format(i)), data) # int parameters data = at_update_param('max_tx_power', 'D', args.tx_power, data) data = at_update_param('uart_port', 'D', args.uart_num, data) data = at_update_param('start_channel', 'D', args.start_channel, data) data = at_update_param('channel_num', 'D', args.channel_number, data) data = at_update_param('uart_baudrate', 'D', args.baud, data) data = at_update_param('uart_tx_pin', 'D', args.tx_pin, data) data = at_update_param('uart_rx_pin', 'D', args.rx_pin, data) data = at_update_param('uart_cts_pin', 'D', args.cts_pin, data) data = at_update_param('uart_rts_pin', 'D', args.rts_pin, data) # binary file parameters data = at_update_param('server_ca', 'B', args.server_ca, data) data = at_update_param('server_cert', 'B', args.server_cert, data) data = at_update_param('server_key', 'B', args.server_key, data) data = at_update_param('client_ca.0', 'B', args.client_ca0, data) data = at_update_param('client_ca.1', 'B', args.client_ca1, data) data = at_update_param('client_cert.0', 'B', args.client_cert0, data) data = at_update_param('client_cert.1', 'B', args.client_cert1, data) data = at_update_param('client_key.0', 'B', args.client_key0, data) data = at_update_param('client_key.1', 'B', args.client_key1, data) data = at_update_param('mqtt_ca', 'B', args.mqtt_ca, data) data = at_update_param('mqtt_cert', 'B', args.mqtt_cert, data) data = at_update_param('mqtt_key', 'B', args.mqtt_key, data) data = at_update_param('wpa2_ca', 'B', args.wpa2_ca, data) data = at_update_param('wpa2_cert', 'B', args.wpa2_cert, data) data = at_update_param('wpa2_key', 'B', args.wpa2_key, data) return data def modify_param_bin_in_nvs(esp, args): """ A typic format of esp-at parameter binary is in nvs partition, and these parameters support to configure: , , , , , , , , , , """ print('Modify the binary firmware where the parameters are stored in manufacturing nvs...') if args.parameter_offset: param_addr = args.parameter_offset else: with open(args.output, 'rb') as fp: data = fp.read() try: param_addr = re.search(mfg_nvs_pattern, data).span()[0] except Exception as e: ESP_LOGE('Can not find valid entry of parameter partition, please check firmware: {}'.format(args.input)) sys.exit(2) # read the offset and size parameter of mfg_nvs.bin by param_addr parameter with open(args.output, 'rb') as fp: param_format = '', mfg_nvs_csv) # generate new mfg_nvs.bin from mfg_nvs.csv generate(mfg_nvs_csv, mfg_nvs_bin, mfg_nvs_size) # re-combine target.bin with new mfg_nvs.bin with open(args.output, 'rb+') as fp, open(mfg_nvs_bin, 'rb') as fbin: mfg_nvs_data = fbin.read() fp.seek(mfg_nvs_addr, 0) fp.write(mfg_nvs_data) ESP_LOGI('New esp-at firmware successfully generated! ----> {}'.format(os.path.abspath(args.output))) return def modify_param_bin_in_partition(esp, args): """ A typic format of esp-at parameter binary is in hard-coding partition (4KB size) and the format is like the following: ┌─────────────────┬─────────┬────────┬────────┬────────┬───────┬───────┬──────────────────┬─────────────────┐ │ 2B │ 1B │ 1B │ 1B │ 1B │ 1B │ 1B │ 4B │ 4B │ ├─────────────────┼─────────┼────────┼────────┼────────┼───────┼───────┼──────────────────┼─────────────────┤ │ magic code │ version │ rsvd │ tpower │ uart_x │ schan │ nchan │ country code │ uart baud │ ├────────┬────────┼─────────┼────────┼────────┼────────┼───────┴───────┼──────────────────┴─────────────────┤ │ tx pin │ rx pin │ cts │ rts │ txctrl │ rxctrl │ rsvd │ platform (0-7) │ ├────────┴────────┴─────────┴────────┴────────┴────────┴───────────────┴────────────────────────────────────┤ │ platform (8-23) │ ├──────────────────────────────────────────────────────────────────────┬────────────────────────────────────┤ │ platform (24-31) │ module name (0-7) │ ├──────────────────────────────────────────────────────────────────────┴────────────────────────────────────┤ │ module name (8-23) │ ├──────────────────────────────────────────────────────────────────────┬────────────────────────────────────┤ │ module name (24-31) │ padded with 0xFF │ ├──────────────────────────────────────────────────────────────────────┴────────────────────────────────────┤ │ padded with 0xFF bytes up to 4KB │ └───────────────────────────────────────────────────────────────────────────────────────────────────────────┘ For the sake of generality, these parameters support to configure: , , , , , , , , , , , , , For the sake of compatibility, these parameters do not support to configure: , , """ print('Modify the binary firmware where the parameters are stored in partitions...') # find out the parameter start address # prefer to use input configuation if args.parameter_offset: param_addr = args.parameter_offset else: with open(args.output, 'rb') as fp: data = fp.read() try: param_addr = re.search(parameter_pattern, data).span()[0] except Exception as e: ESP_LOGE('Can not find valid entry of parameter partition, please check firmware: {}'.format(args.input)) sys.exit(2) if param_addr % sec_size != 0: ESP_LOGE("Found wrong entry of parameter partition: {}, please manually specify \"--parameter_offset\" parameter!".format(hex(param_addr))) sys.exit(2) else: ESP_LOGI('factory parameter entry address: {}'.format(hex(param_addr))) # modify parameter with open(args.output, 'rb+') as fp: param_format = ', <_version>, <_rsvd>, , , , , , , , , , , , , <_rsvd>, , """ at_parameter_assign_int(args.tx_power, 1, list_at_parameter, 3) at_parameter_assign_int(args.uart_num, 1, list_at_parameter, 4) at_parameter_assign_int(args.start_channel, 1, list_at_parameter, 5) at_parameter_assign_int(args.channel_number, 1, list_at_parameter, 6) at_parameter_assign_str(args.country_code, 4, list_at_parameter, 7) at_parameter_assign_int(args.baud, 4, list_at_parameter, 11) at_parameter_assign_int(args.tx_pin, 1, list_at_parameter, 12) at_parameter_assign_int(args.rx_pin, 1, list_at_parameter, 13) at_parameter_assign_int(args.cts_pin, 1, list_at_parameter, 14) at_parameter_assign_int(args.rts_pin, 1, list_at_parameter, 15) at_parameter_assign_int(args.tx_control_pin, 1, list_at_parameter, 16) at_parameter_assign_int(args.rx_control_pin, 1, list_at_parameter, 17) at_parameter_assign_str(args.platform, 32, list_at_parameter, 19) at_parameter_assign_str(args.module_name, 32, list_at_parameter, 51) new_at_parameter = tuple(list_at_parameter) fp.seek(param_addr, 0) at_write_records(new_at_parameter, param_format, fp) fp.seek(param_addr, 0) raw_at_parameter = at_read_records(param_format, fp) print('new parameters: {}\r\n'.format(raw_at_parameter)) ESP_LOGI('New esp-at firmware successfully generated! ----> {}'.format(os.path.abspath(args.output))) def generate_bin(esp, args): print('TODOs: ESP-AT will add this feature in v2.4.0.0+') def version(esp, args): print(__version__) def main(argv=None, esp=None): parser = argparse.ArgumentParser(description='at.py {} - ESP-AT Utility'.format(__version__), prog='at.py') subparsers = parser.add_subparsers( dest='operation', help='Run at.py {command} -h for additional help') parser_modify_bin = subparsers.add_parser( 'modify_bin', help='Modify the parameter configuration of esp-at factory firmware (1MB/2MB/4MB/.. size) according to the parameter configuration') parser_generate_bin = subparsers.add_parser( 'generate_bin', help='TODOs: ESP-AT will add this feature in v2.4.0.0+') subparsers.add_parser( 'version', help='Print at.py version') parser_modify_bin.add_argument('--platform', '-pf', help='ESP chip series', type=lambda c: c[0:32], choices=['PLATFORM_ESP32', 'PLATFORM_ESP8266', 'PLATFORM_ESP32S2', 'PLATFORM_ESP32C3']) parser_modify_bin.add_argument('--module_name', '-mn', help='ESP module name', type=lambda c: c[0:32]) parser_modify_bin.add_argument('--tx_power', '-tp', help='Initial RF Tx power of Wi-Fi, the unit is 0.25 dBm', type=int, choices=range(40, 85)) parser_modify_bin.add_argument('--uart_num', '-un', help='Initial UART number for communication with host MCU, receive AT commands and response', type=int, choices=range(0, 3)) parser_modify_bin.add_argument('--start_channel', '-sc', help='Initial Wi-Fi start channel', type=int, choices=range(1, 15)) parser_modify_bin.add_argument('--channel_number', '-cn', help='Total Wi-Fi channel number', type=int, choices=range(1, 15)) parser_modify_bin.add_argument('--country_code', '-cc', help='Initial Wi-Fi country code', type=lambda c: c[0:4]) parser_modify_bin.add_argument('--baud', '-b', help='Initial UART baudrate of AT firmware, for communication with host MCU', type=arg_auto_int) parser_modify_bin.add_argument('--tx_pin', '-tx', help='GPIO pin of ESP-AT uart tx, ESP-AT uses this tx_pin to send data to host MCU', type=arg_auto_int) parser_modify_bin.add_argument('--rx_pin', '-rx', help='GPIO pin of ESP-AT uart rx, ESP-AT uses this rx_pin to receive data from host MCU', type=arg_auto_int) parser_modify_bin.add_argument('--cts_pin', '-cts', help='GPIO pin of ESP-AT uart cts, used for hardware flow control', type=arg_auto_int) parser_modify_bin.add_argument('--rts_pin', '-rts', help='GPIO pin of ESP-AT uart rts, used for hardware flow control', type=arg_auto_int) parser_modify_bin.add_argument('--tx_control_pin', '-txctrl', help='See the Figure 1-10b (ESP8266EX UART SWAP) in for more details.', type=arg_auto_int) parser_modify_bin.add_argument('--rx_control_pin', '-rxctrl', help='See the Figure 1-10c (ESP8266EX UART SWAP) in for more details.', type=arg_auto_int) # server ca parser_modify_bin.add_argument('--server_ca', '-sca', help='Specify the new file path for the CA certificate of server side to update. This will update the file located at esp-at/components/customized_partitions/raw_data/server_ca/server_ca.crt with the contents of your new file.', type=str) # server crt parser_modify_bin.add_argument('--server_cert', '-scrt', help='Specify the new file path for the server certificate to update. This will update the file located at esp-at/components/customized_partitions/raw_data/server_cert/server_cert.crt with the contents of your new file.', type=str) # server key parser_modify_bin.add_argument('--server_key', '-skey', help='Specify the new file path for the server private key to update. This will update the file located at esp-at/components/customized_partitions/raw_data/server_key/server.key with the contents of your new file.', type=str) # client ca parser_modify_bin.add_argument('--client_ca0', '-cca0', help='Specify the new file path for the first CA certificate of client side to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_ca/client_ca_00.crt with the contents of your new file.', type=str) parser_modify_bin.add_argument('--client_ca1', '-cca1', help='Specify the new file path for the second CA certificate of client side to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_ca/client_ca_01.crt with the contents of your new file.', type=str) # client crt parser_modify_bin.add_argument('--client_cert0', '-ccrt0', help='Specify the new file path for the first client certificate to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_cert/client_cert_00.crt with the contents of your new file.', type=str) parser_modify_bin.add_argument('--client_cert1', '-ccrt1', help='Specify the new file path for the second client certificate to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_cert/client_cert_01.crt with the contents of your new file.', type=str) # client key parser_modify_bin.add_argument('--client_key0', '-ckey0', help='Specify the new file path for the first client private key to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_key/client_key_00.crt with the contents of your new file.', type=str) parser_modify_bin.add_argument('--client_key1', '-ckey1', help='Specify the new file path for the second client private key to update. This will update the file located at esp-at/components/customized_partitions/raw_data/client_key/client_key_01.crt with the contents of your new file.', type=str) # mqtt ca parser_modify_bin.add_argument('--mqtt_ca', '-mqca', help='Specify the new file path for the CA certificate of MQTT client to update. This will update the file located at esp-at/components/customized_partitions/raw_data/mqtt_ca/mqtt_ca.crt with the contents of your new file.', type=str) # mqtt crt parser_modify_bin.add_argument('--mqtt_cert', '-mqcrt', help='Specify the new file path for the MQTT certificate to update. This will update the file located at esp-at/components/customized_partitions/raw_data/mqtt_cert/mqtt_client.crt with the contents of your new file.', type=str) # mqtt key parser_modify_bin.add_argument('--mqtt_key', '-mqkey', help='Specify the new file path for the MQTT private key to update. This will update the file located at esp-at/components/customized_partitions/raw_data/mqtt_key/mqtt_client.key with the contents of your new file.', type=str) # wpa2 ca parser_modify_bin.add_argument('--wpa2_ca', '-wpa2ca', help='Specify the new file path for the CA certificate of WPA2 enterprise client to update. This will update the file located at esp-at/components/customized_partitions/raw_data/wpa2_ca/wpa2_ca.pem with the contents of your new file.', type=str) # wpa2 crt parser_modify_bin.add_argument('--wpa2_cert', '-wpa2crt', help='Specify the new file path for the WPA2 certificate to update. This will update the file located at esp-at/components/customized_partitions/raw_data/wpa2_cert/wpa2_client.crt with the contents of your new file.', type=str) # wpa2 key parser_modify_bin.add_argument('--wpa2_key', '-wpa2key', help='Specify the new file path for the WPA2 private key to update. This will update the file located at esp-at/components/customized_partitions/raw_data/wpa2_key/wpa2_client.key with the contents of your new file.', type=str) # gatts config for i in range(31): parser_modify_bin.add_argument('--gatts_cfg{}'.format(i), '-cfg{}'.format(i), help='Specify the nth configuration of GATTS to update. This will update the index={} line of esp-at/components/customized_partitions/raw_data/ble_data/gatts_data.csv file'.format(i), type=str) parser_modify_bin.add_argument('--parameter_offset', '-os', help='Offset of parameter partition in AT firmware. If this parameter is set, the input file will be parsed directly according to the parameter instead of automatically matching the parameter partition header.', type=arg_auto_int) parser_modify_bin.add_argument('--input', '-in', help='Input filename of AT firmware or parameter partition', metavar='filename', type=str, required=True) parser_modify_bin.add_argument('--output', '-o', help='Output filename of AT firmware or parameter partition', metavar='filename', type=str, default='target.bin') for operation in subparsers.choices.keys(): assert operation in globals(), '{} should be a module function'.format(operation) args = parser.parse_args(argv) if args.operation is None: parser.print_help() sys.exit(1) operation_func = globals()[args.operation] try: operation_func(esp, args) finally: # do final cleanup pass class FatalError(RuntimeError): """ Wrapper class for runtime errors that aren't caused by internal bugs, but by ESP-AT responses or input content. """ def __init__(self, message): RuntimeError.__init__(self, message) @staticmethod def WithResult(message, result): """ Return a fatal error object that appends the hex values of 'result' as a string formatted argument. """ message += ' (result was {})'.format(hexify(result)) return FatalError(message) def _main(): try: main() except FatalError as e: ESP_LOGE('A fatal error occurred: {}'.format(e)) sys.exit(2) except Exception as e: ESP_LOGE('A system error occurred: {}'.format(e)) if __name__ == '__main__': _main()