Merge branch 'bugfix/storage_nvs_tool_keys' into 'master'

Bugfix/storage nvs tool keys

See merge request espressif/esp-idf!37068
This commit is contained in:
Radek Tandler
2025-10-02 14:38:46 +02:00
3 changed files with 45 additions and 68 deletions

View File

@@ -1,10 +1,7 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from zlib import crc32
@@ -61,7 +58,9 @@ class NVS_Partition:
def __init__(self, name: str, raw_data: bytearray):
if len(raw_data) % nvs_const.page_size != 0:
raise NotAlignedError(
f'Given partition data is not aligned to page size ({len(raw_data)} % {nvs_const.page_size} = {len(raw_data)%nvs_const.page_size})'
f'Given partition data is not aligned to page size '
f'({len(raw_data)} % {nvs_const.page_size} = '
f'{len(raw_data) % nvs_const.page_size})'
)
self.name = name
@@ -69,9 +68,9 @@ class NVS_Partition:
# Divide partition into pages
self.pages = []
for i in range(0, len(raw_data), nvs_const.page_size):
self.pages.append(NVS_Page(raw_data[i: i + nvs_const.page_size], i))
self.pages.append(NVS_Page(raw_data[i : i + nvs_const.page_size], i))
def toJSON(self) -> Dict[str, Any]:
def toJSON(self) -> dict[str, Any]:
return dict(name=self.name, pages=self.pages)
@@ -83,22 +82,15 @@ class NVS_Page:
)
# Initialize class
self.is_empty = (
page_data[0: nvs_const.entry_size]
== bytearray({0xFF}) * nvs_const.entry_size
)
self.is_empty = page_data[0 : nvs_const.entry_size] == bytearray({0xFF}) * nvs_const.entry_size
self.start_address = address
self.raw_header = page_data[0: nvs_const.entry_size]
self.raw_entry_state_bitmap = page_data[
nvs_const.entry_size: 2 * nvs_const.entry_size
]
self.raw_header = page_data[0 : nvs_const.entry_size]
self.raw_entry_state_bitmap = page_data[nvs_const.entry_size : 2 * nvs_const.entry_size]
self.entries = []
# Load header
self.header: Dict[str, Any] = {
'status': nvs_const.page_status.get(
int.from_bytes(page_data[0:4], byteorder='little'), 'Invalid'
),
self.header: dict[str, Any] = {
'status': nvs_const.page_status.get(int.from_bytes(page_data[0:4], byteorder='little'), 'Invalid'),
'page_index': int.from_bytes(page_data[4:8], byteorder='little'),
'version': 256 - page_data[8],
'crc': {
@@ -111,16 +103,12 @@ class NVS_Page:
entry_states = []
for c in self.raw_entry_state_bitmap:
for index in range(0, 8, 2):
entry_states.append(
nvs_const.entry_status.get((c >> index) & 3, 'Invalid')
)
entry_states.append(nvs_const.entry_status.get((c >> index) & 3, 'Invalid'))
entry_states = entry_states[:-2]
# Load entries
i = 2
while i < int(
nvs_const.page_size / nvs_const.entry_size
): # Loop through every entry
while i < int(nvs_const.page_size / nvs_const.entry_size): # Loop through every entry
span = page_data[(i * nvs_const.entry_size) + 2]
if span in [0xFF, 0]: # 'Default' span length to prevent span overflow
span = 1
@@ -128,7 +116,7 @@ class NVS_Page:
# Load an entry
entry = NVS_Entry(
index=(i - 2),
entry_data=page_data[i * nvs_const.entry_size: (i + 1) * nvs_const.entry_size],
entry_data=page_data[i * nvs_const.entry_size : (i + 1) * nvs_const.entry_size],
entry_state=entry_states[i - 2],
)
self.entries.append(entry)
@@ -142,18 +130,14 @@ class NVS_Page:
break
child_entry = NVS_Entry(
index=entry_idx,
entry_data=page_data[
page_addr
* nvs_const.entry_size: (page_addr + 1)
* nvs_const.entry_size
],
entry_data=page_data[page_addr * nvs_const.entry_size : (page_addr + 1) * nvs_const.entry_size],
entry_state=entry_states[entry_idx],
)
entry.child_assign(child_entry)
entry.compute_crc()
i += span
def toJSON(self) -> Dict[str, Any]:
def toJSON(self) -> dict[str, Any]:
return dict(
is_empty=self.is_empty,
start_address=self.start_address,
@@ -168,15 +152,15 @@ class NVS_Entry:
def __init__(self, index: int, entry_data: bytearray, entry_state: str):
if len(entry_data) != nvs_const.entry_size:
raise NotAlignedError(
f'Given entry is not aligned to entry size ({len(entry_data)} % {nvs_const.entry_size} = {len(entry_data)%nvs_const.entry_size})'
f'Given entry is not aligned to entry size '
f'({len(entry_data)} % {nvs_const.entry_size} = '
f'{len(entry_data) % nvs_const.entry_size})'
)
def item_convert(i_type: int, data: bytearray) -> Dict:
def item_convert(i_type: int, data: bytearray) -> dict:
byte_size_mask = 0x0F
number_sign_mask = 0xF0
fixed_entry_length_threshold = (
0x20 # Fixed length entry type number is always smaller than this
)
fixed_entry_length_threshold = 0x20 # Fixed length entry type number is always smaller than this
if i_type in nvs_const.item_type:
# Deal with non variable length entries
if i_type < fixed_entry_length_threshold:
@@ -206,14 +190,11 @@ class NVS_Entry:
return {'value': None}
def key_decode(data: bytearray) -> Optional[str]:
def key_decode(data: bytearray) -> str | None:
decoded = ''
for n in data.rstrip(b'\x00'):
char = chr(n)
if char.isascii():
decoded += char
else:
return None
decoded += char
return decoded
self.raw = entry_data
@@ -230,7 +211,7 @@ class NVS_Entry:
key = self.raw[8:24]
data = self.raw[24:32]
raw_without_crc = self.raw[:4] + self.raw[8:32]
self.metadata: Dict[str, Any] = {
self.metadata: dict[str, Any] = {
'namespace': namespace,
'type': nvs_const.item_type.get(entry_type, f'0x{entry_type:02x}'),
'span': span,
@@ -242,7 +223,7 @@ class NVS_Entry:
'data_computed': 0,
},
}
self.children: List['NVS_Entry'] = []
self.children: list[NVS_Entry] = []
self.key = key_decode(key)
if self.key is None:
self.data = None
@@ -279,7 +260,7 @@ class NVS_Entry:
children_data = children_data[: self.data['size']] # Discard padding
self.metadata['crc']['data_computed'] = crc32(children_data, 0xFFFFFFFF)
def toJSON(self) -> Dict[str, Any]:
def toJSON(self) -> dict[str, Any]:
return dict(
raw=self.raw,
state=self.state,

View File

@@ -1,5 +1,5 @@
[pytest]
addopts = -s -p no:pytest_embedded -p no:idf-ci
addopts = -s -p no:idf-ci
# log related
log_cli = True

View File

@@ -3,16 +3,12 @@
# SPDX-License-Identifier: Apache-2.0
import base64
import json
from collections.abc import Callable
from importlib.metadata import version
from io import BufferedRandom
from io import BytesIO
from pathlib import Path
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from zlib import crc32
import esp_idf_nvs_partition_gen.nvs_partition_gen as nvs_partition_gen
@@ -85,9 +81,9 @@ Name vehicula leo eu dolor pellentesque, ultrices tempus ex hendrerit.
"""
def get_entry_type_bin(entry_type_str: str) -> Optional[int]:
def get_entry_type_bin(entry_type_str: str) -> int | None:
# Reverse `item_type` dict lookup
entry_type_bin: Optional[int] = next(key for key, value in nvs_const.item_type.items() if value == entry_type_str)
entry_type_bin: int | None = next(key for key, value in nvs_const.item_type.items() if value == entry_type_str)
if entry_type_bin is None:
logger.info(logger.yellow(f'Unknown entry type {entry_type_str}'))
return entry_type_bin
@@ -118,8 +114,8 @@ def create_entry_data_bytearray(
@pytest.fixture
def generate_nvs() -> Callable:
def _execute_nvs_setup(nvs_setup_func: Callable, output: Optional[Path] = None) -> NVS_Partition:
nvs_file: Optional[Union[BytesIO, BufferedRandom]] = None
def _execute_nvs_setup(nvs_setup_func: Callable, output: Path | None = None) -> NVS_Partition:
nvs_file: BytesIO | BufferedRandom | None = None
if output is None:
nvs_file = BytesIO()
else:
@@ -138,7 +134,7 @@ def generate_nvs() -> Callable:
# Setup functions
def setup_ok_primitive(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
def setup_ok_primitive(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
@@ -157,7 +153,7 @@ def setup_ok_primitive(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NV
return nvs_obj
def setup_ok_variable_len(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
def setup_ok_variable_len(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x5000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
@@ -182,7 +178,7 @@ def setup_ok_variable_len(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) ->
return nvs_obj
def setup_ok_mixed(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
def setup_ok_mixed(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x6000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
@@ -220,7 +216,7 @@ def setup_ok_mixed(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
return nvs_obj
def setup_bad_mixed_same_key_different_page(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
def setup_bad_mixed_same_key_different_page(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x6000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
@@ -284,7 +280,7 @@ def setup_bad_mixed_same_key_different_page(nvs_file: Optional[Union[BytesIO, Bu
return nvs_obj
def setup_bad_same_key_primitive(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
def setup_bad_same_key_primitive(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
@@ -306,7 +302,7 @@ def setup_bad_same_key_primitive(nvs_file: Optional[Union[BytesIO, BufferedRando
return nvs_obj
def setup_bad_same_key_variable_len(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
def setup_bad_same_key_variable_len(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
@@ -325,7 +321,7 @@ def setup_bad_same_key_variable_len(nvs_file: Optional[Union[BytesIO, BufferedRa
return nvs_obj
def setup_bad_same_key_blob_index(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
def setup_bad_same_key_blob_index(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x6000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
@@ -350,7 +346,7 @@ def setup_bad_same_key_blob_index(nvs_file: Optional[Union[BytesIO, BufferedRand
return nvs_obj
def setup_read_only(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
def setup_read_only(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x1000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
@@ -370,7 +366,7 @@ def setup_read_only(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
return nvs_obj
def setup_minimal_json(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
def setup_minimal_json(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
@@ -394,7 +390,7 @@ def setup_minimal_json(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NV
return nvs_obj
def setup_ok_non_ascii_string(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
def setup_ok_non_ascii_string(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
@@ -412,8 +408,8 @@ def setup_ok_non_ascii_string(nvs_file: Optional[Union[BytesIO, BufferedRandom]]
# Helper functions
def prepare_duplicate_list(nvs: NVS_Partition) -> Dict[str, List[NVS_Entry]]:
seen_written_entires_all: Dict[str, List[NVS_Entry]] = {}
def prepare_duplicate_list(nvs: NVS_Partition) -> dict[str, list[NVS_Entry]]:
seen_written_entires_all: dict[str, list[NVS_Entry]] = {}
for page in nvs.pages:
# page: NVS_Page
for entry in page.entries:
@@ -421,7 +417,7 @@ def prepare_duplicate_list(nvs: NVS_Partition) -> Dict[str, List[NVS_Entry]]:
# Duplicate entry check (1) - same key, different index - find duplicates
seen_written_entires_all = nvs_check.identify_entry_duplicates(entry, seen_written_entires_all)
# Duplicate entry check (2) - same key, different index
duplicates: Dict[str, List[NVS_Entry]] = nvs_check.filter_entry_duplicates(seen_written_entires_all)
duplicates: dict[str, list[NVS_Entry]] = nvs_check.filter_entry_duplicates(seen_written_entires_all)
return duplicates