feat(cmds): Expand input of all functions to file paths, bytes, or file-like objects

This commit is contained in:
Radim Karniš
2025-03-03 23:29:14 +01:00
parent 03b84a12ff
commit 46a9e31cfe
11 changed files with 292 additions and 199 deletions

View File

@@ -23,6 +23,7 @@ project = "esptool.py"
copyright = "2016 - {}, Espressif Systems (Shanghai) Co., Ltd".format( copyright = "2016 - {}, Espressif Systems (Shanghai) Co., Ltd".format(
datetime.datetime.now().year datetime.datetime.now().year
) )
autodoc_typehints_format = "short"
# The language for content autogenerated by Sphinx. Refer to documentation # The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages. # for a list of supported languages.

View File

@@ -95,6 +95,40 @@ The following example demonstrates running a series of flash memory operations i
- This example doesn't use ``detect_chip()``, but instantiates a ``ESP32ROM`` class directly. This is useful when you know the target chip in advance. In this scenario ``esp.connect()`` is required to establish a connection with the device. - This example doesn't use ``detect_chip()``, but instantiates a ``ESP32ROM`` class directly. This is useful when you know the target chip in advance. In this scenario ``esp.connect()`` is required to establish a connection with the device.
- Multiple operations can be chained together in a single context manager block. - Multiple operations can be chained together in a single context manager block.
------------
The Public API implements a custom ``ImageSource`` input type, which expands to ``str | bytes | IO[bytes]`` - a path to the firmware image file, an opened file-like object, or the image data as bytes.
As output, the API returns a ``bytes`` object representing the binary image or writes the image to a file if the ``output`` parameter is provided.
The following example converts an ELF file to a flashable binary, prints the image information, and flashes the image. The example demonstrates three different ways to achieve the same result, showcasing the flexibility of the API:
.. code-block:: python
ELF = "firmware.elf"
# var 1 - Loading ELF from a file, not writing binary to a file
bin_file = elf2image(ELF, "esp32c3")
image_info(bin_file)
with detect_chip(PORT) as esp:
attach_flash(esp)
write_flash(esp, [(0, bin_file)])
# var 2 - Loading ELF from an opened file object, not writing binary to a file
with open(ELF, "rb") as elf_file, detect_chip(PORT) as esp:
bin_file = elf2image(elf_file, "esp32c3")
image_info(bin_file)
attach_flash(esp)
write_flash(esp, [(0, bin_file)])
# var 3 - Loading ELF from a file, writing binary to a file
elf2image(ELF, "esp32c3", "image.bin")
image_info("image.bin")
with detect_chip(PORT) as esp:
attach_flash(esp)
write_flash(esp, [(0, "image.bin")])
------------ ------------
**The following section provides a detailed reference for the public API functions.** **The following section provides a detailed reference for the public API functions.**

View File

@@ -627,11 +627,11 @@ def run_cli(ctx):
@click.pass_context @click.pass_context
def image_info_cli(ctx, filename): def image_info_cli(ctx, filename):
"""Dump headers from a binary file (bootloader or application)""" """Dump headers from a binary file (bootloader or application)"""
image_info(filename, ctx.obj["chip"]) image_info(filename, chip=None if ctx.obj["chip"] == "auto" else ctx.obj["chip"])
@cli.command("elf2image") @cli.command("elf2image")
@click.argument("input", type=click.Path(exists=True)) @click.argument("filename", type=click.Path(exists=True))
@click.option( @click.option(
"--output", "--output",
"-o", "-o",
@@ -721,14 +721,16 @@ def image_info_cli(ctx, filename):
) )
@add_spi_flash_options(allow_keep=False, auto_detect=False) @add_spi_flash_options(allow_keep=False, auto_detect=False)
@click.pass_context @click.pass_context
def elf2image_cli(ctx, **kwargs): def elf2image_cli(ctx, filename, **kwargs):
"""Create an application image from ELF file""" """Create an application image from ELF file"""
if ctx.obj["chip"] == "auto":
raise FatalError(
f"Specify the --chip argument (choose from {', '.join(CHIP_LIST)})"
)
append_digest = not kwargs.pop("dont_append_digest", False) append_digest = not kwargs.pop("dont_append_digest", False)
# Default to ESP8266 for backwards compatibility
chip = "esp8266" if ctx.obj["chip"] == "auto" else ctx.obj["chip"]
output = kwargs.pop("output", None) output = kwargs.pop("output", None)
output = "auto" if output is None else output output = "auto" if output is None else output
elf2image(chip=chip, output=output, append_digest=append_digest, **kwargs) elf2image(filename, ctx.obj["chip"], output, append_digest=append_digest, **kwargs)
@cli.command("read_mac") @cli.command("read_mac")
@@ -917,6 +919,10 @@ def read_flash_sfdp_cli(ctx, address, bytes, **kwargs):
@click.pass_context @click.pass_context
def merge_bin_cli(ctx, addr_filename, **kwargs): def merge_bin_cli(ctx, addr_filename, **kwargs):
"""Merge multiple raw binary files into a single file for later flashing""" """Merge multiple raw binary files into a single file for later flashing"""
if ctx.obj["chip"] == "auto":
raise FatalError(
f"Specify the --chip argument (choose from {', '.join(CHIP_LIST)})"
)
merge_bin(addr_filename, chip=ctx.obj["chip"], **kwargs) merge_bin(addr_filename, chip=ctx.obj["chip"], **kwargs)

View File

