mirror of
https://github.com/espressif/esptool.git
synced 2025-10-15 21:07:25 +08:00
feat(logging): Add collapsible output stages and ASCII progress bars
This commit is contained in:
@@ -97,7 +97,7 @@ Complete list of configurable options:
|
||||
+------------------------------+-----------------------------------------------------------+----------+
|
||||
| erase_write_timeout_per_mb | Timeout (per megabyte) for erasing and writing data | 40 s |
|
||||
+------------------------------+-----------------------------------------------------------+----------+
|
||||
| mem_end_rom_timeout | Short timeout for ESP_MEM_END | 0.2 s |
|
||||
| mem_end_rom_timeout | Short timeout for MEM_END | 0.2 s |
|
||||
+------------------------------+-----------------------------------------------------------+----------+
|
||||
| serial_write_timeout | Timeout for serial port write | 10 s |
|
||||
+------------------------------+-----------------------------------------------------------+----------+
|
||||
|
@@ -295,7 +295,7 @@ Esptool allows redirecting output by implementing a custom logger class. This ca
|
||||
log_to_file = True
|
||||
log_file = "esptool.log"
|
||||
|
||||
def print(self, message, *args, **kwargs):
|
||||
def print(self, message="", *args, **kwargs):
|
||||
# Print to console
|
||||
print(f"[CustomLogger]: {message}", *args, **kwargs)
|
||||
# Optionally log to a file
|
||||
@@ -312,14 +312,22 @@ Esptool allows redirecting output by implementing a custom logger class. This ca
|
||||
def error(self, message):
|
||||
self.print(message, file=sys.stderr)
|
||||
|
||||
def print_overwrite(self, message, last_line=False):
|
||||
# Overwriting not needed, print normally
|
||||
self.print(message)
|
||||
|
||||
def set_progress(self, percentage):
|
||||
# Progress updates not needed, pass
|
||||
def stage(self, finish=False):
|
||||
# Collapsible stages not needed in this example
|
||||
pass
|
||||
|
||||
def progress_bar(
|
||||
self,
|
||||
cur_iter,
|
||||
total_iters,
|
||||
prefix = "",
|
||||
suffix = "",
|
||||
bar_length: int = 30,
|
||||
):
|
||||
# Progress bars replaced with simple percentage output in this example
|
||||
percent = f"{100 * (cur_iter / float(total_iters)):.1f}"
|
||||
self.print(f"Finished: {percent}%")
|
||||
|
||||
# Replace the default logger with the custom logger
|
||||
log.set_logger(CustomLogger())
|
||||
|
||||
@@ -334,11 +342,11 @@ To ensure compatibility with esptool, the custom logger should re-implement (or
|
||||
- ``note``: Logs informational messages.
|
||||
- ``warning``: Logs warning messages.
|
||||
- ``error``: Logs error messages.
|
||||
- ``print_overwrite``: Handles message overwriting (can be a simple ``print()`` if overwriting is not needed).
|
||||
- ``set_progress``: Handles percentage updates of long-running operations - ``write-flash``, ``read-flash``, and ``dump-mem`` (useful for GUI visualisation, e.g. as a progress bar).
|
||||
- ``stage``: Starts or ends a collapsible output stage.
|
||||
- ``progress_bar``: Displays a progress bar.
|
||||
|
||||
.. autoclass:: esptool.logger.EsptoolLogger
|
||||
:members: print, note, warning, error, print_overwrite, set_progress
|
||||
:members: print, note, warning, error, stage, progress_bar
|
||||
:member-order: bysource
|
||||
|
||||
These methods are essential for maintaining proper integration and behavior with esptool. Additionally, all calls to the logger should be made using ``log.print()`` (or the respective method, such as ``log.info()`` or ``log.warning()``) instead of the standard ``print()`` function to ensure the output is routed through the custom logger. This ensures consistency and allows the custom logger to handle all output appropriately. You can further customize this logger to fit your application's needs, such as integrating with GUI components or advanced logging frameworks.
|
||||
These methods are essential for maintaining proper integration and behavior with esptool. Additionally, all output printing should be made using ``log.print()`` (or the respective method, such as ``log.info()`` or ``log.warning()``) instead of the standard ``print()`` function to ensure the output is routed through the custom logger. This ensures consistency and allows the custom logger to handle all output appropriately. You can further customize this logger to fit your application's needs, such as integrating with GUI components or advanced logging frameworks.
|
||||
|
@@ -278,7 +278,8 @@ def check_flash_size(esp: ESPLoader, address: int, size: int) -> None:
|
||||
cls=Group,
|
||||
no_args_is_help=True,
|
||||
context_settings=dict(help_option_names=["-h", "--help"], max_content_width=120),
|
||||
help=f"esptool.py v{__version__} - Espressif chips ROM Bootloader Utility",
|
||||
help=f"esptool.py v{__version__} - Espressif SoCs flashing, debugging, "
|
||||
"and provisioning utility",
|
||||
)
|
||||
@click.option(
|
||||
"--chip",
|
||||
@@ -376,6 +377,12 @@ def prepare_esp_object(ctx):
|
||||
# 1) Get the ESP object
|
||||
#######################
|
||||
|
||||
# Disable output stage collapsing, colors, and overwriting in trace mode
|
||||
if ctx.obj["trace"]:
|
||||
log._smart_features = False
|
||||
|
||||
log.stage()
|
||||
|
||||
if ctx.obj["before"] != "no_reset_no_sync":
|
||||
initial_baud = min(
|
||||
ESPLoader.ESP_ROM_BAUD, ctx.obj["baud"]
|
||||
@@ -430,19 +437,23 @@ def prepare_esp_object(ctx):
|
||||
f"on any of the {len(ser_list)} available serial ports."
|
||||
)
|
||||
|
||||
log.stage(finish=True)
|
||||
log.print(f"Connected to {esp.CHIP_NAME} on {esp._port.port}:")
|
||||
|
||||
# 2) Print the chip info
|
||||
########################
|
||||
|
||||
if esp.secure_download_mode:
|
||||
log.print(f"Chip is {esp.CHIP_NAME} in Secure Download Mode")
|
||||
else:
|
||||
log.print(f"Chip is {esp.get_chip_description()}")
|
||||
log.print(f"Features: {', '.join(esp.get_chip_features())}")
|
||||
log.print(f"Crystal is {esp.get_crystal_freq()}MHz")
|
||||
log.print(f"{'Chip type:':<20}{esp.get_chip_description()}")
|
||||
log.print(f"{'Features:':<20}{', '.join(esp.get_chip_features())}")
|
||||
log.print(f"{'Crystal frequency:':<20}{esp.get_crystal_freq()}MHz")
|
||||
usb_mode = esp.get_usb_mode()
|
||||
if usb_mode is not None:
|
||||
log.print(f"USB mode: {usb_mode}")
|
||||
log.print(f"{'USB mode:':<20}{usb_mode}")
|
||||
read_mac(esp)
|
||||
log.print()
|
||||
|
||||
# 3) Upload the stub flasher
|
||||
############################
|
||||
@@ -488,6 +499,7 @@ def prepare_esp_object(ctx):
|
||||
|
||||
# 8) Reset the chip
|
||||
###################
|
||||
log.print()
|
||||
# Handle post-operation behaviour (reset or other)
|
||||
if ctx.obj["invoked_subcommand"] == "load-ram":
|
||||
# the ESP is now running the loaded image, so let it run
|
||||
|
167
esptool/cmds.py
167
esptool/cmds.py
@@ -202,25 +202,34 @@ def load_ram(esp: ESPLoader, input: ImageSource) -> None:
|
||||
input: Path to the firmware image file, opened file-like object,
|
||||
or the image data as bytes.
|
||||
"""
|
||||
data, _ = get_bytes(input)
|
||||
data, source = get_bytes(input)
|
||||
image = LoadFirmwareImage(esp.CHIP_NAME, data)
|
||||
|
||||
log.print("RAM boot...")
|
||||
for seg in image.segments:
|
||||
log.stage()
|
||||
source = "image" if source is None else f"'{source}'"
|
||||
log.print(f"Loading {source} to RAM...")
|
||||
for i, seg in enumerate(image.segments, start=1):
|
||||
size = len(seg.data)
|
||||
log.print(f"Downloading {size} bytes at {seg.addr:08x}...", end=" ", flush=True)
|
||||
log.progress_bar(
|
||||
cur_iter=i,
|
||||
total_iters=len(image.segments),
|
||||
prefix=f"Downloading {size} bytes at {seg.addr:#010x} ",
|
||||
suffix="...",
|
||||
)
|
||||
|
||||
esp.mem_begin(
|
||||
size, div_roundup(size, esp.ESP_RAM_BLOCK), esp.ESP_RAM_BLOCK, seg.addr
|
||||
)
|
||||
|
||||
seq = 0
|
||||
while len(seg.data) > 0:
|
||||
esp.mem_block(seg.data[0 : esp.ESP_RAM_BLOCK], seq)
|
||||
seg.data = seg.data[esp.ESP_RAM_BLOCK :]
|
||||
seq += 1
|
||||
log.print("done!")
|
||||
|
||||
log.print(f"All segments done, executing at {image.entrypoint:08x}")
|
||||
log.stage(finish=True)
|
||||
log.print(
|
||||
f"Loaded {len(image.segments)} segments from {source} to RAM, "
|
||||
f"executing at {image.entrypoint:#010x}."
|
||||
)
|
||||
esp.mem_finish(image.entrypoint)
|
||||
|
||||
|
||||
@@ -265,32 +274,40 @@ def dump_mem(
|
||||
Memory dump as bytes if output is None;
|
||||
otherwise, returns None after writing to file.
|
||||
"""
|
||||
data = io.BytesIO() # Use BytesIO to store the memory dump
|
||||
log.stage()
|
||||
log.print(
|
||||
f"Dumping {size} bytes from {address:#010x}"
|
||||
+ (f" to file '{output}'..." if output else "...")
|
||||
)
|
||||
data = io.BytesIO() # Use BytesIO to store the memory dump
|
||||
log.set_progress(0)
|
||||
|
||||
t = time.time()
|
||||
# Read the memory in 4-byte chunks.
|
||||
for i in range(size // 4):
|
||||
d = esp.read_reg(address + (i * 4))
|
||||
cur_addr = address + (i * 4)
|
||||
d = esp.read_reg(cur_addr)
|
||||
data.write(struct.pack("<I", d)) # Write 4 bytes to BytesIO
|
||||
# Update progress every 1024 bytes.
|
||||
if data.tell() % 1024 == 0:
|
||||
percent = data.tell() * 100 // size
|
||||
log.set_progress(percent)
|
||||
log.print_overwrite(f"{data.tell()} bytes read... ({percent}%)")
|
||||
|
||||
log.print_overwrite(f"Successfully read {data.tell()} bytes.", last_line=True)
|
||||
|
||||
cur = data.tell()
|
||||
if cur % 1024 == 0 or cur == size:
|
||||
log.progress_bar(
|
||||
cur_iter=data.tell(),
|
||||
total_iters=size,
|
||||
prefix=f"Dumping from {cur_addr:#010x} ",
|
||||
suffix=f" {cur}/{size} bytes...",
|
||||
)
|
||||
t = time.time() - t
|
||||
speed_msg = " ({:.1f} kbit/s)".format(data.tell() / t * 8 / 1000) if t > 0.0 else ""
|
||||
dest_msg = f" to '{output}'" if output else ""
|
||||
log.stage(finish=True)
|
||||
log.print(
|
||||
f"Dumped {data.tell()} bytes from {address:#010x} in {t:.1f} seconds"
|
||||
f"{speed_msg}{dest_msg}."
|
||||
)
|
||||
if output:
|
||||
with open(output, "wb") as f:
|
||||
f.write(data.getvalue())
|
||||
log.print(f"Memory dump to '{output}' completed.")
|
||||
return None
|
||||
else:
|
||||
log.print("Memory dump completed.")
|
||||
return data.getvalue()
|
||||
|
||||
|
||||
@@ -753,36 +770,36 @@ def write_flash(
|
||||
if compress:
|
||||
uncimage = image
|
||||
image = zlib.compress(uncimage, 9)
|
||||
compsize = len(image)
|
||||
original_image = image # Save the whole image in case retry is needed
|
||||
# Try again if reconnect was successful
|
||||
log.stage()
|
||||
for attempt in range(1, esp.WRITE_FLASH_ATTEMPTS + 1):
|
||||
try:
|
||||
if compress:
|
||||
# Decompress the compressed binary a block at a time,
|
||||
# to dynamically calculate the timeout based on the real write size
|
||||
decompress = zlib.decompressobj()
|
||||
blocks = esp.flash_defl_begin(uncsize, len(image), address)
|
||||
esp.flash_defl_begin(uncsize, compsize, address)
|
||||
else:
|
||||
blocks = esp.flash_begin(
|
||||
uncsize, address, begin_rom_encrypted=encrypted
|
||||
)
|
||||
esp.flash_begin(uncsize, address, begin_rom_encrypted=encrypted)
|
||||
seq = 0
|
||||
bytes_sent = 0 # bytes sent on wire
|
||||
bytes_written = 0 # bytes written to flash
|
||||
t = time.time()
|
||||
|
||||
timeout = DEFAULT_TIMEOUT
|
||||
|
||||
log.set_progress(0)
|
||||
|
||||
while len(image) > 0:
|
||||
percent = 100 * (seq + 1) // blocks
|
||||
log.set_progress(percent)
|
||||
image_size = compsize if compress else uncsize
|
||||
while len(image) >= 0:
|
||||
if not no_progress:
|
||||
log.print_overwrite(
|
||||
f"Writing at {address + bytes_written:010x}... "
|
||||
f"({percent} %)"
|
||||
log.progress_bar(
|
||||
cur_iter=image_size - len(image),
|
||||
total_iters=image_size,
|
||||
prefix=f"Writing at {address + bytes_written:#010x} ",
|
||||
suffix=f" {bytes_sent}/{image_size} bytes...",
|
||||
)
|
||||
if len(image) == 0: # All data sent, print 100% progress and end
|
||||
break
|
||||
block = image[0 : esp.FLASH_WRITE_SIZE]
|
||||
if compress:
|
||||
# feeding each compressed block into the decompressor lets us
|
||||
@@ -849,21 +866,20 @@ def write_flash(
|
||||
|
||||
t = time.time() - t
|
||||
speed_msg = ""
|
||||
log.stage(finish=True)
|
||||
if compress:
|
||||
if t > 0.0:
|
||||
speed_msg = f" (effective {uncsize / t * 8 / 1000:.1f} kbit/s)"
|
||||
log.print_overwrite(
|
||||
f"Wrote {uncsize} bytes ({bytes_sent} compressed) at {address:#010x} "
|
||||
f"in {t:.1f} seconds{speed_msg}...",
|
||||
last_line=True,
|
||||
speed_msg = f" ({uncsize / t * 8 / 1000:.1f} kbit/s)"
|
||||
log.print(
|
||||
f"Wrote {uncsize} bytes ({bytes_sent} compressed) "
|
||||
f"at {address:#010x} in {t:.1f} seconds{speed_msg}."
|
||||
)
|
||||
else:
|
||||
if t > 0.0:
|
||||
speed_msg = " (%.1f kbit/s)" % (bytes_written / t * 8 / 1000)
|
||||
log.print_overwrite(
|
||||
log.print(
|
||||
f"Wrote {bytes_written} bytes at {address:#010x} in {t:.1f} "
|
||||
f"seconds{speed_msg}...",
|
||||
last_line=True,
|
||||
f"seconds{speed_msg}."
|
||||
)
|
||||
|
||||
if not encrypted and not esp.secure_download_mode:
|
||||
@@ -912,7 +928,7 @@ def read_mac(esp: ESPLoader) -> None:
|
||||
"""
|
||||
|
||||
def print_mac(label, mac):
|
||||
log.print(f"{label}: {':'.join(f'{x:02x}' for x in mac)}")
|
||||
log.print(f"{label + ':':<20}{':'.join(f'{x:02x}' for x in mac)}")
|
||||
|
||||
eui64 = esp.read_mac("EUI64")
|
||||
if eui64:
|
||||
@@ -1232,6 +1248,10 @@ def flash_id(esp: ESPLoader) -> None:
|
||||
Args:
|
||||
esp: Initiated esp object connected to a real device.
|
||||
"""
|
||||
log.print()
|
||||
title = "Flash Memory Information:"
|
||||
log.print(title)
|
||||
log.print("=" * len(title))
|
||||
print_flash_id(esp)
|
||||
flash_type = esp.flash_type()
|
||||
flash_type_dict = {0: "quad (4 data lines)", 1: "octal (8 data lines)"}
|
||||
@@ -1299,34 +1319,30 @@ def read_flash(
|
||||
flash_progress = None
|
||||
else:
|
||||
|
||||
def flash_progress(progress, length):
|
||||
percent = progress * 100.0 / length
|
||||
log.set_progress(percent)
|
||||
def flash_progress(progress, length, offset):
|
||||
log.progress_bar(
|
||||
cur_iter=progress,
|
||||
total_iters=length,
|
||||
prefix=f"Reading from {offset + progress:#010x} ",
|
||||
suffix=f" {progress}/{length} bytes...",
|
||||
)
|
||||
|
||||
msg = f"{progress} ({percent:.0f} %)"
|
||||
padding = "\b" * len(msg)
|
||||
|
||||
if progress != length:
|
||||
log.print_overwrite(f"{msg}{padding}")
|
||||
else:
|
||||
log.print_overwrite(f"{msg}", last_line=True)
|
||||
|
||||
log.set_progress(0)
|
||||
log.stage()
|
||||
t = time.time()
|
||||
data = esp.read_flash(address, size, flash_progress)
|
||||
t = time.time() - t
|
||||
speed_msg = " ({:.1f} kbit/s)".format(len(data) / t * 8 / 1000) if t > 0.0 else ""
|
||||
log.print_overwrite(
|
||||
f"Read {len(data)} bytes at {address:#010x} in {t:.1f} seconds{speed_msg}...",
|
||||
last_line=True,
|
||||
dest_msg = f" to '{output}'" if output else ""
|
||||
log.stage(finish=True)
|
||||
log.print(
|
||||
f"Read {len(data)} bytes from {address:#010x} in {t:.1f} seconds"
|
||||
f"{speed_msg}{dest_msg}."
|
||||
)
|
||||
if output:
|
||||
with open(output, "wb") as f:
|
||||
f.write(data)
|
||||
log.print(f"Flash read to '{output}' completed.")
|
||||
return None
|
||||
else:
|
||||
log.print("Flash read completed.")
|
||||
return data
|
||||
|
||||
|
||||
@@ -1373,27 +1389,27 @@ def verify_flash(
|
||||
digest = esp.flash_md5sum(address, image_size)
|
||||
expected_digest = hashlib.md5(image).hexdigest()
|
||||
if digest == expected_digest:
|
||||
log.print("-- verify OK (digest matched)")
|
||||
log.print("Verification successful (digest matched).")
|
||||
continue
|
||||
else:
|
||||
mismatch = True
|
||||
if not diff:
|
||||
log.print("-- verify FAILED (digest mismatch)")
|
||||
log.print("Verification failed (digest mismatch)")
|
||||
continue
|
||||
|
||||
flash = esp.read_flash(address, image_size)
|
||||
assert flash != image
|
||||
differences = [i for i in range(image_size) if flash[i] != image[i]]
|
||||
log.print(
|
||||
f"-- verify FAILED: {len(differences)} differences, "
|
||||
f"first at {address + differences[0]:#010x}"
|
||||
f"Verification failed: {len(differences)} differences, "
|
||||
f"first at {address + differences[0]:#010x}:"
|
||||
)
|
||||
for d in differences:
|
||||
flash_byte = flash[d]
|
||||
image_byte = image[d]
|
||||
log.print(f" {address + d:08x} {flash_byte:02x} {image_byte:02x}")
|
||||
log.print(f" {address + d:#010x} {flash_byte:02x} {image_byte:02x}")
|
||||
if mismatch:
|
||||
raise FatalError("Verify failed.")
|
||||
raise FatalError("Verification failed.")
|
||||
|
||||
|
||||
def read_flash_status(esp: ESPLoader, bytes: int = 2) -> None:
|
||||
@@ -1757,15 +1773,8 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
|
||||
|
||||
image = LoadFirmwareImage(chip, data)
|
||||
|
||||
def get_key_from_value(dict, val):
|
||||
"""Get key from value in dictionary"""
|
||||
for key, value in dict.items():
|
||||
if value == val:
|
||||
return key
|
||||
return None
|
||||
|
||||
log.print()
|
||||
title = f"{chip.upper()} image header".format()
|
||||
title = f"{chip.upper()} Image Header"
|
||||
log.print(title)
|
||||
log.print("=" * len(title))
|
||||
log.print(f"Image version: {image.version}")
|
||||
@@ -1803,7 +1812,7 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
|
||||
# Extended header (ESP32 and later only)
|
||||
if chip != "esp8266":
|
||||
log.print()
|
||||
title = f"{chip.upper()} extended image header"
|
||||
title = f"{chip.upper()} Extended Image Header"
|
||||
log.print(title)
|
||||
log.print("=" * len(title))
|
||||
log.print(
|
||||
@@ -1843,7 +1852,7 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
|
||||
log.print()
|
||||
|
||||
# Segments overview
|
||||
title = "Segments information"
|
||||
title = "Segments Information"
|
||||
log.print(title)
|
||||
log.print("=" * len(title))
|
||||
headers_str = "{:>7} {:>7} {:>10} {:>10} {:10}"
|
||||
@@ -1872,7 +1881,7 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
|
||||
log.print()
|
||||
|
||||
# Footer
|
||||
title = f"{chip.upper()} image footer"
|
||||
title = f"{chip.upper()} Image Footer"
|
||||
log.print(title)
|
||||
log.print("=" * len(title))
|
||||
calc_checksum = image.calculate_checksum()
|
||||
@@ -1902,7 +1911,7 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
|
||||
app_desc = _parse_app_info(app_desc_seg)
|
||||
if app_desc:
|
||||
log.print()
|
||||
title = "Application information"
|
||||
title = "Application Information"
|
||||
log.print(title)
|
||||
log.print("=" * len(title))
|
||||
log.print(f"Project name: {app_desc['project_name']}")
|
||||
@@ -1924,7 +1933,7 @@ def image_info(input: ImageSource, chip: str | None = None) -> None:
|
||||
bootloader_desc = _parse_bootloader_info(bootloader_desc_seg)
|
||||
if bootloader_desc:
|
||||
log.print()
|
||||
title = "Bootloader information"
|
||||
title = "Bootloader Information"
|
||||
log.print(title)
|
||||
log.print("=" * len(title))
|
||||
log.print(f"Bootloader version: {bootloader_desc['version']}")
|
||||
|
@@ -978,7 +978,7 @@ class ESPLoader(object):
|
||||
timeout=timeout,
|
||||
)
|
||||
if size != 0 and not self.IS_STUB and logging:
|
||||
log.print(f"Took {time.time() - t:.2f}s to erase flash block")
|
||||
log.print(f"Took {time.time() - t:.2f}s to erase flash block.")
|
||||
return num_blocks
|
||||
|
||||
def flash_block(self, data, seq, timeout=DEFAULT_TIMEOUT):
|
||||
@@ -997,7 +997,7 @@ class ESPLoader(object):
|
||||
if attempts_left:
|
||||
self.trace(
|
||||
"Block write failed, "
|
||||
f"retrying with {attempts_left} attempts left"
|
||||
f"retrying with {attempts_left} attempts left..."
|
||||
)
|
||||
else:
|
||||
raise
|
||||
@@ -1177,15 +1177,17 @@ class ESPLoader(object):
|
||||
)
|
||||
|
||||
def run_stub(self, stub: StubFlasher | None = None) -> "ESPLoader":
|
||||
log.stage()
|
||||
if stub is None:
|
||||
stub = StubFlasher(self.CHIP_NAME)
|
||||
|
||||
if self.sync_stub_detected:
|
||||
log.print("Stub is already running. No upload is necessary.")
|
||||
log.stage(finish=True)
|
||||
log.print("Stub flasher is already running. No upload is necessary.")
|
||||
return self.STUB_CLASS(self) if self.STUB_CLASS is not None else self
|
||||
|
||||
# Upload
|
||||
log.print("Uploading stub...")
|
||||
log.print("Uploading stub flasher...")
|
||||
for field in [stub.text, stub.data]:
|
||||
if field is not None:
|
||||
offs = stub.text_start if field == stub.text else stub.data_start
|
||||
@@ -1196,20 +1198,21 @@ class ESPLoader(object):
|
||||
from_offs = seq * self.ESP_RAM_BLOCK
|
||||
to_offs = from_offs + self.ESP_RAM_BLOCK
|
||||
self.mem_block(field[from_offs:to_offs], seq)
|
||||
log.print("Running stub...")
|
||||
log.print("Running stub flasher...")
|
||||
self.mem_finish(stub.entry)
|
||||
try:
|
||||
p = self.read()
|
||||
except StopIteration:
|
||||
raise FatalError(
|
||||
"Failed to start stub. There was no response."
|
||||
"Failed to start stub flasher. There was no response."
|
||||
"\nTry increasing timeouts, for more information see: "
|
||||
"https://docs.espressif.com/projects/esptool/en/latest/esptool/configuration-file.html" # noqa E501
|
||||
)
|
||||
|
||||
if p != b"OHAI":
|
||||
raise FatalError(f"Failed to start stub. Unexpected response: {p}")
|
||||
log.print("Stub running...")
|
||||
raise FatalError(f"Failed to start stub flasher. Unexpected response: {p}")
|
||||
log.stage(finish=True)
|
||||
log.print("Stub flasher running.")
|
||||
return self.STUB_CLASS(self) if self.STUB_CLASS is not None else self
|
||||
|
||||
@stub_and_esp32_function_only
|
||||
@@ -1251,7 +1254,7 @@ class ESPLoader(object):
|
||||
)
|
||||
if size != 0 and not self.IS_STUB:
|
||||
# (stub erases as it writes, but ROM loaders erase on begin)
|
||||
log.print(f"Took {time.time() - t:.2f}s to erase flash block")
|
||||
log.print(f"Took {time.time() - t:.2f}s to erase flash block.")
|
||||
return num_blocks
|
||||
|
||||
@stub_and_esp32_function_only
|
||||
@@ -1310,7 +1313,7 @@ class ESPLoader(object):
|
||||
|
||||
@stub_and_esp32_function_only
|
||||
def change_baud(self, baud):
|
||||
log.print(f"Changing baud rate to {baud}")
|
||||
log.print(f"Changing baud rate to {baud}...")
|
||||
# stub takes the new baud rate and the old one
|
||||
second_arg = self._port.baudrate if self.IS_STUB else 0
|
||||
self.command(
|
||||
@@ -1356,25 +1359,26 @@ class ESPLoader(object):
|
||||
while len(data) < length:
|
||||
p = self.read()
|
||||
data += p
|
||||
if len(data) < length and len(p) < self.FLASH_SECTOR_SIZE:
|
||||
data_len = len(data)
|
||||
if data_len < length and len(p) < self.FLASH_SECTOR_SIZE:
|
||||
raise FatalError(
|
||||
"Corrupt data, expected 0x%x bytes but received 0x%x bytes"
|
||||
% (self.FLASH_SECTOR_SIZE, len(p))
|
||||
f"Corrupt data, expected {self.FLASH_SECTOR_SIZE:#x} "
|
||||
f"bytes but received {len(p):#x} bytes"
|
||||
)
|
||||
self.write(struct.pack("<I", len(data)))
|
||||
if progress_fn and (len(data) % 1024 == 0 or len(data) == length):
|
||||
progress_fn(len(data), length)
|
||||
self.write(struct.pack("<I", data_len))
|
||||
if progress_fn and (data_len % 1024 == 0 or data_len == length):
|
||||
progress_fn(data_len, length, offset)
|
||||
if len(data) > length:
|
||||
raise FatalError("Read more than expected")
|
||||
|
||||
digest_frame = self.read()
|
||||
if len(digest_frame) != 16:
|
||||
raise FatalError("Expected digest, got: %s" % hexify(digest_frame))
|
||||
raise FatalError(f"Expected digest, got: {hexify(digest_frame)}")
|
||||
expected_digest = hexify(digest_frame).upper()
|
||||
digest = hashlib.md5(data).hexdigest().upper()
|
||||
if digest != expected_digest:
|
||||
raise FatalError(
|
||||
"Digest mismatch: expected %s, got %s" % (expected_digest, digest)
|
||||
f"Digest mismatch: expected {expected_digest}, got {digest}"
|
||||
)
|
||||
return data
|
||||
|
||||
|
@@ -4,6 +4,7 @@
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
import sys
|
||||
import os
|
||||
|
||||
|
||||
class TemplateLogger(ABC):
|
||||
@@ -36,26 +37,41 @@ class TemplateLogger(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def print_overwrite(self, message: str, last_line: bool = False):
|
||||
def stage(self, finish: bool = False):
|
||||
"""
|
||||
Print a message, overwriting the currently printed line.
|
||||
Start or finish a new collapsible stage.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set_progress(self, percentage: float):
|
||||
def progress_bar(
|
||||
self,
|
||||
cur_iter: int,
|
||||
total_iters: int,
|
||||
prefix: str = "",
|
||||
suffix: str = "",
|
||||
bar_length: int = 30,
|
||||
):
|
||||
"""
|
||||
Set the progress of long-running operations to a specific percentage.
|
||||
Print a progress bar.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class EsptoolLogger(TemplateLogger):
|
||||
ansi_red = "\033[1;31m"
|
||||
ansi_yellow = "\033[0;33m"
|
||||
ansi_blue = "\033[0;34m"
|
||||
ansi_normal = "\033[0m"
|
||||
ansi_clear = "\033[K"
|
||||
ansi_red: str = ""
|
||||
ansi_yellow: str = ""
|
||||
ansi_blue: str = ""
|
||||
ansi_normal: str = ""
|
||||
ansi_clear: str = ""
|
||||
ansi_line_up: str = ""
|
||||
ansi_line_clear: str = ""
|
||||
|
||||
_stage_active: bool = False
|
||||
_newline_count: int = 0
|
||||
_kept_lines: list[str] = []
|
||||
|
||||
_smart_features: bool = False
|
||||
|
||||
def __new__(cls):
|
||||
"""
|
||||
@@ -63,6 +79,7 @@ class EsptoolLogger(TemplateLogger):
|
||||
"""
|
||||
if not hasattr(cls, "instance"):
|
||||
cls.instance = super(EsptoolLogger, cls).__new__(cls)
|
||||
cls.instance._set_smart_features()
|
||||
return cls.instance
|
||||
|
||||
@classmethod
|
||||
@@ -70,10 +87,63 @@ class EsptoolLogger(TemplateLogger):
|
||||
if hasattr(cls, "instance"):
|
||||
del cls.instance
|
||||
|
||||
@classmethod
|
||||
def _set_smart_features(cls, override: bool | None = None):
|
||||
# Check for smart terminal and color support
|
||||
if override is not None:
|
||||
cls.instance._smart_features = override
|
||||
else:
|
||||
is_tty = hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
|
||||
term_supports_color = os.getenv("TERM", "").lower() in (
|
||||
"xterm",
|
||||
"xterm-256color",
|
||||
"screen",
|
||||
"screen-256color",
|
||||
"linux",
|
||||
"vt100",
|
||||
)
|
||||
no_color = os.getenv("NO_COLOR", "").strip().lower() in ("1", "true", "yes")
|
||||
|
||||
# Determine if colors should be enabled
|
||||
cls.instance._smart_features = (
|
||||
is_tty and term_supports_color and not no_color
|
||||
)
|
||||
# Handle Windows specifically
|
||||
if sys.platform == "win32" and cls.instance._smart_features:
|
||||
try:
|
||||
from colorama import init
|
||||
|
||||
init() # Enable ANSI support on Windows
|
||||
except ImportError:
|
||||
cls.instance._smart_features = False
|
||||
|
||||
if cls.instance._smart_features:
|
||||
cls.instance.ansi_red = "\033[1;31m"
|
||||
cls.instance.ansi_yellow = "\033[0;33m"
|
||||
cls.instance.ansi_blue = "\033[1;36m"
|
||||
cls.instance.ansi_normal = "\033[0m"
|
||||
cls.instance.ansi_clear = "\033[K"
|
||||
cls.instance.ansi_line_up = "\033[1A"
|
||||
cls.instance.ansi_line_clear = "\x1b[2K"
|
||||
else:
|
||||
cls.instance.ansi_red = ""
|
||||
cls.instance.ansi_yellow = ""
|
||||
cls.instance.ansi_blue = ""
|
||||
cls.instance.ansi_normal = ""
|
||||
cls.instance.ansi_clear = ""
|
||||
cls.instance.ansi_line_up = ""
|
||||
cls.instance.ansi_line_clear = ""
|
||||
|
||||
def print(self, *args, **kwargs):
|
||||
"""
|
||||
Log a plain message.
|
||||
Log a plain message. Count newlines if in a collapsing stage.
|
||||
"""
|
||||
if self._stage_active:
|
||||
# Count the number of newlines in the message
|
||||
message = "".join(map(str, args))
|
||||
self._newline_count += message.count("\n")
|
||||
if kwargs.get("end", "\n") == "\n":
|
||||
self._newline_count += 1
|
||||
print(*args, **kwargs)
|
||||
|
||||
def note(self, message: str):
|
||||
@@ -82,7 +152,9 @@ class EsptoolLogger(TemplateLogger):
|
||||
"""
|
||||
|
||||
formatted_message = f"{self.ansi_blue}Note:{self.ansi_normal} {message}"
|
||||
print(formatted_message)
|
||||
if self._stage_active:
|
||||
self._kept_lines.append(formatted_message)
|
||||
self.print(formatted_message)
|
||||
|
||||
def warning(self, message: str):
|
||||
"""
|
||||
@@ -90,7 +162,9 @@ class EsptoolLogger(TemplateLogger):
|
||||
"""
|
||||
|
||||
formatted_message = f"{self.ansi_yellow}Warning:{self.ansi_normal} {message}"
|
||||
print(formatted_message)
|
||||
if self._stage_active:
|
||||
self._kept_lines.append(formatted_message)
|
||||
self.print(formatted_message)
|
||||
|
||||
def error(self, message: str):
|
||||
"""
|
||||
@@ -98,38 +172,68 @@ class EsptoolLogger(TemplateLogger):
|
||||
"""
|
||||
|
||||
formatted_message = f"{self.ansi_red}{message}{self.ansi_normal}"
|
||||
print(formatted_message, file=sys.stderr)
|
||||
self.print(formatted_message, file=sys.stderr)
|
||||
|
||||
def print_overwrite(self, message: str, last_line: bool = False):
|
||||
def stage(self, finish: bool = False):
|
||||
"""
|
||||
Print a message, overwriting the currently printed line.
|
||||
|
||||
If last_line is False, don't append a newline at the end
|
||||
(expecting another subsequent call will overwrite this one).
|
||||
|
||||
After a sequence of calls with last_line=False, call once with last_line=True.
|
||||
|
||||
If output is not a TTY (for example redirected a pipe),
|
||||
no overwriting happens and this function is the same as print().
|
||||
Start or finish a collapsible stage.
|
||||
Any log messages printed between the start and finish will be deleted
|
||||
when the stage is successfully finished.
|
||||
Warnings and notes will be saved and printed at the end of the stage.
|
||||
If terminal doesn't support ANSI escape codes, no collapsing happens.
|
||||
"""
|
||||
if hasattr(sys.stdout, "isatty") and sys.stdout.isatty():
|
||||
# ansi_clear clears the line to prevent artifacts from previous lines
|
||||
print(
|
||||
f"\r{self.ansi_clear}{message}",
|
||||
end="\n" if last_line else "",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
if finish:
|
||||
if not self._stage_active:
|
||||
return
|
||||
# Deactivate stage to stop collecting input
|
||||
self._stage_active = False
|
||||
|
||||
if self._smart_features:
|
||||
# Delete printed lines
|
||||
self.print(
|
||||
f"{self.ansi_line_up}{self.ansi_line_clear}"
|
||||
* (self._newline_count),
|
||||
end="",
|
||||
flush=True,
|
||||
)
|
||||
# Print saved warnings and notes
|
||||
for line in self._kept_lines:
|
||||
self.print(line)
|
||||
|
||||
# Clean the buffers for next stage
|
||||
self._kept_lines.clear()
|
||||
self._newline_count = 0
|
||||
else:
|
||||
print(message)
|
||||
self._stage_active = True
|
||||
|
||||
def set_progress(self, percentage: float):
|
||||
def progress_bar(
|
||||
self,
|
||||
cur_iter: int,
|
||||
total_iters: int,
|
||||
prefix: str = "",
|
||||
suffix: str = "",
|
||||
bar_length: int = 30,
|
||||
):
|
||||
"""
|
||||
Call in a loop to print a progress bar overwriting itself in place.
|
||||
If terminal doesn't support ANSI escape codes, no overwriting happens.
|
||||
"""
|
||||
Set the progress of long-running operations to a specific percentage.
|
||||
Overwrite this method in a custom logger to implement e.g. a progress bar.
|
||||
|
||||
Percentage is a float between 0 and 100.
|
||||
"""
|
||||
pass
|
||||
filled = int(bar_length * cur_iter // total_iters)
|
||||
if filled == bar_length:
|
||||
bar = "=" * bar_length
|
||||
elif filled == 0:
|
||||
bar = " " * bar_length
|
||||
else:
|
||||
bar = f"{'=' * (filled - 1)}>{' ' * (bar_length - filled)}"
|
||||
|
||||
percent = f"{100 * (cur_iter / float(total_iters)):.1f}"
|
||||
self.print(
|
||||
f"\r{self.ansi_clear}{prefix}[{bar}] {percent:>5}%{suffix} ",
|
||||
end="\n" if not self._smart_features or cur_iter == total_iters else "",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
def set_logger(self, new_logger):
|
||||
self.__class__ = new_logger.__class__
|
||||
|
@@ -406,7 +406,7 @@ class ESP32ROM(ESPLoader):
|
||||
# regardless of how many bytes were actually read from flash
|
||||
data += r[:block_len]
|
||||
if progress_fn and (len(data) % 1024 == 0 or len(data) == length):
|
||||
progress_fn(len(data), length)
|
||||
progress_fn(len(data), length, offset)
|
||||
return data
|
||||
|
||||
def get_rom_cal_crystal_freq(self):
|
||||
@@ -430,7 +430,7 @@ class ESP32ROM(ESPLoader):
|
||||
valid_freq = 40000000 if rom_calculated_freq > 33000000 else 26000000
|
||||
false_rom_baud = int(baud * rom_calculated_freq // valid_freq)
|
||||
|
||||
log.print(f"Changing baud rate to {baud}")
|
||||
log.print(f"Changing baud rate to {baud}...")
|
||||
self.command(
|
||||
self.ESP_CMDS["CHANGE_BAUDRATE"], struct.pack("<II", false_rom_baud, 0)
|
||||
)
|
||||
|
@@ -116,7 +116,7 @@ class ESP32C2ROM(ESP32C3ROM):
|
||||
# a 26 MHz XTAL.
|
||||
false_rom_baud = baud * 40 // 26
|
||||
|
||||
log.print(f"Changing baud rate to {baud}")
|
||||
log.print(f"Changing baud rate to {baud}...")
|
||||
self.command(
|
||||
self.ESP_CMDS["CHANGE_BAUDRATE"],
|
||||
struct.pack("<II", false_rom_baud, 0),
|
||||
|
@@ -133,7 +133,7 @@ class ESP32C5ROM(ESP32C6ROM):
|
||||
crystal_freq_detect = self.get_crystal_freq()
|
||||
log.print(
|
||||
f"ROM expects crystal freq: {crystal_freq_rom_expect} MHz, "
|
||||
f"detected {crystal_freq_detect} MHz"
|
||||
f"detected {crystal_freq_detect} MHz."
|
||||
)
|
||||
baud_rate = baud
|
||||
# If detect the XTAL is 48MHz, but the ROM code expects it to be 40MHz
|
||||
@@ -146,7 +146,7 @@ class ESP32C5ROM(ESP32C6ROM):
|
||||
ESPLoader.change_baud(self, baud_rate)
|
||||
return
|
||||
|
||||
log.print(f"Changing baud rate to {baud_rate}")
|
||||
log.print(f"Changing baud rate to {baud_rate}...")
|
||||
self.command(
|
||||
self.ESP_CMDS["CHANGE_BAUDRATE"], struct.pack("<II", baud_rate, 0)
|
||||
)
|
||||
|
@@ -803,7 +803,7 @@ class TestFlashing(EsptoolTestCase):
|
||||
output = self.run_esptool(
|
||||
"--before no_reset --after no_reset_stub flash-id", preload=False
|
||||
)
|
||||
assert "Stub is already running. No upload is necessary." in output
|
||||
assert "Stub flasher is already running. No upload is necessary." in output
|
||||
|
||||
sleep(10) # Wait if RTC WDT triggers
|
||||
|
||||
@@ -1145,8 +1145,8 @@ class TestVerifyCommand(EsptoolTestCase):
|
||||
def test_verify_failure(self):
|
||||
self.run_esptool("write-flash 0x6000 images/sector.bin")
|
||||
output = self.run_esptool_error("verify-flash --diff 0x6000 images/one_kb.bin")
|
||||
assert "verify FAILED" in output
|
||||
assert "first at 0x00006000" in output
|
||||
assert "Verification failed:" in output
|
||||
assert "first at 0x00006000:" in output
|
||||
|
||||
def test_verify_unaligned_length(self):
|
||||
self.run_esptool("write-flash 0x0 images/not_4_byte_aligned.bin")
|
||||
@@ -1177,7 +1177,8 @@ class TestMemoryOperations(EsptoolTestCase):
|
||||
@pytest.mark.quick_test
|
||||
def test_memory_dump(self):
|
||||
output = self.run_esptool("dump-mem 0x50000000 128 memout.bin")
|
||||
assert "Successfully read 128 bytes" in output
|
||||
assert "Dumped 128 bytes from 0x50000000" in output
|
||||
assert "to 'memout.bin'" in output
|
||||
os.remove("memout.bin")
|
||||
|
||||
def test_memory_write(self):
|
||||
@@ -1381,7 +1382,7 @@ class TestAutoDetect(EsptoolTestCase):
|
||||
if arg_chip not in ["esp8266", "esp32", "esp32s2"]:
|
||||
assert "Unsupported detection protocol" not in output
|
||||
assert f"Detecting chip type... {expected_chip_name}" in output
|
||||
assert f"Chip is {expected_chip_name}" in output
|
||||
assert f"{'Chip type:':<20}{expected_chip_name}" in output
|
||||
|
||||
@pytest.mark.quick_test
|
||||
def test_auto_detect(self):
|
||||
@@ -1402,7 +1403,8 @@ class TestUSBMode(EsptoolTestCase):
|
||||
)
|
||||
|
||||
if expected_usb_mode:
|
||||
assert f"USB mode: {expected_usb_mode}" in output
|
||||
assert "USB mode: " in output
|
||||
assert expected_usb_mode in output
|
||||
|
||||
|
||||
@pytest.mark.flaky(reruns=5)
|
||||
@@ -1757,7 +1759,7 @@ class TestESPObjectOperations(EsptoolTestCase):
|
||||
read_mac(esp)
|
||||
reset_chip(esp, "hard_reset")
|
||||
output = fake_out.getvalue()
|
||||
assert "Stub running..." in output
|
||||
assert "Stub flasher running" in output
|
||||
assert "MAC:" in output
|
||||
|
||||
@capture_stdout
|
||||
@@ -1778,10 +1780,10 @@ class TestESPObjectOperations(EsptoolTestCase):
|
||||
reset_chip(esp, "hard_reset")
|
||||
os.remove("output.bin")
|
||||
output = fake_out.getvalue()
|
||||
assert "Stub running..." in output
|
||||
assert "Hash of data verified." in output
|
||||
assert "Stub flasher running" in output
|
||||
assert "Hash of data verified" in output
|
||||
assert "Read 9216 bytes" in output
|
||||
assert "verify OK (digest matched)" in output
|
||||
assert "Verification successful (digest matched)" in output
|
||||
assert "Chip erase completed" in output
|
||||
assert "Hard resetting" in output
|
||||
|
||||
|
@@ -141,7 +141,7 @@ class TestImageInfo:
|
||||
|
||||
def test_application_info(self):
|
||||
out = self.run_image_info("auto", "esp_idf_blink_esp32s2.bin")
|
||||
assert "Application information" in out
|
||||
assert "Application Information" in out
|
||||
assert "Project name: blink" in out
|
||||
assert "App version: qa-test-v5.0-20220830-4-g4532e6" in out
|
||||
assert "Secure version: 0" in out
|
||||
@@ -152,16 +152,16 @@ class TestImageInfo:
|
||||
assert "ESP-IDF: v5.0-beta1-427-g4532e6e0b2-dirt" in out
|
||||
# No application info in image
|
||||
out = self.run_image_info("auto", "bootloader_esp32.bin")
|
||||
assert "Application information" not in out
|
||||
assert "Application Information" not in out
|
||||
out = self.run_image_info("auto", ESP8266_BIN)
|
||||
assert "Application information" not in out
|
||||
assert "Application Information" not in out
|
||||
|
||||
def test_bootloader_info(self):
|
||||
# This bootloader binary is built from "hello_world" project
|
||||
# with default settings, IDF version is v5.2.
|
||||
out = self.run_image_info("esp32", "bootloader_esp32_v5_2.bin")
|
||||
assert "Image size: 26768 bytes" in out
|
||||
assert "Bootloader information" in out
|
||||
assert "Bootloader Information" in out
|
||||
assert "Bootloader version: 1" in out
|
||||
assert "ESP-IDF: v5.2-dev-254-g1950b15" in out
|
||||
assert "Compile time: Apr 25 2023 00:13:32" in out
|
||||
@@ -194,7 +194,7 @@ class TestImageInfo:
|
||||
convert_bin2hex(file)
|
||||
out = self.run_image_info("esp32", file)
|
||||
assert "Image size: 26768 bytes" in out
|
||||
assert "Bootloader information" in out
|
||||
assert "Bootloader Information" in out
|
||||
assert "Bootloader version: 1" in out
|
||||
assert "ESP-IDF: v5.2-dev-254-g1950b15" in out
|
||||
assert "Compile time: Apr 25 2023 00:13:32" in out
|
||||
|
@@ -29,16 +29,17 @@ class CustomLogger(TemplateLogger):
|
||||
"""
|
||||
pass
|
||||
|
||||
def print_overwrite(self, message: str, last_line: bool = False):
|
||||
"""
|
||||
Prints a message, overwriting the currently printed line.
|
||||
"""
|
||||
def stage(self, finish=False):
|
||||
pass
|
||||
|
||||
def set_progress(self, percentage: float):
|
||||
"""
|
||||
Sets the progress of long-running operations to a specific percentage.
|
||||
"""
|
||||
def progress_bar(
|
||||
self,
|
||||
cur_iter: int,
|
||||
total_iters: int,
|
||||
prefix: str = "",
|
||||
suffix: str = "",
|
||||
bar_length: int = 30,
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
@@ -52,7 +53,9 @@ class CustomLoggerIncomplete:
|
||||
class TestLogger:
|
||||
@pytest.fixture
|
||||
def logger(self):
|
||||
return EsptoolLogger()
|
||||
log = EsptoolLogger()
|
||||
log._set_smart_features(True)
|
||||
return log
|
||||
|
||||
def test_singleton(self, logger):
|
||||
logger2 = EsptoolLogger()
|
||||
@@ -90,31 +93,40 @@ class TestLogger:
|
||||
== f"{logger.ansi_red}This is an error{logger.ansi_normal}\n"
|
||||
)
|
||||
|
||||
def test_print_overwrite_tty(self, logger):
|
||||
with (
|
||||
patch("sys.stdout", new=StringIO()) as fake_out,
|
||||
patch("sys.stdout.isatty", return_value=True),
|
||||
):
|
||||
logger.print_overwrite("msg1", last_line=False)
|
||||
logger.print_overwrite("msg2", last_line=True)
|
||||
def test_stage(self, logger):
|
||||
with patch("sys.stdout", new=StringIO()) as fake_out:
|
||||
logger.stage()
|
||||
assert logger._stage_active
|
||||
logger.print("Line1")
|
||||
logger.print("Line2")
|
||||
logger.stage(finish=True)
|
||||
assert not logger._stage_active
|
||||
logger.print("Line3")
|
||||
|
||||
output = fake_out.getvalue()
|
||||
assert "msg1\n" not in output # msg1 should not have a newline
|
||||
assert f"\r{logger.ansi_clear}msg1" in output
|
||||
assert f"\r{logger.ansi_clear}msg2\n" in output
|
||||
assert f"{logger.ansi_line_up}{logger.ansi_line_clear}" * 2 in output
|
||||
assert "Line1\nLine2\n" in output
|
||||
assert "Line1\nLine2\nLine3\n" not in output
|
||||
|
||||
def test_print_overwrite_non_tty(self, logger):
|
||||
with (
|
||||
patch("sys.stdout", new=StringIO()) as fake_out,
|
||||
patch("sys.stdout.isatty", return_value=False),
|
||||
):
|
||||
logger.print_overwrite("msg1", last_line=False)
|
||||
logger.print_overwrite("msg2", last_line=True)
|
||||
assert fake_out.getvalue() == "msg1\nmsg2\n" # Acting as a normal print()
|
||||
|
||||
def test_set_progress(self, logger):
|
||||
logger.set_progress(50.0)
|
||||
# Since set_progress is not implemented - just ensure it doesn't raise an error
|
||||
assert True
|
||||
def test_progress_bar(self, logger):
|
||||
with patch("sys.stdout", new=StringIO()) as fake_out:
|
||||
logger.progress_bar(
|
||||
cur_iter=2,
|
||||
total_iters=4,
|
||||
prefix="Progress: ",
|
||||
suffix=" (2/4)",
|
||||
bar_length=10,
|
||||
)
|
||||
logger.progress_bar(
|
||||
cur_iter=4,
|
||||
total_iters=4,
|
||||
prefix="Progress: ",
|
||||
suffix=" (4/4)",
|
||||
bar_length=10,
|
||||
)
|
||||
output = fake_out.getvalue()
|
||||
assert "Progress: [====> ] 50.0% (2/4)" in output
|
||||
assert "Progress: [==========] 100.0% (4/4) \n" in output
|
||||
|
||||
def test_set_incomplete_logger(self, logger):
|
||||
with pytest.raises(
|
||||
|
Reference in New Issue
Block a user