Mirror of https://github.com/espressif/esp-idf.git (synced 2025-10-14 02:08:21 +08:00)

fix(nvs_flash): Parsing NVS partition containing non ASCII keys
[File: NVS parser module]

@@ -1,10 +1,7 @@
 #!/usr/bin/env python3
-# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
+# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
 # SPDX-License-Identifier: Apache-2.0
 from typing import Any
-from typing import Dict
-from typing import List
-from typing import Optional
 from zlib import crc32

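Note on the dropped imports: the parser now relies on PEP 585 builtin generics (dict, list; Python 3.9+) and PEP 604 union syntax (X | Y; Python 3.10+), which make typing.Dict, typing.List, and typing.Optional redundant. A minimal illustration of the equivalence (not part of the diff):

    from typing import Dict, List, Optional, Union  # legacy spellings

    legacy: Optional[Union[Dict[str, int], List[int]]] = None
    modern: dict[str, int] | list[int] | None = None  # same type, Python 3.10+ syntax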
@@ -61,7 +58,9 @@ class NVS_Partition:
     def __init__(self, name: str, raw_data: bytearray):
         if len(raw_data) % nvs_const.page_size != 0:
             raise NotAlignedError(
-                f'Given partition data is not aligned to page size ({len(raw_data)} % {nvs_const.page_size} = {len(raw_data)%nvs_const.page_size})'
+                f'Given partition data is not aligned to page size '
+                f'({len(raw_data)} % {nvs_const.page_size} = '
+                f'{len(raw_data) % nvs_const.page_size})'
             )

         self.name = name
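For context, an NVS partition is a sequence of fixed-size pages (4096 bytes on ESP32-series chips), so any valid dump must be a whole multiple of the page size. A hedged usage sketch; the file name and setup are illustrative, not from the commit:

    from pathlib import Path

    raw = bytearray(Path('nvs_dump.bin').read_bytes())  # hypothetical dump file
    try:
        nvs = NVS_Partition('nvs', raw)  # parser class from this module
    except NotAlignedError as err:
        print(err)  # e.g. '... (4097 % 4096 = 1)' for a truncated or padded dump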
@@ -69,9 +68,9 @@ class NVS_Partition:
         # Divide partition into pages
         self.pages = []
         for i in range(0, len(raw_data), nvs_const.page_size):
-            self.pages.append(NVS_Page(raw_data[i: i + nvs_const.page_size], i))
+            self.pages.append(NVS_Page(raw_data[i : i + nvs_const.page_size], i))

-    def toJSON(self) -> Dict[str, Any]:
+    def toJSON(self) -> dict[str, Any]:
         return dict(name=self.name, pages=self.pages)

@@ -83,22 +82,15 @@ class NVS_Page:
             )

         # Initialize class
-        self.is_empty = (
-            page_data[0: nvs_const.entry_size]
-            == bytearray({0xFF}) * nvs_const.entry_size
-        )
+        self.is_empty = page_data[0 : nvs_const.entry_size] == bytearray({0xFF}) * nvs_const.entry_size
         self.start_address = address
-        self.raw_header = page_data[0: nvs_const.entry_size]
-        self.raw_entry_state_bitmap = page_data[
-            nvs_const.entry_size: 2 * nvs_const.entry_size
-        ]
+        self.raw_header = page_data[0 : nvs_const.entry_size]
+        self.raw_entry_state_bitmap = page_data[nvs_const.entry_size : 2 * nvs_const.entry_size]
         self.entries = []

         # Load header
-        self.header: Dict[str, Any] = {
-            'status': nvs_const.page_status.get(
-                int.from_bytes(page_data[0:4], byteorder='little'), 'Invalid'
-            ),
+        self.header: dict[str, Any] = {
+            'status': nvs_const.page_status.get(int.from_bytes(page_data[0:4], byteorder='little'), 'Invalid'),
             'page_index': int.from_bytes(page_data[4:8], byteorder='little'),
             'version': 256 - page_data[8],
             'crc': {
@@ -111,16 +103,12 @@ class NVS_Page:
         entry_states = []
         for c in self.raw_entry_state_bitmap:
             for index in range(0, 8, 2):
-                entry_states.append(
-                    nvs_const.entry_status.get((c >> index) & 3, 'Invalid')
-                )
+                entry_states.append(nvs_const.entry_status.get((c >> index) & 3, 'Invalid'))
         entry_states = entry_states[:-2]

         # Load entries
         i = 2
-        while i < int(
-            nvs_const.page_size / nvs_const.entry_size
-        ):  # Loop through every entry
+        while i < int(nvs_const.page_size / nvs_const.entry_size):  # Loop through every entry
             span = page_data[(i * nvs_const.entry_size) + 2]
             if span in [0xFF, 0]:  # 'Default' span length to prevent span overflow
                 span = 1
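Each bitmap byte packs four 2-bit entry states, which is why the inner loop shifts in steps of two. A standalone sketch of the same decoding; the concrete state names are an assumption meant to mirror nvs_const.entry_status:

    entry_status = {0b11: 'Empty', 0b10: 'Written', 0b00: 'Erased'}  # assumed mapping
    byte = 0b11_00_10_10  # one bitmap byte covering four entries, lowest bits first
    states = [entry_status.get((byte >> shift) & 3, 'Invalid') for shift in range(0, 8, 2)]
    print(states)  # ['Written', 'Written', 'Erased', 'Empty']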
@@ -128,7 +116,7 @@ class NVS_Page:
             # Load an entry
             entry = NVS_Entry(
                 index=(i - 2),
-                entry_data=page_data[i * nvs_const.entry_size: (i + 1) * nvs_const.entry_size],
+                entry_data=page_data[i * nvs_const.entry_size : (i + 1) * nvs_const.entry_size],
                 entry_state=entry_states[i - 2],
             )
             self.entries.append(entry)
@@ -142,18 +130,14 @@ class NVS_Page:
                     break
                 child_entry = NVS_Entry(
                     index=entry_idx,
-                    entry_data=page_data[
-                        page_addr
-                        * nvs_const.entry_size: (page_addr + 1)
-                        * nvs_const.entry_size
-                    ],
+                    entry_data=page_data[page_addr * nvs_const.entry_size : (page_addr + 1) * nvs_const.entry_size],
                     entry_state=entry_states[entry_idx],
                 )
                 entry.child_assign(child_entry)
             entry.compute_crc()
             i += span

-    def toJSON(self) -> Dict[str, Any]:
+    def toJSON(self) -> dict[str, Any]:
         return dict(
             is_empty=self.is_empty,
             start_address=self.start_address,
@@ -168,15 +152,15 @@ class NVS_Entry:
     def __init__(self, index: int, entry_data: bytearray, entry_state: str):
         if len(entry_data) != nvs_const.entry_size:
             raise NotAlignedError(
-                f'Given entry is not aligned to entry size ({len(entry_data)} % {nvs_const.entry_size} = {len(entry_data)%nvs_const.entry_size})'
+                f'Given entry is not aligned to entry size '
+                f'({len(entry_data)} % {nvs_const.entry_size} = '
+                f'{len(entry_data) % nvs_const.entry_size})'
             )

-        def item_convert(i_type: int, data: bytearray) -> Dict:
+        def item_convert(i_type: int, data: bytearray) -> dict:
             byte_size_mask = 0x0F
             number_sign_mask = 0xF0
-            fixed_entry_length_threshold = (
-                0x20  # Fixed length entry type number is always smaller than this
-            )
+            fixed_entry_length_threshold = 0x20  # Fixed length entry type number is always smaller than this
             if i_type in nvs_const.item_type:
                 # Deal with non variable length entries
                 if i_type < fixed_entry_length_threshold:
@@ -206,14 +190,11 @@ class NVS_Entry:

             return {'value': None}

-        def key_decode(data: bytearray) -> Optional[str]:
+        def key_decode(data: bytearray) -> str | None:
             decoded = ''
             for n in data.rstrip(b'\x00'):
                 char = chr(n)
-                if char.isascii():
-                    decoded += char
-                else:
-                    return None
+                decoded += char
             return decoded

         self.raw = entry_data
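This hunk is the actual fix: key_decode used to bail out with None on the first non-ASCII byte, so entries whose keys contain such bytes lost their key and could not be parsed further. Now every byte is mapped through chr(), which amounts to Latin-1 decoding, and the entry stays readable. A before/after sketch on a hypothetical UTF-8 key:

    key = bytearray('clé'.encode('utf-8')) + b'\x00' * 13  # NUL-padded 16-byte key field

    # Old behaviour: 'é' encodes to the non-ASCII bytes 0xC3 0xA9 -> key_decode returned None.
    # New behaviour: byte-wise chr() decoding keeps the entry parseable:
    decoded = ''.join(chr(n) for n in key.rstrip(b'\x00'))
    print(decoded)  # 'clÃ©' - mojibake for UTF-8 input, but parsing no longer breaks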
@@ -230,7 +211,7 @@ class NVS_Entry:
         key = self.raw[8:24]
         data = self.raw[24:32]
         raw_without_crc = self.raw[:4] + self.raw[8:32]
-        self.metadata: Dict[str, Any] = {
+        self.metadata: dict[str, Any] = {
             'namespace': namespace,
             'type': nvs_const.item_type.get(entry_type, f'0x{entry_type:02x}'),
             'span': span,
@@ -242,7 +223,7 @@ class NVS_Entry:
                 'data_computed': 0,
             },
         }
-        self.children: List['NVS_Entry'] = []
+        self.children: list[NVS_Entry] = []
         self.key = key_decode(key)
         if self.key is None:
             self.data = None
@@ -279,7 +260,7 @@ class NVS_Entry:
             children_data = children_data[: self.data['size']]  # Discard padding
             self.metadata['crc']['data_computed'] = crc32(children_data, 0xFFFFFFFF)

-    def toJSON(self) -> Dict[str, Any]:
+    def toJSON(self) -> dict[str, Any]:
         return dict(
             raw=self.raw,
             state=self.state,
[File: pytest configuration]

@@ -1,5 +1,5 @@
 [pytest]
-addopts = -s -p no:pytest_embedded -p no:idf-ci
+addopts = -s -p no:idf-ci

 # log related
 log_cli = True
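In pytest, -p no:NAME prevents the named plugin from loading, so this edit stops blocking the pytest-embedded plugin while keeping idf-ci disabled. The new addopts line corresponds to this hypothetical programmatic run:

    import pytest

    # Same flags as the updated addopts: output capturing off, idf-ci plugin blocked.
    exit_code = pytest.main(['-s', '-p', 'no:idf-ci'])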
[File: parser tests]

@@ -3,16 +3,12 @@
 # SPDX-License-Identifier: Apache-2.0
 import base64
 import json
+from collections.abc import Callable
 from importlib.metadata import version
 from io import BufferedRandom
 from io import BytesIO
 from pathlib import Path
 from typing import Any
-from typing import Callable
-from typing import Dict
-from typing import List
-from typing import Optional
-from typing import Union
 from zlib import crc32

 import esp_idf_nvs_partition_gen.nvs_partition_gen as nvs_partition_gen
@@ -85,9 +81,9 @@ Name vehicula leo eu dolor pellentesque, ultrices tempus ex hendrerit.
 """


-def get_entry_type_bin(entry_type_str: str) -> Optional[int]:
+def get_entry_type_bin(entry_type_str: str) -> int | None:
     # Reverse `item_type` dict lookup
-    entry_type_bin: Optional[int] = next(key for key, value in nvs_const.item_type.items() if value == entry_type_str)
+    entry_type_bin: int | None = next(key for key, value in nvs_const.item_type.items() if value == entry_type_str)
     if entry_type_bin is None:
         logger.info(logger.yellow(f'Unknown entry type {entry_type_str}'))
     return entry_type_bin
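One caveat this hunk leaves in place: next() on an exhausted generator raises StopIteration rather than returning None, so the is None branch can only trigger if a default is supplied. A guarded variant would look like this (illustrative, not part of the commit):

    entry_type_bin = next(
        (key for key, value in nvs_const.item_type.items() if value == entry_type_str),
        None,  # default, so unknown type strings reach the warning below
    )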
@@ -118,8 +114,8 @@ def create_entry_data_bytearray(

 @pytest.fixture
 def generate_nvs() -> Callable:
-    def _execute_nvs_setup(nvs_setup_func: Callable, output: Optional[Path] = None) -> NVS_Partition:
-        nvs_file: Optional[Union[BytesIO, BufferedRandom]] = None
+    def _execute_nvs_setup(nvs_setup_func: Callable, output: Path | None = None) -> NVS_Partition:
+        nvs_file: BytesIO | BufferedRandom | None = None
         if output is None:
             nvs_file = BytesIO()
         else:
@@ -138,7 +134,7 @@ def generate_nvs() -> Callable:


 # Setup functions
-def setup_ok_primitive(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
+def setup_ok_primitive(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
     size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
     nvs_obj = nvs_partition_gen.nvs_open(
         result_obj=nvs_file,
@@ -157,7 +153,7 @@ def setup_ok_primitive(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
     return nvs_obj


-def setup_ok_variable_len(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
+def setup_ok_variable_len(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
     size_fixed, read_only = nvs_partition_gen.check_size(str(0x5000))
     nvs_obj = nvs_partition_gen.nvs_open(
         result_obj=nvs_file,
@@ -182,7 +178,7 @@ def setup_ok_variable_len(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
     return nvs_obj


-def setup_ok_mixed(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
+def setup_ok_mixed(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
     size_fixed, read_only = nvs_partition_gen.check_size(str(0x6000))
     nvs_obj = nvs_partition_gen.nvs_open(
         result_obj=nvs_file,
@@ -220,7 +216,7 @@ def setup_ok_mixed(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
     return nvs_obj


-def setup_bad_mixed_same_key_different_page(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
+def setup_bad_mixed_same_key_different_page(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
     size_fixed, read_only = nvs_partition_gen.check_size(str(0x6000))
     nvs_obj = nvs_partition_gen.nvs_open(
         result_obj=nvs_file,
@@ -284,7 +280,7 @@ def setup_bad_mixed_same_key_different_page(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
     return nvs_obj


-def setup_bad_same_key_primitive(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
+def setup_bad_same_key_primitive(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
     size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
     nvs_obj = nvs_partition_gen.nvs_open(
         result_obj=nvs_file,
@@ -306,7 +302,7 @@ def setup_bad_same_key_primitive(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
     return nvs_obj


-def setup_bad_same_key_variable_len(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
+def setup_bad_same_key_variable_len(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
     size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
     nvs_obj = nvs_partition_gen.nvs_open(
         result_obj=nvs_file,
@@ -325,7 +321,7 @@ def setup_bad_same_key_variable_len(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
     return nvs_obj


-def setup_bad_same_key_blob_index(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
+def setup_bad_same_key_blob_index(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
     size_fixed, read_only = nvs_partition_gen.check_size(str(0x6000))
     nvs_obj = nvs_partition_gen.nvs_open(
         result_obj=nvs_file,
@@ -350,7 +346,7 @@ def setup_bad_same_key_blob_index(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
    return nvs_obj


-def setup_read_only(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
+def setup_read_only(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
     size_fixed, read_only = nvs_partition_gen.check_size(str(0x1000))
     nvs_obj = nvs_partition_gen.nvs_open(
         result_obj=nvs_file,
@@ -370,7 +366,7 @@ def setup_read_only(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
     return nvs_obj


-def setup_minimal_json(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
+def setup_minimal_json(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
     size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
     nvs_obj = nvs_partition_gen.nvs_open(
         result_obj=nvs_file,
@@ -394,7 +390,7 @@ def setup_minimal_json(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
     return nvs_obj


-def setup_ok_non_ascii_string(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:
+def setup_ok_non_ascii_string(nvs_file: BytesIO | BufferedRandom | None) -> NVS:
     size_fixed, read_only = nvs_partition_gen.check_size(str(0x4000))
     nvs_obj = nvs_partition_gen.nvs_open(
         result_obj=nvs_file,
@@ -412,8 +408,8 @@ def setup_ok_non_ascii_string(nvs_file: Optional[Union[BytesIO, BufferedRandom]]) -> NVS:


 # Helper functions
-def prepare_duplicate_list(nvs: NVS_Partition) -> Dict[str, List[NVS_Entry]]:
-    seen_written_entires_all: Dict[str, List[NVS_Entry]] = {}
+def prepare_duplicate_list(nvs: NVS_Partition) -> dict[str, list[NVS_Entry]]:
+    seen_written_entires_all: dict[str, list[NVS_Entry]] = {}
     for page in nvs.pages:
         # page: NVS_Page
         for entry in page.entries:
@@ -421,7 +417,7 @@ def prepare_duplicate_list(nvs: NVS_Partition) -> Dict[str, List[NVS_Entry]]:
             # Duplicate entry check (1) - same key, different index - find duplicates
             seen_written_entires_all = nvs_check.identify_entry_duplicates(entry, seen_written_entires_all)
     # Duplicate entry check (2) - same key, different index
-    duplicates: Dict[str, List[NVS_Entry]] = nvs_check.filter_entry_duplicates(seen_written_entires_all)
+    duplicates: dict[str, list[NVS_Entry]] = nvs_check.filter_entry_duplicates(seen_written_entires_all)
     return duplicates
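A hedged sketch of how this helper is typically consumed in a test, assuming filter_entry_duplicates keeps only keys that were written more than once (fixture and setup names as defined above):

    def test_duplicates(generate_nvs: Callable) -> None:  # illustrative test, not from the commit
        nvs = generate_nvs(setup_bad_same_key_primitive)
        duplicates = prepare_duplicate_list(nvs)
        assert any(len(entries) > 1 for entries in duplicates.values())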