feat(logging): Add collapsible output stages and ASCII progress bars

This commit is contained in:
Radim Karniš
2025-03-12 13:18:42 +01:00
parent 0beee77871
commit f3cf10715e
12 changed files with 354 additions and 203 deletions

View File

@@ -97,7 +97,7 @@ Complete list of configurable options:
+------------------------------+-----------------------------------------------------------+----------+
| erase_write_timeout_per_mb | Timeout (per megabyte) for erasing and writing data | 40 s |
+------------------------------+-----------------------------------------------------------+----------+
| mem_end_rom_timeout | Short timeout for ESP_MEM_END | 0.2 s |
| mem_end_rom_timeout | Short timeout for MEM_END | 0.2 s |
+------------------------------+-----------------------------------------------------------+----------+
| serial_write_timeout | Timeout for serial port write | 10 s |
+------------------------------+-----------------------------------------------------------+----------+

View File

@@ -295,7 +295,7 @@ Esptool allows redirecting output by implementing a custom logger class. This ca
log_to_file = True
log_file = "esptool.log"
def print(self, message, *args, **kwargs):
def print(self, message="", *args, **kwargs):
# Print to console
print(f"[CustomLogger]: {message}", *args, **kwargs)
# Optionally log to a file
@@ -312,14 +312,22 @@ Esptool allows redirecting output by implementing a custom logger class. This ca
def error(self, message):
self.print(message, file=sys.stderr)
def print_overwrite(self, message, last_line=False):
# Overwriting not needed, print normally
self.print(message)
def set_progress(self, percentage):
# Progress updates not needed, pass
def stage(self, finish=False):
# Collapsible stages not needed in this example
pass
def progress_bar(
self,
cur_iter,
total_iters,
prefix = "",
suffix = "",
bar_length: int = 30,
):
# Progress bars replaced with simple percentage output in this example
percent = f"{100 * (cur_iter / float(total_iters)):.1f}"
self.print(f"Finished: {percent}%")
# Replace the default logger with the custom logger
log.set_logger(CustomLogger())
@@ -334,11 +342,11 @@ To ensure compatibility with esptool, the custom logger should re-implement (or
- ``note``: Logs informational messages.
- ``warning``: Logs warning messages.
- ``error``: Logs error messages.
- ``print_overwrite``: Handles message overwriting (can be a simple ``print()`` if overwriting is not needed).
- ``set_progress``: Handles percentage updates of long-running operations - ``write-flash``, ``read-flash``, and ``dump-mem`` (useful for GUI visualisation, e.g. as a progress bar).
- ``stage``: Starts or ends a collapsible output stage.
- ``progress_bar``: Displays a progress bar.
.. autoclass:: esptool.logger.EsptoolLogger
:members: print, note, warning, error, print_overwrite, set_progress
:members: print, note, warning, error, stage, progress_bar
:member-order: bysource
These methods are essential for maintaining proper integration and behavior with esptool. Additionally, all calls to the logger should be made using ``log.print()`` (or the respective method, such as ``log.info()`` or ``log.warning()``) instead of the standard ``print()`` function to ensure the output is routed through the custom logger. This ensures consistency and allows the custom logger to handle all output appropriately. You can further customize this logger to fit your application's needs, such as integrating with GUI components or advanced logging frameworks.
These methods are essential for maintaining proper integration and behavior with esptool. Additionally, all output printing should be made using ``log.print()`` (or the respective method, such as ``log.info()`` or ``log.warning()``) instead of the standard ``print()`` function to ensure the output is routed through the custom logger. This ensures consistency and allows the custom logger to handle all output appropriately. You can further customize this logger to fit your application's needs, such as integrating with GUI components or advanced logging frameworks.

View File

@@ -278,7 +278,8 @@ def check_flash_size(esp: ESPLoader, address: int, size: int) -> None:
cls=Group,
no_args_is_help=True,
context_settings=dict(help_option_names=["-h", "--help"], max_content_width=120),
help=f"esptool.py v{__version__} - Espressif chips ROM Bootloader Utility",
help=f"esptool.py v{__version__} - Espressif SoCs flashing, debugging, "
"and provisioning utility",
)
@click.option(
"--chip",
@@ -376,6 +377,12 @@ def prepare_esp_object(ctx):
# 1) Get the ESP object
#######################
# Disable output stage collapsing, colors, and overwriting in trace mode
if ctx.obj["trace"]:
log._smart_features = False
log.stage()
if ctx.obj["before"] != "no_reset_no_sync":
initial_baud = min(
ESPLoader.ESP_ROM_BAUD, ctx.obj["baud"]
@@ -430,19 +437,23 @@ def prepare_esp_object(ctx):
f"on any of the {len(ser_list)} available serial ports."
)
log.stage(finish=True)
log.print(f"Connected to {esp.CHIP_NAME} on {esp._port.port}:")
# 2) Print the chip info
########################
if esp.secure_download_mode:
log.print(f"Chip is {esp.CHIP_NAME} in Secure Download Mode")
else:
log.print(f"Chip is {esp.get_chip_description()}")
log.print(f"Features: {', '.join(esp.get_chip_features())}")
log.print(f"Crystal is {esp.get_crystal_freq()}MHz")
log.print(f"{'Chip type:':<20}{esp.get_chip_description()}")
log.print(f"{'Features:':<20}{', '.join(esp.get_chip_features())}")
log.print(f"{'Crystal frequency:':<20}{esp.get_crystal_freq()}MHz")
usb_mode = esp.get_usb_mode()
if usb_mode is not None:
log.print(f"USB mode: {usb_mode}")
log.print(f"{'USB mode:':<20}{usb_mode}")
read_mac(esp)
log.print()
# 3) Upload the stub flasher
############################
@@ -488,6 +499,7 @@ def prepare_esp_object(ctx):
# 8) Reset the chip
###################
log.print()
# Handle post-operation behaviour (reset or other)
if ctx.obj["invoked_subcommand"] == "load-ram":
# the ESP is now running the loaded image, so let it run

View File

@@ -202,25 +202,34 @@ def load_ram(esp: ESPLoader, input: ImageSource) -> None:
input: Path to the firmware image file, opened file-like object,
or the image data as bytes.
"""
data, _ = get_bytes(input)
data, source = get_bytes(input)
image = LoadFirmwareImage(esp.CHIP_NAME, data)
log.print("RAM boot...")
for seg in image.segments:
log.stage()
source = "image" if source is None else f"'{source}'"
log.print(f"Loading {source} to RAM...")
for i, seg in enumerate(image.segments, start=1):
size = len(seg.data)
log.print(f"Downloading {size} bytes at {seg.addr:08x}...", end=" ", flush=True)
log.progress_bar(
cur_iter=i,
total_iters=len(image.segments),
prefix=f"Downloading {size} bytes at {seg.addr:#010x} ",
suffix="...",
)
esp.mem_begin(
size, div_roundup(size, esp.ESP_RAM_BLOCK), esp.ESP_RAM_BLOCK, seg.addr
)
seq = 0
while len(seg.data) > 0:
esp.mem_block(seg.data[0 : esp.ESP_RAM_BLOCK], seq)
seg.data = seg.data[esp.ESP_RAM_BLOCK :]
seq += 1
log.print("done!")
log.print(f"All segments done, executing at {image.entrypoint:08x}")
log.stage(finish=True)
log.print(
f"Loaded {len(image.segments)} segments from {source} to RAM, "
f"executing at {image.entrypoint:#010x}."
)
esp.mem_finish(image.entrypoint)
@@ -265,32 +274,40 @@ def dump_mem(
Memory dump as bytes if output is None;
otherwise, returns None after writing to file.
"""
data = io.BytesIO() # Use BytesIO to store the memory dump
log.stage()
log.print(
f"Dumping {size} bytes from {address:#010x}"
+ (f" to file '{output}'..." if output else "...")
)
data = io.BytesIO() # Use BytesIO to store the memory dump
log.set_progress(0)
t = time.time()
# Read the memory in 4-byte chunks.
for i in range(size // 4):
d = esp.read_reg(address + (i * 4))
cur_addr = address + (i * 4)
d = esp.read_reg(cur_addr)
data.write(struct.pack("<I", d)) # Write 4 bytes to BytesIO
# Update progress every 1024 bytes.
if data.tell() % 1024 == 0:
percent = data.tell() * 100 // size
log.set_progress(percent)
log.print_overwrite(f"{data.tell()} bytes read... ({percent}%)")
log.print_overwrite(f"Successfully read {data.tell()} bytes.", last_line=True)
cur = data.tell()
if cur % 1024 == 0 or cur == size:
log.progress_bar(
cur_iter=data.tell(),
total_iters=size,
prefix=f"Dumping from {cur_addr:#010x} ",
suffix=f" {cur}/{size} bytes...",
)
t = time.time() - t
speed_msg = " ({:.1f} kbit/s)".format(data.tell() / t * 8 / 1000) if t > 0.0 else ""
dest_msg = f" to '{output}'" if output else ""
log.stage(finish=True)
log.print(
f"Dumped {data.tell()} bytes from {address:#010x} in {t:.1f} seconds"
f"{speed_msg}{dest_msg}."
)
if output:
with open(output, "wb") as f:
f.write(data.getvalue())
log.print(f"Memory dump to '{output}' completed.")
return None
else:
log.print("Memory dump completed.")
return data.getvalue()
@@ -753,36 +770,36 @@ def write_flash(
if compress:
uncimage = image
image = zlib.compress(uncimage, 9)
compsize = len(image)
original_image = image # Save the whole image in case retry is needed
# Try again if reconnect was successful
log.stage()
for attempt in range(1, esp.WRITE_FLASH_ATTEMPTS + 1):
try:
if compress:
# Decompress the compressed binary a block at a time,
# to dynamically calculate the timeout based on the real write size
decompress = zlib.decompressobj()
blocks = esp.flash_defl_begin(uncsize, len(image), address)
esp.flash_defl_begin(uncsize, compsize, address)
else:
blocks = esp.flash_begin(
uncsize, address, begin_rom_encrypted=encrypted
)
esp.flash_begin(uncsize, address, begin_rom_encrypted=encrypted)
seq = 0
bytes_sent = 0 # bytes sent on wire
bytes_written = 0 # bytes written to flash
t = time.time()
timeout = DEFAULT_TIMEOUT
log.set_progress(0)
while len(image) > 0:
percent = 100 * (seq + 1) // blocks
log.set_progress(percent)
image_size = compsize if compress else uncsize
while len(image) >= 0:
if not no_progress:
log.print_overwrite(
f"Writing at {address + bytes_written:010x}... "
f"({percent} %)"
log.progress_bar(
cur_iter=image_size - len(image),
total_iters=image_size,
prefix=f"Writing at {address + bytes_written:#010x} ",
suffix=f" {bytes_sent}/{image_size} bytes...",
)
if len(image) == 0: # All data sent, print 100% progress and end
break
block = image[0 : esp.FLASH_WRITE_SIZE]
if compress:
# feeding each compressed block into the decompressor lets us
@@ -849,21 +866,20 @@ def write_flash(
t = time.time() - t
speed_msg = ""
log.stage(finish=True)
if compress:
if t > 0.0:
speed_msg = f" (effective {uncsize / t * 8 / 1000:.1f} kbit/s)"
log.print_overwrite(
f"Wrote {uncsize} bytes ({bytes_sent} compressed) at {address:#010x} "
f"in {t:.1f} seconds{speed_msg}...",
last_line=True,
speed_msg = f" ({uncsize / t * 8 / 1000:.1f} kbit/s)"
log.print(
f"Wrote {uncsize} bytes ({bytes_sent} compressed) "
f"at {address:#010x} in {t:.1f} seconds{speed_msg}."
)
else:
if t > 0.0:
speed_msg = " (%.1f kbit/s)" % (bytes_written / t * 8 / 1000)
log.print_overwrite(
log.print(
f"Wrote {bytes_written} bytes at {address:#010x} in {t:.1f} "
f"seconds{speed_msg}...",
last_line=True,
f"seconds{speed_msg}."
)
if not encrypted and not esp.secure_download_mode:
@@ -912,7 +928,7 @@ def read_mac(esp: ESPLoader) -> None:
"""
def print_mac(label, mac):
log.print(f"{label}: {':'.join(f'{x:02x}' for x in mac)}")
log.print(f"{label + ':':<20}{':'.join(f'{x:02x}' for x in mac)}")
eui64 = esp.read_mac("EUI64")
if eui64:
@@ -1232,6 +1248,10 @@ def flash_id(esp: ESPLoader) -> None:
Args:
esp: Initiated esp object connected to a real device.
"""
log.print()
title = "Flash Memory Information:"
log.print(title)
log.print("=" * len(title))
print_flash_id(esp)
flash_type = esp.flash_type()
flash_type_dict = {0: "quad (4 data lines)", 1: "octal (8 data lines)"}
@@ -1299,34 +1319,30 @@ def read_flash(
flash_progress = None
else:
def flash_progress(progress, length):
percent = progress * 100.0 / length
log.set_progress(percent)
def flash_progress(progress, length, offset):
log.progress_bar(
cur_iter=progress,
total_iters=length,
prefix=f"Reading from {offset + progress:#010x} ",
suffix=f" {progress}/{length} bytes...",
)
msg = f"{progress} ({percent:.0f} %)"
padding = "\b" * len(msg)
if progress != length:
log.print_overwrite(f"{msg}{padding}")
else:
log.print_overwrite(f"{msg}", last_line=True)
log.set_progress(0)
log.stage()
t = time.time()
data = esp.read_flash(address, size, flash_progress)
t = time.time() - t
speed_msg = " ({:.1f} kbit/s)".format(len(data) / t * 8 / 1000) if t > 0.0 else ""
log.print_overwrite(
f"Read {len(data)} bytes at {address:#010x} in {t:.1f} seconds{speed_msg}...",
last_line=True,
dest_msg = f" to '{output}'" if output else ""
log.stage(finish=True)
log.print(
f"Read {len(data)} bytes from {address:#010x} in {t:.1f} seconds"
f"{speed_msg}{dest_msg}."
)
if output:
with open(output, "wb") as f:
f.write(data)
log.print(f"Flash read to '{output}' completed.")
return None
else:
log.print("Flash read completed.")
return data
@@ -1373,27 +1389,27 @@ def verify_flash(
digest = esp.flash_md5sum(address, image_size)
expected_digest = hashlib.md5(image).hexdigest()
if digest == expected_digest:
log.print("-- verify OK (digest matched)")
log.print("Verification successful (digest matched).")
continue
else:
mismatch = True
if not diff:
log.print("-- verify FAILED (digest mismatch)")
log.print("Verification failed (digest mismatch)")
continue
flash = esp.read_flash(address, image_size)
assert flash != image
differences = [i for i in range(image_size) if flash[i] != image[i]]
log.print(
f"-- verify FAILED: {len(differences)} differences, "
f"first at {address + differences[0]:#010x}"
f"Verification failed: {len(differences)} differences, "
f"first at {address + differences[0]:#010x}:"
)
for d in differences:
flash_byte = flash[d]
image_byte = image[d]
log.print(f" {address + d:08x} {flash_byte:02x} {image_byte:02x}")
log.print(f" {address + d:#010x} {flash_byte:02x} {image_byte:02x}")
if mismatch:
raise FatalError("Verify failed.")
raise FatalError("Verification failed.")
def read_flash_status(esp: ESPLoader, bytes: int = 2) -> None:
@@ -1757,15 +1773,8 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
image = LoadFirmwareImage(chip, data)
def get_key_from_value(dict, val):
"""Get key from value in dictionary"""
for key, value in dict.items():
if value == val:
return key
return None
log.print()
title = f"{chip.upper()} image header".format()
title = f"{chip.upper()} Image Header"
log.print(title)
log.print("=" * len(title))
log.print(f"Image version: {image.version}")
@@ -1803,7 +1812,7 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
# Extended header (ESP32 and later only)
if chip != "esp8266":
log.print()
title = f"{chip.upper()} extended image header"
title = f"{chip.upper()} Extended Image Header"
log.print(title)
log.print("=" * len(title))
log.print(
@@ -1843,7 +1852,7 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
log.print()
# Segments overview
title = "Segments information"
title = "Segments Information"
log.print(title)
log.print("=" * len(title))
headers_str = "{:>7} {:>7} {:>10} {:>10} {:10}"
@@ -1872,7 +1881,7 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
log.print()
# Footer
title = f"{chip.upper()} image footer"
title = f"{chip.upper()} Image Footer"
log.print(title)
log.print("=" * len(title))
calc_checksum = image.calculate_checksum()
@@ -1902,7 +1911,7 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
app_desc = _parse_app_info(app_desc_seg)
if app_desc:
log.print()
title = "Application information"
title = "Application Information"
log.print(title)
log.print("=" * len(title))
log.print(f"Project name: {app_desc['project_name']}")
@@ -1924,7 +1933,7 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
bootloader_desc = _parse_bootloader_info(bootloader_desc_seg)
if bootloader_desc:
log.print()
title = "Bootloader information"
title = "Bootloader Information"
log.print(title)
log.print("=" * len(title))
log.print(f"Bootloader version: {bootloader_desc['version']}")

View File

@@ -978,7 +978,7 @@ class ESPLoader(object):
timeout=timeout,
)
if size != 0 and not self.IS_STUB and logging:
log.print(f"Took {time.time() - t:.2f}s to erase flash block")
log.print(f"Took {time.time() - t:.2f}s to erase flash block.")
return num_blocks
def flash_block(self, data, seq, timeout=DEFAULT_TIMEOUT):
@@ -997,7 +997,7 @@ class ESPLoader(object):
if attempts_left:
self.trace(
"Block write failed, "
f"retrying with {attempts_left} attempts left"
f"retrying with {attempts_left} attempts left..."
)
else:
raise
@@ -1177,15 +1177,17 @@ class ESPLoader(object):
)
def run_stub(self, stub: StubFlasher | None = None) -> "ESPLoader":
log.stage()
if stub is None:
stub = StubFlasher(self.CHIP_NAME)
if self.sync_stub_detected:
log.print("Stub is already running. No upload is necessary.")
log.stage(finish=True)
log.print("Stub flasher is already running. No upload is necessary.")
return self.STUB_CLASS(self) if self.STUB_CLASS is not None else self
# Upload
log.print("Uploading stub...")
log.print("Uploading stub flasher...")
for field in [stub.text, stub.data]:
if field is not None:
offs = stub.text_start if field == stub.text else stub.data_start
@@ -1196,20 +1198,21 @@ class ESPLoader(object):
from_offs = seq * self.ESP_RAM_BLOCK
to_offs = from_offs + self.ESP_RAM_BLOCK
self.mem_block(field[from_offs:to_offs], seq)
log.print("Running stub...")
log.print("Running stub flasher...")
self.mem_finish(stub.entry)
try:
p = self.read()
except StopIteration:
raise FatalError(
"Failed to start stub. There was no response."
"Failed to start stub flasher. There was no response."
"\nTry increasing timeouts, for more information see: "
"https://docs.espressif.com/projects/esptool/en/latest/esptool/configuration-file.html" # noqa E501
)
if p != b"OHAI":
raise FatalError(f"Failed to start stub. Unexpected response: {p}")
log.print("Stub running...")
raise FatalError(f"Failed to start stub flasher. Unexpected response: {p}")
log.stage(finish=True)
log.print("Stub flasher running.")
return self.STUB_CLASS(self) if self.STUB_CLASS is not None else self
@stub_and_esp32_function_only
@@ -1251,7 +1254,7 @@ class ESPLoader(object):
)
if size != 0 and not self.IS_STUB:
# (stub erases as it writes, but ROM loaders erase on begin)
log.print(f"Took {time.time() - t:.2f}s to erase flash block")
log.print(f"Took {time.time() - t:.2f}s to erase flash block.")
return num_blocks
@stub_and_esp32_function_only
@@ -1310,7 +1313,7 @@ class ESPLoader(object):
@stub_and_esp32_function_only
def change_baud(self, baud):
log.print(f"Changing baud rate to {baud}")
log.print(f"Changing baud rate to {baud}...")
# stub takes the new baud rate and the old one
second_arg = self._port.baudrate if self.IS_STUB else 0
self.command(
@@ -1356,25 +1359,26 @@ class ESPLoader(object):
while len(data) < length:
p = self.read()
data += p
if len(data) < length and len(p) < self.FLASH_SECTOR_SIZE:
data_len = len(data)
if data_len < length and len(p) < self.FLASH_SECTOR_SIZE:
raise FatalError(
"Corrupt data, expected 0x%x bytes but received 0x%x bytes"
% (self.FLASH_SECTOR_SIZE, len(p))
f"Corrupt data, expected {self.FLASH_SECTOR_SIZE:#x} "
f"bytes but received {len(p):#x} bytes"
)
self.write(struct.pack("<I", len(data)))
if progress_fn and (len(data) % 1024 == 0 or len(data) == length):
progress_fn(len(data), length)
self.write(struct.pack("<I", data_len))
if progress_fn and (data_len % 1024 == 0 or data_len == length):
progress_fn(data_len, length, offset)
if len(data) > length:
raise FatalError("Read more than expected")
digest_frame = self.read()
if len(digest_frame) != 16:
raise FatalError("Expected digest, got: %s" % hexify(digest_frame))
raise FatalError(f"Expected digest, got: {hexify(digest_frame)}")
expected_digest = hexify(digest_frame).upper()
digest = hashlib.md5(data).hexdigest().upper()
if digest != expected_digest:
raise FatalError(
"Digest mismatch: expected %s, got %s" % (expected_digest, digest)
f"Digest mismatch: expected {expected_digest}, got {digest}"
)
return data

View File

@@ -4,6 +4,7 @@
from abc import ABC, abstractmethod
import sys
import os
class TemplateLogger(ABC):
@@ -36,26 +37,41 @@ class TemplateLogger(ABC):
pass
@abstractmethod
def print_overwrite(self, message: str, last_line: bool = False):
def stage(self, finish: bool = False):
"""
Print a message, overwriting the currently printed line.
Start or finish a new collapsible stage.
"""
pass
@abstractmethod
def set_progress(self, percentage: float):
def progress_bar(
self,
cur_iter: int,
total_iters: int,
prefix: str = "",
suffix: str = "",
bar_length: int = 30,
):
"""
Set the progress of long-running operations to a specific percentage.
Print a progress bar.
"""
pass
class EsptoolLogger(TemplateLogger):
ansi_red = "\033[1;31m"
ansi_yellow = "\033[0;33m"
ansi_blue = "\033[0;34m"
ansi_normal = "\033[0m"
ansi_clear = "\033[K"
ansi_red: str = ""
ansi_yellow: str = ""
ansi_blue: str = ""
ansi_normal: str = ""
ansi_clear: str = ""
ansi_line_up: str = ""
ansi_line_clear: str = ""
_stage_active: bool = False
_newline_count: int = 0
_kept_lines: list[str] = []
_smart_features: bool = False
def __new__(cls):
"""
@@ -63,6 +79,7 @@ class EsptoolLogger(TemplateLogger):
"""
if not hasattr(cls, "instance"):
cls.instance = super(EsptoolLogger, cls).__new__(cls)
cls.instance._set_smart_features()
return cls.instance
@classmethod
@@ -70,10 +87,63 @@ class EsptoolLogger(TemplateLogger):
if hasattr(cls, "instance"):
del cls.instance
@classmethod
def _set_smart_features(cls, override: bool | None = None):
# Check for smart terminal and color support
if override is not None:
cls.instance._smart_features = override
else:
is_tty = hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
term_supports_color = os.getenv("TERM", "").lower() in (
"xterm",
"xterm-256color",
"screen",
"screen-256color",
"linux",
"vt100",
)
no_color = os.getenv("NO_COLOR", "").strip().lower() in ("1", "true", "yes")
# Determine if colors should be enabled
cls.instance._smart_features = (
is_tty and term_supports_color and not no_color
)
# Handle Windows specifically
if sys.platform == "win32" and cls.instance._smart_features:
try:
from colorama import init
init() # Enable ANSI support on Windows
except ImportError:
cls.instance._smart_features = False
if cls.instance._smart_features:
cls.instance.ansi_red = "\033[1;31m"
cls.instance.ansi_yellow = "\033[0;33m"
cls.instance.ansi_blue = "\033[1;36m"
cls.instance.ansi_normal = "\033[0m"
cls.instance.ansi_clear = "\033[K"
cls.instance.ansi_line_up = "\033[1A"
cls.instance.ansi_line_clear = "\x1b[2K"
else:
cls.instance.ansi_red = ""
cls.instance.ansi_yellow = ""
cls.instance.ansi_blue = ""
cls.instance.ansi_normal = ""
cls.instance.ansi_clear = ""
cls.instance.ansi_line_up = ""
cls.instance.ansi_line_clear = ""
def print(self, *args, **kwargs):
"""
Log a plain message.
Log a plain message. Count newlines if in a collapsing stage.
"""
if self._stage_active:
# Count the number of newlines in the message
message = "".join(map(str, args))
self._newline_count += message.count("\n")
if kwargs.get("end", "\n") == "\n":
self._newline_count += 1
print(*args, **kwargs)
def note(self, message: str):
@@ -82,7 +152,9 @@ class EsptoolLogger(TemplateLogger):
"""
formatted_message = f"{self.ansi_blue}Note:{self.ansi_normal} {message}"
print(formatted_message)
if self._stage_active:
self._kept_lines.append(formatted_message)
self.print(formatted_message)
def warning(self, message: str):
"""
@@ -90,7 +162,9 @@ class EsptoolLogger(TemplateLogger):
"""
formatted_message = f"{self.ansi_yellow}Warning:{self.ansi_normal} {message}"
print(formatted_message)
if self._stage_active:
self._kept_lines.append(formatted_message)
self.print(formatted_message)
def error(self, message: str):
"""
@@ -98,38 +172,68 @@ class EsptoolLogger(TemplateLogger):
"""
formatted_message = f"{self.ansi_red}{message}{self.ansi_normal}"
print(formatted_message, file=sys.stderr)
self.print(formatted_message, file=sys.stderr)
def print_overwrite(self, message: str, last_line: bool = False):
def stage(self, finish: bool = False):
"""
Print a message, overwriting the currently printed line.
If last_line is False, don't append a newline at the end
(expecting another subsequent call will overwrite this one).
After a sequence of calls with last_line=False, call once with last_line=True.
If output is not a TTY (for example redirected a pipe),
no overwriting happens and this function is the same as print().
Start or finish a collapsible stage.
Any log messages printed between the start and finish will be deleted
when the stage is successfully finished.
Warnings and notes will be saved and printed at the end of the stage.
If terminal doesn't support ANSI escape codes, no collapsing happens.
"""
if hasattr(sys.stdout, "isatty") and sys.stdout.isatty():
# ansi_clear clears the line to prevent artifacts from previous lines
print(
f"\r{self.ansi_clear}{message}",
end="\n" if last_line else "",
flush=True,
)
if finish:
if not self._stage_active:
return
# Deactivate stage to stop collecting input
self._stage_active = False
if self._smart_features:
# Delete printed lines
self.print(
f"{self.ansi_line_up}{self.ansi_line_clear}"
* (self._newline_count),
end="",
flush=True,
)
# Print saved warnings and notes
for line in self._kept_lines:
self.print(line)
# Clean the buffers for next stage
self._kept_lines.clear()
self._newline_count = 0
else:
print(message)
self._stage_active = True
def set_progress(self, percentage: float):
def progress_bar(
self,
cur_iter: int,
total_iters: int,
prefix: str = "",
suffix: str = "",
bar_length: int = 30,
):
"""
Call in a loop to print a progress bar overwriting itself in place.
If terminal doesn't support ANSI escape codes, no overwriting happens.
"""
Set the progress of long-running operations to a specific percentage.
Overwrite this method in a custom logger to implement e.g. a progress bar.
Percentage is a float between 0 and 100.
"""
pass
filled = int(bar_length * cur_iter // total_iters)
if filled == bar_length:
bar = "=" * bar_length
elif filled == 0:
bar = " " * bar_length
else:
bar = f"{'=' * (filled - 1)}>{' ' * (bar_length - filled)}"
percent = f"{100 * (cur_iter / float(total_iters)):.1f}"
self.print(
f"\r{self.ansi_clear}{prefix}[{bar}] {percent:>5}%{suffix} ",
end="\n" if not self._smart_features or cur_iter == total_iters else "",
flush=True,
)
def set_logger(self, new_logger):
self.__class__ = new_logger.__class__

View File

@@ -406,7 +406,7 @@ class ESP32ROM(ESPLoader):
# regardless of how many bytes were actually read from flash
data += r[:block_len]
if progress_fn and (len(data) % 1024 == 0 or len(data) == length):
progress_fn(len(data), length)
progress_fn(len(data), length, offset)
return data
def get_rom_cal_crystal_freq(self):
@@ -430,7 +430,7 @@ class ESP32ROM(ESPLoader):
valid_freq = 40000000 if rom_calculated_freq > 33000000 else 26000000
false_rom_baud = int(baud * rom_calculated_freq // valid_freq)
log.print(f"Changing baud rate to {baud}")
log.print(f"Changing baud rate to {baud}...")
self.command(
self.ESP_CMDS["CHANGE_BAUDRATE"], struct.pack("<II", false_rom_baud, 0)
)

View File

@@ -116,7 +116,7 @@ class ESP32C2ROM(ESP32C3ROM):
# a 26 MHz XTAL.
false_rom_baud = baud * 40 // 26
log.print(f"Changing baud rate to {baud}")
log.print(f"Changing baud rate to {baud}...")
self.command(
self.ESP_CMDS["CHANGE_BAUDRATE"],
struct.pack("<II", false_rom_baud, 0),

View File

@@ -133,7 +133,7 @@ class ESP32C5ROM(ESP32C6ROM):
crystal_freq_detect = self.get_crystal_freq()
log.print(
f"ROM expects crystal freq: {crystal_freq_rom_expect} MHz, "
f"detected {crystal_freq_detect} MHz"
f"detected {crystal_freq_detect} MHz."
)
baud_rate = baud
# If detect the XTAL is 48MHz, but the ROM code expects it to be 40MHz
@@ -146,7 +146,7 @@ class ESP32C5ROM(ESP32C6ROM):
ESPLoader.change_baud(self, baud_rate)
return
log.print(f"Changing baud rate to {baud_rate}")
log.print(f"Changing baud rate to {baud_rate}...")
self.command(
self.ESP_CMDS["CHANGE_BAUDRATE"], struct.pack("<II", baud_rate, 0)
)

View File

@@ -803,7 +803,7 @@ class TestFlashing(EsptoolTestCase):
output = self.run_esptool(
"--before no_reset --after no_reset_stub flash-id", preload=False
)
assert "Stub is already running. No upload is necessary." in output
assert "Stub flasher is already running. No upload is necessary." in output
sleep(10) # Wait if RTC WDT triggers
@@ -1145,8 +1145,8 @@ class TestVerifyCommand(EsptoolTestCase):
def test_verify_failure(self):
self.run_esptool("write-flash 0x6000 images/sector.bin")
output = self.run_esptool_error("verify-flash --diff 0x6000 images/one_kb.bin")
assert "verify FAILED" in output
assert "first at 0x00006000" in output
assert "Verification failed:" in output
assert "first at 0x00006000:" in output
def test_verify_unaligned_length(self):
self.run_esptool("write-flash 0x0 images/not_4_byte_aligned.bin")
@@ -1177,7 +1177,8 @@ class TestMemoryOperations(EsptoolTestCase):
@pytest.mark.quick_test
def test_memory_dump(self):
output = self.run_esptool("dump-mem 0x50000000 128 memout.bin")
assert "Successfully read 128 bytes" in output
assert "Dumped 128 bytes from 0x50000000" in output
assert "to 'memout.bin'" in output
os.remove("memout.bin")
def test_memory_write(self):
@@ -1381,7 +1382,7 @@ class TestAutoDetect(EsptoolTestCase):
if arg_chip not in ["esp8266", "esp32", "esp32s2"]:
assert "Unsupported detection protocol" not in output
assert f"Detecting chip type... {expected_chip_name}" in output
assert f"Chip is {expected_chip_name}" in output
assert f"{'Chip type:':<20}{expected_chip_name}" in output
@pytest.mark.quick_test
def test_auto_detect(self):
@@ -1402,7 +1403,8 @@ class TestUSBMode(EsptoolTestCase):
)
if expected_usb_mode:
assert f"USB mode: {expected_usb_mode}" in output
assert "USB mode: " in output
assert expected_usb_mode in output
@pytest.mark.flaky(reruns=5)
@@ -1757,7 +1759,7 @@ class TestESPObjectOperations(EsptoolTestCase):
read_mac(esp)
reset_chip(esp, "hard_reset")
output = fake_out.getvalue()
assert "Stub running..." in output
assert "Stub flasher running" in output
assert "MAC:" in output
@capture_stdout
@@ -1778,10 +1780,10 @@ class TestESPObjectOperations(EsptoolTestCase):
reset_chip(esp, "hard_reset")
os.remove("output.bin")
output = fake_out.getvalue()
assert "Stub running..." in output
assert "Hash of data verified." in output
assert "Stub flasher running" in output
assert "Hash of data verified" in output
assert "Read 9216 bytes" in output
assert "verify OK (digest matched)" in output
assert "Verification successful (digest matched)" in output
assert "Chip erase completed" in output
assert "Hard resetting" in output

View File

@@ -141,7 +141,7 @@ class TestImageInfo:
def test_application_info(self):
out = self.run_image_info("auto", "esp_idf_blink_esp32s2.bin")
assert "Application information" in out
assert "Application Information" in out
assert "Project name: blink" in out
assert "App version: qa-test-v5.0-20220830-4-g4532e6" in out
assert "Secure version: 0" in out
@@ -152,16 +152,16 @@ class TestImageInfo:
assert "ESP-IDF: v5.0-beta1-427-g4532e6e0b2-dirt" in out
# No application info in image
out = self.run_image_info("auto", "bootloader_esp32.bin")
assert "Application information" not in out
assert "Application Information" not in out
out = self.run_image_info("auto", ESP8266_BIN)
assert "Application information" not in out
assert "Application Information" not in out
def test_bootloader_info(self):
# This bootloader binary is built from "hello_world" project
# with default settings, IDF version is v5.2.
out = self.run_image_info("esp32", "bootloader_esp32_v5_2.bin")
assert "Image size: 26768 bytes" in out
assert "Bootloader information" in out
assert "Bootloader Information" in out
assert "Bootloader version: 1" in out
assert "ESP-IDF: v5.2-dev-254-g1950b15" in out
assert "Compile time: Apr 25 2023 00:13:32" in out
@@ -194,7 +194,7 @@ class TestImageInfo:
convert_bin2hex(file)
out = self.run_image_info("esp32", file)
assert "Image size: 26768 bytes" in out
assert "Bootloader information" in out
assert "Bootloader Information" in out
assert "Bootloader version: 1" in out
assert "ESP-IDF: v5.2-dev-254-g1950b15" in out
assert "Compile time: Apr 25 2023 00:13:32" in out

View File

@@ -29,16 +29,17 @@ class CustomLogger(TemplateLogger):
"""
pass
def print_overwrite(self, message: str, last_line: bool = False):
"""
Prints a message, overwriting the currently printed line.
"""
def stage(self, finish=False):
pass
def set_progress(self, percentage: float):
"""
Sets the progress of long-running operations to a specific percentage.
"""
def progress_bar(
self,
cur_iter: int,
total_iters: int,
prefix: str = "",
suffix: str = "",
bar_length: int = 30,
):
pass
@@ -52,7 +53,9 @@ class CustomLoggerIncomplete:
class TestLogger:
@pytest.fixture
def logger(self):
return EsptoolLogger()
log = EsptoolLogger()
log._set_smart_features(True)
return log
def test_singleton(self, logger):
logger2 = EsptoolLogger()
@@ -90,31 +93,40 @@ class TestLogger:
== f"{logger.ansi_red}This is an error{logger.ansi_normal}\n"
)
def test_print_overwrite_tty(self, logger):
with (
patch("sys.stdout", new=StringIO()) as fake_out,
patch("sys.stdout.isatty", return_value=True),
):
logger.print_overwrite("msg1", last_line=False)
logger.print_overwrite("msg2", last_line=True)
def test_stage(self, logger):
with patch("sys.stdout", new=StringIO()) as fake_out:
logger.stage()
assert logger._stage_active
logger.print("Line1")
logger.print("Line2")
logger.stage(finish=True)
assert not logger._stage_active
logger.print("Line3")
output = fake_out.getvalue()
assert "msg1\n" not in output # msg1 should not have a newline
assert f"\r{logger.ansi_clear}msg1" in output
assert f"\r{logger.ansi_clear}msg2\n" in output
assert f"{logger.ansi_line_up}{logger.ansi_line_clear}" * 2 in output
assert "Line1\nLine2\n" in output
assert "Line1\nLine2\nLine3\n" not in output
def test_print_overwrite_non_tty(self, logger):
with (
patch("sys.stdout", new=StringIO()) as fake_out,
patch("sys.stdout.isatty", return_value=False),
):
logger.print_overwrite("msg1", last_line=False)
logger.print_overwrite("msg2", last_line=True)
assert fake_out.getvalue() == "msg1\nmsg2\n" # Acting as a normal print()
def test_set_progress(self, logger):
logger.set_progress(50.0)
# Since set_progress is not implemented - just ensure it doesn't raise an error
assert True
def test_progress_bar(self, logger):
with patch("sys.stdout", new=StringIO()) as fake_out:
logger.progress_bar(
cur_iter=2,
total_iters=4,
prefix="Progress: ",
suffix=" (2/4)",
bar_length=10,
)
logger.progress_bar(
cur_iter=4,
total_iters=4,
prefix="Progress: ",
suffix=" (4/4)",
bar_length=10,
)
output = fake_out.getvalue()
assert "Progress: [====> ] 50.0% (2/4)" in output
assert "Progress: [==========] 100.0% (4/4) \n" in output
def test_set_incomplete_logger(self, logger):
with pytest.raises(