@@ -32,7 +32,7 @@ from .targets import (
ESP32S3ROM, ESP32S3ROM,
ESP8266ROM, ESP8266ROM,
) )
from .util import FatalError, byte, pad_to from .util import FatalError, byte, ImageSource, get_bytes, pad_to
def align_file_position(f, size): def align_file_position(f, size):
@@ -62,20 +62,29 @@ def intel_hex_to_bin(file: IO[bytes], start_addr: int | None = None) -> IO[bytes
return file return file
def LoadFirmwareImage(chip, image_file): def LoadFirmwareImage(chip: str, image_data: ImageSource):
""" """
Load a firmware image. Can be for any supported SoC. Load a firmware image. Can be for any supported SoC.
ESP8266 images will be examined to determine if they are original ROM firmware ESP8266 images will be examined to determine if they are original ROM firmware
images (ESP8266ROMFirmwareImage) or "v2" OTA bootloader images. images (ESP8266ROMFirmwareImage) or "v2" OTA bootloader images.
Returns a BaseFirmwareImage subclass, either ESP8266ROMFirmwareImage (v1) Returns a BaseFirmwareImage subclass.
or ESP8266V2FirmwareImage (v2).
""" """
data, _ = get_bytes(image_data)
def select_image_class(f, chip): f = io.BytesIO(data)
chip = re.sub(r"[-()]", "", chip.lower()) chip = re.sub(r"[-()]", "", chip.lower())
if chip != "esp8266": if chip == "esp8266":
# Look at the magic number to determine the ESP8266 image type
magic = ord(f.read(1))
f.seek(0)
if magic == ESPLoader.ESP_IMAGE_MAGIC:
return ESP8266ROMFirmwareImage(f)
elif magic == ESP8266V2FirmwareImage.IMAGE_V2_MAGIC:
return ESP8266V2FirmwareImage(f)
else:
raise FatalError(f"Invalid image magic number: {magic}")
else:
return { return {
"esp32": ESP32FirmwareImage, "esp32": ESP32FirmwareImage,
"esp32s2": ESP32S2FirmwareImage, "esp32s2": ESP32S2FirmwareImage,
@@ -90,20 +99,6 @@ def LoadFirmwareImage(chip, image_file):
"esp32p4": ESP32P4FirmwareImage, "esp32p4": ESP32P4FirmwareImage,
"esp32h4": ESP32H4FirmwareImage, "esp32h4": ESP32H4FirmwareImage,
}[chip](f) }[chip](f)
else: # Otherwise, ESP8266 so look at magic to determine the image type
magic = ord(f.read(1))
f.seek(0)
if magic == ESPLoader.ESP_IMAGE_MAGIC:
return ESP8266ROMFirmwareImage(f)
elif magic == ESP8266V2FirmwareImage.IMAGE_V2_MAGIC:
return ESP8266V2FirmwareImage(f)
else:
raise FatalError("Invalid image magic number: %d" % magic)
if isinstance(image_file, str):
with open(image_file, "rb") as f:
return select_image_class(f, chip)
return select_image_class(image_file, chip)
class ImageSegment(object): class ImageSegment(object):
@@ -1225,10 +1220,9 @@ class ELFFile(object):
SEG_TYPE_LOAD = 0x01 SEG_TYPE_LOAD = 0x01
LEN_SEG_HEADER = 0x20 LEN_SEG_HEADER = 0x20
def __init__(self, name): def __init__(self, data):
# Load sections from the ELF file self.data, self.name = get_bytes(data)
self.name = name f = io.BytesIO(self.data)
with open(self.name, "rb") as f:
self._read_elf_file(f) self._read_elf_file(f)
def get_section(self, section_name): def get_section(self, section_name):
@@ -1240,6 +1234,7 @@ class ELFFile(object):
def _read_elf_file(self, f): def _read_elf_file(self, f):
# read the ELF file header # read the ELF file header
LEN_FILE_HEADER = 0x34 LEN_FILE_HEADER = 0x34
source = "Image" if self.name is None else f"'{self.name}'"
try: try:
( (
ident, ident,
@@ -1257,25 +1252,23 @@ class ELFFile(object):
shnum, shnum,
shstrndx, shstrndx,
) = struct.unpack("<16sHHLLLLLHHHHHH", f.read(LEN_FILE_HEADER)) ) = struct.unpack("<16sHHLLLLLHHHHHH", f.read(LEN_FILE_HEADER))
except struct.error as e:
raise FatalError(
"Failed to read a valid ELF header from %s: %s" % (self.name, e)
)
except struct.error as e:
raise FatalError(f"{source} does not have a valid ELF header: {e}")
if byte(ident, 0) != 0x7F or ident[1:4] != b"ELF": if byte(ident, 0) != 0x7F or ident[1:4] != b"ELF":
raise FatalError("%s has invalid ELF magic header" % self.name) raise FatalError(f"{source} has invalid ELF magic header")
if machine not in [0x5E, 0xF3]: if machine not in [0x5E, 0xF3]:
raise FatalError( raise FatalError(
"%s does not appear to be an Xtensa or an RISCV ELF file. " f"{source} does not appear to be an Xtensa or an RISCV ELF image. "
"e_machine=%04x" % (self.name, machine) f"(e_machine = {machine:#06x})"
) )
if shentsize != self.LEN_SEC_HEADER: if shentsize != self.LEN_SEC_HEADER:
raise FatalError( raise FatalError(
"%s has unexpected section header entry size 0x%x (not 0x%x)" f"{source} has unexpected section header entry size {shentsize:#x} "
% (self.name, shentsize, self.LEN_SEC_HEADER) f"(not {self.LEN_SEC_HEADER:#x})"
) )
if shnum == 0: if shnum == 0:
raise FatalError("%s has 0 section headers" % (self.name)) raise FatalError(f"{source} has 0 section headers")
self._read_sections(f, shoff, shnum, shstrndx) self._read_sections(f, shoff, shnum, shstrndx)
self._read_segments(f, _phoff, _phnum, shstrndx) self._read_segments(f, _phoff, _phnum, shstrndx)
@@ -1285,13 +1278,13 @@ class ELFFile(object):
section_header = f.read(len_bytes) section_header = f.read(len_bytes)
if len(section_header) == 0: if len(section_header) == 0:
raise FatalError( raise FatalError(
"No section header found at offset %04x in ELF file." f"No section header found at offset {section_header_offs:#06x} "
% section_header_offs "in ELF image."
) )
if len(section_header) != (len_bytes): if len(section_header) != (len_bytes):
raise FatalError( raise FatalError(
"Only read 0x%x bytes from section header (expected 0x%x.) " f"Only read {len(section_header):#x} bytes from section header "
"Truncated ELF file?" % (len(section_header), len_bytes) f"(expected {len_bytes:#x}). Truncated ELF image?"
) )
# walk through the section header and extract all sections # walk through the section header and extract all sections
@@ -1347,13 +1340,13 @@ class ELFFile(object):
segment_header = f.read(len_bytes) segment_header = f.read(len_bytes)
if len(segment_header) == 0: if len(segment_header) == 0:
raise FatalError( raise FatalError(
"No segment header found at offset %04x in ELF file." f"No segment header found at offset {segment_header_offs:#06x} "
% segment_header_offs "in ELF image."
) )
if len(segment_header) != (len_bytes): if len(segment_header) != (len_bytes):
raise FatalError( raise FatalError(
"Only read 0x%x bytes from segment header (expected 0x%x.) " f"Only read {len(segment_header):#x} bytes from segment header "
"Truncated ELF file?" % (len(segment_header), len_bytes) f"(expected {len_bytes:#x}). Truncated ELF image?"
) )
# walk through the segment header and extract all segments # walk through the segment header and extract all segments
@@ -1389,6 +1382,6 @@ class ELFFile(object):
def sha256(self): def sha256(self):
# return SHA256 hash of the input ELF file # return SHA256 hash of the input ELF file
sha256 = hashlib.sha256() sha256 = hashlib.sha256()
with open(self.name, "rb") as f: f = io.BytesIO(self.data)
sha256.update(f.read()) sha256.update(f.read())
return sha256.digest() return sha256.digest()

View File

@@ -170,7 +170,7 @@ class AddrFilenamePairType(click.Path):
if sector_start < end: if sector_start < end:
raise click.BadParameter( raise click.BadParameter(
f"Detected overlap at address: " f"Detected overlap at address: "
f"0x{address:x} for file: {argfile.name}", f"{address:#x} for file: {argfile.name}",
) )
end = sector_end end = sector_end
return pairs return pairs

View File

@@ -14,7 +14,7 @@ import itertools
from intelhex import IntelHex from intelhex import IntelHex
from serial import SerialException from serial import SerialException
from typing import BinaryIO, cast from typing import cast
from .bin_image import ELFFile, LoadFirmwareImage from .bin_image import ELFFile, LoadFirmwareImage
from .bin_image import ( from .bin_image import (
@@ -42,8 +42,9 @@ from .util import (
from .util import ( from .util import (
div_roundup, div_roundup,
flash_size_bytes, flash_size_bytes,
get_file_size,
hexify, hexify,
ImageSource,
get_bytes,
pad_to, pad_to,
sanitize_string, sanitize_string,
) )
@@ -191,15 +192,17 @@ def detect_chip(
##################################### #####################################
def load_ram(esp: ESPLoader, filename: str) -> None: def load_ram(esp: ESPLoader, input: ImageSource) -> None:
""" """
Load a firmware image into RAM and execute it on the ESP device. Load a firmware image into RAM and execute it on the ESP device.
Args: Args:
esp: Initiated esp object connected to a real device. esp: Initiated esp object connected to a real device.
filename: Path to the firmware image file. input: Path to the firmware image file, opened file-like object,
or the image data as bytes.
""" """
image = LoadFirmwareImage(esp.CHIP_NAME, filename) data, _ = get_bytes(input)
image = LoadFirmwareImage(esp.CHIP_NAME, data)
log.print("RAM boot...") log.print("RAM boot...")
for seg in image.segments: for seg in image.segments:
@@ -426,7 +429,7 @@ def _update_image_flash_params(esp, address, flash_freq, flash_mode, flash_size,
def write_flash( def write_flash(
esp: ESPLoader, esp: ESPLoader,
addr_filename: list[tuple[int, BinaryIO]], addr_data: list[tuple[int, ImageSource]],
flash_freq: str = "keep", flash_freq: str = "keep",
flash_mode: str = "keep", flash_mode: str = "keep",
flash_size: str = "keep", flash_size: str = "keep",
@@ -437,8 +440,9 @@ def write_flash(
Args: Args:
esp: Initiated esp object connected to a real device. esp: Initiated esp object connected to a real device.
addr_filename: List of (address, file) tuples specifying where addr_data: List of (address, data) tuples specifying where
to write each file in flash memory. to write each file or data in flash memory. The data can be
a file path (str), bytes, or a file-like object.
flash_freq: Flash frequency to set in the bootloader image header flash_freq: Flash frequency to set in the bootloader image header
(``"keep"`` to retain current). (``"keep"`` to retain current).
flash_mode: Flash mode to set in the bootloader image header flash_mode: Flash mode to set in the bootloader image header
@@ -449,19 +453,23 @@ def write_flash(
Keyword Args: Keyword Args:
erase_all (bool): Erase the entire flash before writing. erase_all (bool): Erase the entire flash before writing.
encrypt (bool): Encrypt all files during flashing. encrypt (bool): Encrypt all files during flashing.
encrypt_files (list[tuple[int, BinaryIO]] | None): List of (address, file) encrypt_files (list[tuple[int, ImageSource]] | None): List of
tuples for files to encrypt individually. (address, data) tuples for files to encrypt individually.
compress (bool): Compress data before flashing. compress (bool): Compress data before flashing.
no_compress (bool): Don't compress data before flashing. no_compress (bool): Don't compress data before flashing.
force (bool): Ignore safety checks (e.g., overwriting bootloader, flash size). force (bool): Ignore safety checks (e.g., overwriting bootloader, flash size).
ignore_flash_enc_efuse (bool): Ignore flash encryption eFuse settings. ignore_flash_enc_efuse (bool): Ignore flash encryption eFuse settings.
no_progress (bool): Disable progress updates. no_progress (bool): Disable progress updates.
""" """
# Normalize addr_data to use bytes
norm_addr_data = [(addr, get_bytes(data)) for addr, data in addr_data]
# Set default values of optional arguments # Set default values of optional arguments
erase_all: bool = kwargs.get("erase_all", False) erase_all: bool = kwargs.get("erase_all", False)
encrypt: bool = kwargs.get("encrypt", False) encrypt: bool = kwargs.get("encrypt", False)
encrypt_files: list[tuple[int, BinaryIO]] | None = kwargs.get("encrypt_files", None) encrypt_files: list[tuple[int, ImageSource]] | None = kwargs.get(
"encrypt_files", None
)
compress: bool = kwargs.get("compress", False) compress: bool = kwargs.get("compress", False)
no_compress: bool = kwargs.get("no_compress", False) no_compress: bool = kwargs.get("no_compress", False)
force: bool = kwargs.get("force", False) force: bool = kwargs.get("force", False)
@@ -477,7 +485,7 @@ def write_flash(
if not force and esp.CHIP_NAME != "ESP8266" and not esp.secure_download_mode: if not force and esp.CHIP_NAME != "ESP8266" and not esp.secure_download_mode:
# Check if secure boot is active # Check if secure boot is active
if esp.get_secure_boot_enabled(): if esp.get_secure_boot_enabled():
for address, _ in addr_filename: for address, _ in norm_addr_data:
if address < 0x8000: if address < 0x8000:
raise FatalError( raise FatalError(
"Secure Boot detected, writing to flash regions < 0x8000 " "Secure Boot detected, writing to flash regions < 0x8000 "
@@ -486,16 +494,17 @@ def write_flash(
"please use with caution, otherwise it may brick your device!" "please use with caution, otherwise it may brick your device!"
) )
# Check if chip_id and min_rev in image are valid for the target in use # Check if chip_id and min_rev in image are valid for the target in use
for _, argfile in addr_filename: for _, (data, name) in norm_addr_data:
try: try:
image = LoadFirmwareImage(esp.CHIP_NAME, argfile) image = LoadFirmwareImage(esp.CHIP_NAME, data)
except (FatalError, struct.error, RuntimeError): except (FatalError, struct.error, RuntimeError):
continue continue
finally:
argfile.seek(0) # LoadFirmwareImage changes the file handle position
if image.chip_id != esp.IMAGE_CHIP_ID: if image.chip_id != esp.IMAGE_CHIP_ID:
msg = (
"Input does not contain" if name is None else f"'{name}' is not an"
)
raise FatalError( raise FatalError(
f"{argfile.name} is not an {esp.CHIP_NAME} image. " f"{msg} an {esp.CHIP_NAME} image. "
"Use the force argument to flash anyway." "Use the force argument to flash anyway."
) )
@@ -514,7 +523,7 @@ def write_flash(
if use_rev_full_fields: if use_rev_full_fields:
rev = esp.get_chip_revision() rev = esp.get_chip_revision()
if rev < image.min_rev_full or rev > image.max_rev_full: if rev < image.min_rev_full or rev > image.max_rev_full:
error_str = f"{argfile.name} requires chip revision in range " error_str = f"'{name}' requires chip revision in range "
error_str += ( error_str += (
f"[v{image.min_rev_full // 100}.{image.min_rev_full % 100} - " f"[v{image.min_rev_full // 100}.{image.min_rev_full % 100} - "
) )
@@ -538,7 +547,7 @@ def write_flash(
rev = esp.get_major_chip_version() rev = esp.get_major_chip_version()
if rev < image.min_rev: if rev < image.min_rev:
raise FatalError( raise FatalError(
f"{argfile.name} requires chip revision " f"'{name}' requires chip revision "
f"{image.min_rev} or higher (this chip is revision {rev}). " f"{image.min_rev} or higher (this chip is revision {rev}). "
"Use the force argument to flash anyway." "Use the force argument to flash anyway."
) )
@@ -570,17 +579,21 @@ def write_flash(
do_write = False do_write = False
# Determine which files list contain the ones to encrypt # Determine which files list contain the ones to encrypt
files_to_encrypt = addr_filename if encrypt else encrypt_files files_to_encrypt = (
norm_addr_data
if encrypt is not None
else [(addr, get_bytes(data)) for addr, data in encrypt_files]
)
if files_to_encrypt is not None: if files_to_encrypt is not None:
for address, argfile in files_to_encrypt: for address, (data, name) in files_to_encrypt:
if address % esp.FLASH_ENCRYPTED_WRITE_ALIGN: if address % esp.FLASH_ENCRYPTED_WRITE_ALIGN:
log.print( source = "Input image" if name is None else f"'{name}'"
f"File {argfile.name} address {address:#x} is not " log.warning(
f"{source} (address {address:#x}) is not "
f"{esp.FLASH_ENCRYPTED_WRITE_ALIGN} byte aligned, " f"{esp.FLASH_ENCRYPTED_WRITE_ALIGN} byte aligned, "
"can't flash encrypted" "can't flash encrypted"
) )
do_write = False do_write = False
if not do_write and not ignore_flash_enc_efuse: if not do_write and not ignore_flash_enc_efuse:
@@ -634,23 +647,20 @@ def write_flash(
# Verify file sizes fit in the set flash_size, or real flash size if smaller # Verify file sizes fit in the set flash_size, or real flash size if smaller
flash_end = min(set_flash_size, flash_end) if set_flash_size else flash_end flash_end = min(set_flash_size, flash_end) if set_flash_size else flash_end
if flash_end is not None: if flash_end is not None:
for address, argfile in addr_filename: for address, (data, name) in norm_addr_data:
argfile.seek(0, os.SEEK_END) if address + len(data) > flash_end:
if address + argfile.tell() > flash_end: source = "Input image" if name is None else f"File '{name}'"
raise FatalError( raise FatalError(
f"File {argfile.name} (length {argfile.tell()}) at offset " f"{source} (length {len(data)}) at offset "
f"{address} will not fit in {flash_end} bytes of flash. " f"{address:#010x} will not fit in {flash_end} bytes of flash. "
"Change the flash_size argument, or flashing address." "Change the flash_size argument or flashing address."
) )
argfile.seek(0)
if erase_all: if erase_all:
erase_flash(esp) erase_flash(esp)
else: else:
for address, argfile in addr_filename: for address, (data, _) in norm_addr_data:
argfile.seek(0, os.SEEK_END) write_end = address + len(data)
write_end = address + argfile.tell()
argfile.seek(0)
bytes_over = address % esp.FLASH_SECTOR_SIZE bytes_over = address % esp.FLASH_SECTOR_SIZE
if bytes_over != 0: if bytes_over != 0:
log.note( log.note(
@@ -673,15 +683,15 @@ def write_flash(
Each entry holds an "encrypt" flag marking whether the file needs encryption or not. Each entry holds an "encrypt" flag marking whether the file needs encryption or not.
This list needs to be sorted. This list needs to be sorted.
First, append to each entry of our addr_filename list the flag "encrypt" First, append to each entry of our addr_data list the flag "encrypt"
E.g., if addr_filename is [(0x1000, "partition.bin"), (0x8000, "bootloader")], E.g., if addr_data is [(0x1000, "partition.bin"), (0x8000, "bootloader")],
all_files will be [ all_files will be [
(0x1000, "partition.bin", encrypt), (0x1000, data, "partition.bin", encrypt),
(0x8000, "bootloader", encrypt) (0x8000, data, "bootloader", encrypt)
], ],
where, of course, encrypt is either True or False where, of course, encrypt is either True or False
""" """
all_files = [(offs, filename, encrypt) for (offs, filename) in addr_filename] all_files = [(addr, data, name, encrypt) for (addr, (data, name)) in norm_addr_data]
""" """
Now do the same with encrypt_files list, if defined. Now do the same with encrypt_files list, if defined.
@@ -689,7 +699,7 @@ def write_flash(
""" """
if encrypt_files is not None: if encrypt_files is not None:
encrypted_files_flag = [ encrypted_files_flag = [
(offs, filename, True) for (offs, filename) in encrypt_files (addr, *get_bytes(data), True) for (addr, data) in encrypt_files
] ]
# Concatenate both lists and sort them. # Concatenate both lists and sort them.
@@ -698,20 +708,23 @@ def write_flash(
# let's use sorted. # let's use sorted.
all_files = sorted(all_files + encrypted_files_flag, key=lambda x: x[0]) all_files = sorted(all_files + encrypted_files_flag, key=lambda x: x[0])
for address, argfile, encrypted in all_files: for address, data, name, encrypted in all_files:
compress = compress compress = compress
# Check whether we can compress the current file before flashing # Check whether we can compress the current file before flashing
if compress and encrypted: if compress and encrypted:
source = "input bytes" if name is None else f"'{name}'"
log.print("\n") log.print("\n")
log.warning("Compress and encrypt options are mutually exclusive.") log.warning("Compress and encrypt options are mutually exclusive.")
log.print(f"Will flash {argfile.name} uncompressed.") log.print(f"Will flash {source} uncompressed.")
compress = False compress = False
image = argfile.read() image = data
if len(image) == 0: if len(image) == 0:
log.warning(f"File {argfile.name} is empty") log.warning(
"Input bytes are empty" if name is None else f"'{name}' is empty"
)
continue continue
image = pad_to(image, esp.FLASH_ENCRYPTED_WRITE_ALIGN if encrypted else 4) image = pad_to(image, esp.FLASH_ENCRYPTED_WRITE_ALIGN if encrypted else 4)
@@ -752,7 +765,6 @@ def write_flash(
blocks = esp.flash_begin( blocks = esp.flash_begin(
uncsize, address, begin_rom_encrypted=encrypted uncsize, address, begin_rom_encrypted=encrypted
) )
argfile.seek(0) # in case we need it again
seq = 0 seq = 0
bytes_sent = 0 # bytes sent on wire bytes_sent = 0 # bytes sent on wire
bytes_written = 0 # bytes written to flash bytes_written = 0 # bytes written to flash
@@ -857,7 +869,7 @@ def write_flash(
try: try:
res = esp.flash_md5sum(address, uncsize) res = esp.flash_md5sum(address, uncsize)
if res != calcmd5: if res != calcmd5:
log.print(f"File MD5: {calcmd5}") log.print(f"Input MD5: {calcmd5}")
log.print(f"Flash MD5: {res}") log.print(f"Flash MD5: {res}")
if res == hashlib.md5(b"\xff" * uncsize).hexdigest(): if res == hashlib.md5(b"\xff" * uncsize).hexdigest():
raise FatalError( raise FatalError(
@@ -879,9 +891,9 @@ def write_flash(
esp.flash_begin(0, 0) esp.flash_begin(0, 0)
# Get the "encrypted" flag for the last file flashed # Get the "encrypted" flag for the last file flashed
# Note: all_files list contains triplets like: # Note: all_files list contains quadruplets like:
# (address: Integer, filename: String, encrypted: Boolean) # (address: int, filename: str | None, data: bytes, encrypted: bool)
last_file_encrypted = all_files[-1][2] last_file_encrypted = all_files[-1][3]
# Check whether the last file flashed was compressed or not # Check whether the last file flashed was compressed or not
if compress and not last_file_encrypted: if compress and not last_file_encrypted:
@@ -1319,19 +1331,21 @@ def read_flash(
def verify_flash( def verify_flash(
esp: ESPLoader, esp: ESPLoader,
addr_filename: list[tuple[int, BinaryIO]], addr_data: list[tuple[int, ImageSource]],
flash_freq: str = "keep", flash_freq: str = "keep",
flash_mode: str = "keep", flash_mode: str = "keep",
flash_size: str = "keep", flash_size: str = "keep",
diff: bool = False, diff: bool = False,
) -> None: ) -> None:
""" """
Verify the contents of the SPI flash memory against the provided binary files. Verify the contents of the SPI flash memory against the provided binary files
or byte data.
Args: Args:
esp: Initiated esp object connected to a real device. esp: Initiated esp object connected to a real device.
addr_filename: List of (address, file) tuples specifying what addr_data: List of (address, data) tuples specifying what
parts of flash memory to verify. parts of flash memory to verify. The data can be
a file path (str), bytes, or a file-like object.
flash_freq: Flash frequency setting (``"keep"`` to retain current). flash_freq: Flash frequency setting (``"keep"`` to retain current).
flash_mode: Flash mode setting (``"keep"`` to retain current). flash_mode: Flash mode setting (``"keep"`` to retain current).
flash_size: Flash size setting (``"keep"`` to retain current). flash_size: Flash size setting (``"keep"`` to retain current).
@@ -1340,18 +1354,19 @@ def verify_flash(
flash_size = _set_flash_parameters(esp, flash_size) # Set flash size parameters flash_size = _set_flash_parameters(esp, flash_size) # Set flash size parameters
mismatch = False mismatch = False
for address, argfile in addr_filename: for address, data in addr_data:
image = pad_to(argfile.read(), 4) data, source = get_bytes(data)
argfile.seek(0) # rewind in case we need it again image = pad_to(data, 4)
image = _update_image_flash_params( image = _update_image_flash_params(
esp, address, flash_freq, flash_mode, flash_size, image esp, address, flash_freq, flash_mode, flash_size, image
) )
image_size = len(image) image_size = len(image)
source = "input bytes" if source is None else f"'{source}'"
log.print( log.print(
f"Verifying {image_size:#x} ({image_size}) bytes " f"Verifying {image_size:#x} ({image_size}) bytes "
f"at {address:#010x} in flash against {argfile.name}..." f"at {address:#010x} in flash against {source}..."
) )
# Try digest first, only read if there are differences. # Try digest first, only read if there are differences.
digest = esp.flash_md5sum(address, image_size) digest = esp.flash_md5sum(address, image_size)
@@ -1689,23 +1704,30 @@ def _parse_bootloader_info(bootloader_info_segment):
} }
def image_info(filename: str, chip: str | None = None) -> None: def image_info(input: ImageSource, chip: str | None = None) -> None:
""" """
Display detailed information about an ESP firmware image. Display detailed information about an ESP firmware image.
Args: Args:
filename: Path to the firmware image file. input: Path to the firmware image file, opened file-like object,
or the image data as bytes.
chip: Target ESP device type (e.g., ``"esp32"``). If None, the chip chip: Target ESP device type (e.g., ``"esp32"``). If None, the chip
type will be automatically detected from the image header. type will be automatically detected from the image header.
""" """
log.print(f"File size: {get_file_size(filename)} (bytes)")
with open(filename, "rb") as f: data, _ = get_bytes(input)
# magic number log.print(f"Image size: {len(data)} bytes")
stream = io.BytesIO(data)
common_header = stream.read(8)
if chip is None:
extended_header = stream.read(16)
stream.seek(0)
# Check magic number
try: try:
common_header = f.read(8)
magic = common_header[0] magic = common_header[0]
except IndexError: except IndexError:
raise FatalError("File is empty") raise FatalError("Image is empty")
if magic not in [ if magic not in [
ESPLoader.ESP_IMAGE_MAGIC, ESPLoader.ESP_IMAGE_MAGIC,
ESP8266V2FirmwareImage.IMAGE_V2_MAGIC, ESP8266V2FirmwareImage.IMAGE_V2_MAGIC,
@@ -1716,14 +1738,12 @@ def image_info(filename: str, chip: str | None = None) -> None:
if chip is None: if chip is None:
try: try:
extended_header = f.read(16)
# append_digest, either 0 or 1 # append_digest, either 0 or 1
if extended_header[-1] not in [0, 1]: if extended_header[-1] not in [0, 1]:
raise FatalError("Append digest field not 0 or 1") raise FatalError("Append digest field not 0 or 1")
chip_id = int.from_bytes(extended_header[4:5], "little") chip_id = int.from_bytes(extended_header[4:5], "little")
for rom in [n for n in ROM_LIST if n.CHIP_NAME != "ESP8266"]: for rom in ROM_LIST:
if chip_id == rom.IMAGE_CHIP_ID: if chip_id == rom.IMAGE_CHIP_ID:
chip = rom.CHIP_NAME chip = rom.CHIP_NAME
break break
@@ -1734,7 +1754,7 @@ def image_info(filename: str, chip: str | None = None) -> None:
log.print(f"Detected image type: {chip.upper()}") log.print(f"Detected image type: {chip.upper()}")
image = LoadFirmwareImage(chip, filename) image = LoadFirmwareImage(chip, data)
def get_key_from_value(dict, val): def get_key_from_value(dict, val):
"""Get key from value in dictionary""" """Get key from value in dictionary"""
@@ -1912,7 +1932,7 @@ def image_info(filename: str, chip: str | None = None) -> None:
def merge_bin( def merge_bin(
addr_filename: list[tuple[int, BinaryIO]], addr_data: list[tuple[int, ImageSource]],
chip: str, chip: str,
output: str | None = None, output: str | None = None,
flash_freq: str = "keep", flash_freq: str = "keep",
@@ -1929,8 +1949,9 @@ def merge_bin(
Also apply necessary flash parameters and ensure correct alignment for flashing. Also apply necessary flash parameters and ensure correct alignment for flashing.
Args: Args:
addr_filename: List of (address, file) pairs specifying addr_data: List of (address, data) tuples specifying where
memory offsets and corresponding binary files. to write each file or data in flash memory. The data can be
a file path (str), bytes, or a file-like object.
chip: Target ESP device type (e.g., ``"esp32"``). chip: Target ESP device type (e.g., ``"esp32"``).
output: Path to the output file where the merged binary will be written. output: Path to the output file where the merged binary will be written.
If None, the merged binary will be returned as bytes. If None, the merged binary will be returned as bytes.
@@ -1962,7 +1983,7 @@ def merge_bin(
if format not in ["raw", "uf2", "hex"]: if format not in ["raw", "uf2", "hex"]:
raise FatalError(f"Invalid format: '{format}', choose from 'raw', 'uf2', 'hex'") raise FatalError(f"Invalid format: '{format}', choose from 'raw', 'uf2', 'hex'")
if format in ["uf2", "hex"] and output is None: if output is None and format in ["uf2", "hex"]:
raise FatalError(f"Output file must be specified with {format.upper()} format") raise FatalError(f"Output file must be specified with {format.upper()} format")
try: try:
@@ -1974,26 +1995,27 @@ def merge_bin(
# sort the files by offset. # sort the files by offset.
# The AddrFilenamePairAction has already checked for overlap # The AddrFilenamePairAction has already checked for overlap
input_files = sorted(addr_filename, key=lambda x: x[0]) addr_data = sorted(addr_data, key=lambda x: x[0])
if not input_files: if not addr_data:
raise FatalError("No input files specified") raise FatalError("No input data")
first_addr = input_files[0][0] first_addr = addr_data[0][0]
if first_addr < target_offset: if first_addr < target_offset:
raise FatalError( raise FatalError(
f"Output file target offset is {target_offset:#x}. " f"Output data target offset is {target_offset:#x}. "
f"Input file offset {first_addr:#x} is before this." f"Input data offset {first_addr:#x} is before this."
) )
if format == "uf2" and output is not None: if output is not None and format == "uf2":
with UF2Writer( with UF2Writer(
chip_class.UF2_FAMILY_ID, chip_class.UF2_FAMILY_ID,
output, output,
chunk_size, chunk_size,
md5_enabled=not md5_disable, md5_enabled=not md5_disable,
) as writer: ) as writer:
for addr, argfile in input_files: for addr, data in addr_data:
log.print(f"Adding {argfile.name} at {addr:#x}") image, source = get_bytes(data)
image = argfile.read() source = "bytes" if source is None else f"'{source}'"
log.print(f"Adding {source} at {addr:#x}")
image = _update_image_flash_params( image = _update_image_flash_params(
chip_class, addr, flash_freq, flash_mode, flash_size, image chip_class, addr, flash_freq, flash_mode, flash_size, image
) )
@@ -2011,10 +2033,9 @@ def merge_bin(
# account for output file offset if there is any # account for output file offset if there is any
of.write(b"\xff" * (flash_offs - target_offset - of.tell())) of.write(b"\xff" * (flash_offs - target_offset - of.tell()))
for addr, argfile in input_files: for addr, data in addr_data:
pad_to(addr) pad_to(addr)
image = argfile.read() image, _ = get_bytes(data)
argfile.seek(0) # Rewind for possible future operations with the files
image = _update_image_flash_params( image = _update_image_flash_params(
chip_class, addr, flash_freq, flash_mode, flash_size, image chip_class, addr, flash_freq, flash_mode, flash_size, image
) )
@@ -2038,18 +2059,18 @@ def merge_bin(
) )
return None return None
elif format == "hex" and output is not None: elif output is not None and format == "hex":
out = IntelHex() out = IntelHex()
if len(input_files) == 1: if len(addr_data) == 1:
log.warning( log.warning(
"Only one input file specified, output may include " "Only one input file specified, output may include "
"additional padding if input file was previously merged. " "additional padding if input file was previously merged. "
"Please refer to the documentation for more information: " "Please refer to the documentation for more information: "
"https://docs.espressif.com/projects/esptool/en/latest/esptool/basic-commands.html#hex-output-format" # noqa E501 "https://docs.espressif.com/projects/esptool/en/latest/esptool/basic-commands.html#hex-output-format" # noqa E501
) )
for addr, argfile in input_files: for addr, data in addr_data:
ihex = IntelHex() ihex = IntelHex()
image = argfile.read() image, _ = get_bytes(data)
image = _update_image_flash_params( image = _update_image_flash_params(
chip_class, addr, flash_freq, flash_mode, flash_size, image chip_class, addr, flash_freq, flash_mode, flash_size, image
) )
@@ -2064,7 +2085,7 @@ def merge_bin(
def elf2image( def elf2image(
input: str, input: ImageSource,
chip: str, chip: str,
output: str | None = None, output: str | None = None,
flash_freq: str | None = None, flash_freq: str | None = None,
@@ -2073,10 +2094,11 @@ def elf2image(
**kwargs, **kwargs,
) -> bytes | tuple[bytes | None, bytes] | None: ) -> bytes | tuple[bytes | None, bytes] | None:
""" """
Convert an ELF file into a firmware image suitable for flashing onto an ESP device. Convert ELF data into a firmware image suitable for flashing onto an ESP device.
Args: Args:
input: Path to the ELF file. input: Path to the ELF file to convert, opened file-like object,
or the ELF data as bytes.
chip: Target ESP device type. chip: Target ESP device type.
output: Path to save the generated firmware image. If "auto", a default output: Path to save the generated firmware image. If "auto", a default
pre-defined path is used. If None, the image is not written to a file, pre-defined path is used. If None, the image is not written to a file,
@@ -2126,7 +2148,8 @@ def elf2image(
f"Invalid chip choice: '{chip}' (choose from {', '.join(CHIP_LIST)})" f"Invalid chip choice: '{chip}' (choose from {', '.join(CHIP_LIST)})"
) )
e = ELFFile(input) data, source = get_bytes(input)
e = ELFFile(data)
log.print(f"Creating {chip.upper()} image...") log.print(f"Creating {chip.upper()} image...")
if chip != "esp8266": if chip != "esp8266":
bootloader_image = CHIP_DEFS[chip].BOOTLOADER_IMAGE bootloader_image = CHIP_DEFS[chip].BOOTLOADER_IMAGE
@@ -2227,7 +2250,8 @@ def elf2image(
log.print(f"Successfully created {chip.upper()} image.") log.print(f"Successfully created {chip.upper()} image.")
if output == "auto": if output == "auto":
output = image.default_output_name(input) source = f"{chip}_image" if source is None else source
output = image.default_output_name(source)
return cast(bytes | tuple[bytes | None, bytes] | None, image.save(output)) return cast(bytes | tuple[bytes | None, bytes] | None, image.save(output))

View File

@@ -2,11 +2,16 @@
# Espressif Systems (Shanghai) CO LTD, other contributors as noted. # Espressif Systems (Shanghai) CO LTD, other contributors as noted.
# #
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
from __future__ import annotations
import os import os
import re import re
import struct import struct
from typing import IO, TypeAlias
# Define a custom type for the input
ImageSource: TypeAlias = str | bytes | IO[bytes]
def byte(bitstr, index): def byte(bitstr, index):
return bitstr[index] return bitstr[index]
@@ -85,6 +90,34 @@ def sanitize_string(byte_string):
return byte_string.decode("utf-8").replace("\0", "") return byte_string.decode("utf-8").replace("\0", "")
def get_bytes(input: ImageSource) -> tuple[bytes, str | None]:
"""
Normalize the input (file path, bytes, or an opened file-like object) into bytes
and provide a name of the source.
Args:
input: The input file path, bytes, or an opened file-like object.
Returns:
A tuple containing the normalized bytes and the source of the input.
"""
if isinstance(input, str):
with open(input, "rb") as f:
data = f.read()
source = input
elif isinstance(input, bytes):
data = input
source = None
elif hasattr(input, "read") and hasattr(input, "write") and hasattr(input, "close"):
pos = input.tell()
data = input.read()
input.seek(pos) # Reset the file pointer
source = input.name
else:
raise FatalError(f"Invalid input type {type(input)}")
return data, source
class PrintOnce: class PrintOnce:
""" """
Class for printing messages just once. Can be useful when running in a loop Class for printing messages just once. Can be useful when running in a loop

View File

@@ -643,7 +643,7 @@ class TestFlashing(EsptoolTestCase):
"write_flash 0x10000 images/one_kb.bin 0x11000 images/zerolength.bin" "write_flash 0x10000 images/one_kb.bin 0x11000 images/zerolength.bin"
) )
self.verify_readback(0x10000, 1024, "images/one_kb.bin") self.verify_readback(0x10000, 1024, "images/one_kb.bin")
assert "zerolength.bin is empty" in output assert "'images/zerolength.bin' is empty" in output
@pytest.mark.quick_test @pytest.mark.quick_test
def test_single_byte(self): def test_single_byte(self):
@@ -674,7 +674,7 @@ class TestFlashing(EsptoolTestCase):
) )
assert "Unexpected chip ID in image." in output assert "Unexpected chip ID in image." in output
assert "value was 9. Is this image for a different chip model?" in output assert "value was 9. Is this image for a different chip model?" in output
assert "images/esp32s3_header.bin is not an " in output assert "'images/esp32s3_header.bin' is not an " in output
assert "image. Use the force argument to flash anyway." in output assert "image. Use the force argument to flash anyway." in output
@pytest.mark.skipif( @pytest.mark.skipif(
@@ -687,7 +687,7 @@ class TestFlashing(EsptoolTestCase):
output = self.run_esptool_error( output = self.run_esptool_error(
"write_flash 0x0 images/one_kb.bin 0x1000 images/esp32s3_header.bin" "write_flash 0x0 images/one_kb.bin 0x1000 images/esp32s3_header.bin"
) )
assert "images/esp32s3_header.bin requires chip revision 10" in output assert "'images/esp32s3_header.bin' requires chip revision 10" in output
assert "or higher (this chip is revision" in output assert "or higher (this chip is revision" in output
assert "Use the force argument to flash anyway." in output assert "Use the force argument to flash anyway." in output
@@ -700,7 +700,7 @@ class TestFlashing(EsptoolTestCase):
"write_flash 0x0 images/one_kb.bin 0x1000 images/esp32c3_header_min_rev.bin" "write_flash 0x0 images/one_kb.bin 0x1000 images/esp32c3_header_min_rev.bin"
) )
assert ( assert (
"images/esp32c3_header_min_rev.bin " "'images/esp32c3_header_min_rev.bin' "
"requires chip revision in range [v2.55 - max rev not set]" in output "requires chip revision in range [v2.55 - max rev not set]" in output
) )
assert "Use the force argument to flash anyway." in output assert "Use the force argument to flash anyway." in output
@@ -856,14 +856,14 @@ class TestFlashSizes(EsptoolTestCase):
output = self.run_esptool_error( output = self.run_esptool_error(
"write_flash -fs 1MB 0x280000 images/one_kb.bin" "write_flash -fs 1MB 0x280000 images/one_kb.bin"
) )
assert "File images/one_kb.bin" in output assert "File 'images/one_kb.bin'" in output
assert "will not fit" in output assert "will not fit" in output
def test_write_no_compression_past_end_fails(self): def test_write_no_compression_past_end_fails(self):
output = self.run_esptool_error( output = self.run_esptool_error(
"write_flash -u -fs 1MB 0x280000 images/one_kb.bin" "write_flash -u -fs 1MB 0x280000 images/one_kb.bin"
) )
assert "File images/one_kb.bin" in output assert "File 'images/one_kb.bin'" in output
assert "will not fit" in output assert "will not fit" in output
@pytest.mark.skipif( @pytest.mark.skipif(
@@ -1779,5 +1779,5 @@ class TestESPObjectOperations(EsptoolTestCase):
output = fake_out.getvalue() output = fake_out.getvalue()
assert "Detected image type: ESP32" in output assert "Detected image type: ESP32" in output
assert "Checksum: 0x83 (valid)" in output assert "Checksum: 0x83 (valid)" in output
assert "Wrote 0x2000 bytes to file output.bin" in output assert "Wrote 0x2400 bytes to file 'output.bin'" in output
assert esptool.__version__ in output assert esptool.__version__ in output

View File

@@ -160,7 +160,7 @@ class TestImageInfo:
# This bootloader binary is built from "hello_world" project # This bootloader binary is built from "hello_world" project
# with default settings, IDF version is v5.2. # with default settings, IDF version is v5.2.
out = self.run_image_info("esp32", "bootloader_esp32_v5_2.bin") out = self.run_image_info("esp32", "bootloader_esp32_v5_2.bin")
assert "File size: 26768 (bytes)" in out assert "Image size: 26768 bytes" in out
assert "Bootloader information" in out assert "Bootloader information" in out
assert "Bootloader version: 1" in out assert "Bootloader version: 1" in out
assert "ESP-IDF: v5.2-dev-254-g1950b15" in out assert "ESP-IDF: v5.2-dev-254-g1950b15" in out
@@ -193,7 +193,7 @@ class TestImageInfo:
try: try:
convert_bin2hex(file) convert_bin2hex(file)
out = self.run_image_info("esp32", file) out = self.run_image_info("esp32", file)
assert "File size: 26768 (bytes)" in out assert "Image size: 26768 bytes" in out
assert "Bootloader information" in out assert "Bootloader information" in out
assert "Bootloader version: 1" in out assert "Bootloader version: 1" in out
assert "ESP-IDF: v5.2-dev-254-g1950b15" in out assert "ESP-IDF: v5.2-dev-254-g1950b15" in out

View File

@@ -173,9 +173,9 @@ class TestESP8266V1Image(BaseTestCase):
@classmethod @classmethod
def teardown_class(self): def teardown_class(self):
super(TestESP8266V1Image, self).teardown_class()
try_delete(self.BIN_LOAD) try_delete(self.BIN_LOAD)
try_delete(self.BIN_IROM) try_delete(self.BIN_IROM)
super(TestESP8266V1Image, self).teardown_class()
def test_irom_bin(self): def test_irom_bin(self):
with open(self.ELF, "rb") as f: with open(self.ELF, "rb") as f:

View File

@@ -374,7 +374,9 @@ class TestUF2:
print(output) print(output)
assert "warning" not in output.lower(), "merge_bin should not output warnings" assert "warning" not in output.lower(), "merge_bin should not output warnings"
exp_list = [f"Adding {f} at {hex(addr)}" for addr, f in iter_addr_offset_tuples] exp_list = [
f"Adding '{f}' at {hex(addr)}" for addr, f in iter_addr_offset_tuples
]
exp_list += [ exp_list += [
f"bytes to file '{of_name}', ready to be flashed with any ESP USB Bridge" f"bytes to file '{of_name}', ready to be flashed with any ESP USB Bridge"
] ]