Files
esptool/esp_rfc2217_server/esp_port_manager.py

102 lines
3.6 KiB
Python

# SPDX-FileCopyrightText: 2014-2024 Fredrik Ahlberg, Angus Gratton,
# Espressif Systems (Shanghai) CO LTD, other contributors as noted.
#
# SPDX-License-Identifier: BSD-3-Clause
import os
import threading
from esptool.reset import (
ClassicReset,
CustomReset,
DEFAULT_RESET_DELAY,
HardReset,
UnixTightReset,
)
import serial
import serial.rfc2217
from serial.rfc2217 import (
COM_PORT_OPTION,
SET_CONTROL,
SET_CONTROL_DTR_OFF,
SET_CONTROL_DTR_ON,
SET_CONTROL_RTS_OFF,
SET_CONTROL_RTS_ON,
)
from esptool.config import load_config_file
cfg, _ = load_config_file(verbose=True)
cfg = cfg["esptool"]
class EspPortManager(serial.rfc2217.PortManager):
"""
The beginning of the reset sequence is detected and the proper reset sequence
is applied in a thread. The rest of the reset sequence received is just ignored
and not sent to the serial port.
"""
def __init__(self, serial_port, connection, esp32r0_delay, logger=None):
self.esp32r0_delay = esp32r0_delay
self.is_download_mode = False
super().__init__(serial_port, connection, logger)
def _telnet_process_subnegotiation(self, suboption):
if suboption[0:1] == COM_PORT_OPTION and suboption[1:2] == SET_CONTROL:
if suboption[2:3] == SET_CONTROL_DTR_OFF:
self.is_download_mode = False
self.serial.dtr = False
return
elif suboption[2:3] == SET_CONTROL_RTS_OFF and not self.is_download_mode:
reset_thread = threading.Thread(target=self._hard_reset_thread)
reset_thread.daemon = True
reset_thread.name = "hard_reset_thread"
reset_thread.start()
return
elif suboption[2:3] == SET_CONTROL_DTR_ON and not self.is_download_mode:
self.is_download_mode = True
reset_thread = threading.Thread(target=self._reset_thread)
reset_thread.daemon = True
reset_thread.name = "reset_thread"
reset_thread.start()
return
elif suboption[2:3] in [
SET_CONTROL_DTR_ON,
SET_CONTROL_RTS_ON,
SET_CONTROL_RTS_OFF,
]:
return
# only in cases not handled above do the original implementation in PortManager
super()._telnet_process_subnegotiation(suboption)
def _hard_reset_thread(self):
"""
The reset logic used for hard resetting the chip.
"""
if self.logger:
self.logger.info("Activating hard reset in thread")
cfg_custom_hard_reset_sequence = cfg.get("custom_hard_reset_sequence")
if cfg_custom_hard_reset_sequence is not None:
CustomReset(self.serial, cfg_custom_hard_reset_sequence)()
else:
HardReset(self.serial)()
def _reset_thread(self):
"""
The reset logic is used from esptool because the RTS and DTR signals
cannot be retransmitted through RFC 2217 with proper timing.
"""
if self.logger:
self.logger.info("Activating reset in thread")
delay = DEFAULT_RESET_DELAY
if self.esp32r0_delay:
delay += 0.5
cfg_custom_reset_sequence = cfg.get("custom_reset_sequence")
if cfg_custom_reset_sequence is not None:
CustomReset(self.serial, cfg_custom_reset_sequence)()
elif os.name != "nt":
UnixTightReset(self.serial, delay)()
else:
ClassicReset(self.serial, delay)()