Merge pull request #3 from Autonomy-Logic/RTOP-26-Login-RestAPI

[RTOP-26] login rest api
This commit is contained in:
Lucas Cordeiro Butzke
2025-07-31 15:11:23 -03:00
committed by GitHub
10 changed files with 466 additions and 95 deletions

9
.gitignore vendored
View File

@@ -39,5 +39,12 @@ utils/libmodbus_src/tests/.libs/*
webserver/scripts/openplc_driver
webserver/scripts/openplc_platform
# Runtime files
webserver/*.pem
webserver/*.db
webserver/instance/
# Runtime directories
.venv
.venv
.env*

View File

@@ -92,9 +92,9 @@ function install_py_deps {
python3 -m venv "$VENV_DIR"
"$VENV_DIR/bin/python3" -m pip install --upgrade pip
if [ "$1" == "neuron" ]; then
"$VENV_DIR/bin/python3" -m pip install flask==2.2.5 werkzeug==2.2.2 flask-login==0.6.2 pyserial pymodbus==2.5.3 cryptography
"$VENV_DIR/bin/python3" -m pip install flask==2.2.5 werkzeug==2.2.2 flask-login==0.6.2 pyserial pymodbus==2.5.3 cryptography flask_jwt_extended flask_sqlalchemy python-dotenv
else
"$VENV_DIR/bin/python3" -m pip install flask==2.3.3 werkzeug==2.3.7 flask-login==0.6.2 pyserial pymodbus==2.5.3 cryptography
"$VENV_DIR/bin/python3" -m pip install flask==2.3.3 werkzeug==2.3.7 flask-login==0.6.2 pyserial pymodbus==2.5.3 cryptography flask_jwt_extended flask_sqlalchemy python-dotenv
fi
python3 -m pip install pymodbus==2.5.3
}
@@ -202,6 +202,7 @@ function install_libsnap7 {
cd "$OPENPLC_DIR"
}
function install_systemd_service() {
if [ "$1" == "sudo" ]; then
echo "[OPENPLC SERVICE]"
@@ -278,7 +279,7 @@ if [ "$1" == "win" ]; then
#Setting up venv
python3 -m venv "$VENV_DIR"
"$VENV_DIR/bin/python3" get-pip3.py
"$VENV_DIR/bin/python3" -m pip install flask==2.3.3 werkzeug==2.3.7 flask-login==0.6.2 pyserial pymodbus==2.5.3 cryptography
"$VENV_DIR/bin/python3" -m pip install flask==2.3.3 werkzeug==2.3.7 flask-login==0.6.2 pyserial pymodbus==2.5.3 cryptography flask_jwt_extended flask_sqlalchemy python-dotenv
echo ""
echo "[MATIEC COMPILER]"
@@ -306,7 +307,7 @@ elif [ "$1" == "win_msys2" ]; then
#Setting up venv
python3 -m venv "$VENV_DIR"
"$VENV_DIR/bin/python3" get-pip3.py
"$VENV_DIR/bin/python3" -m pip install flask==2.3.3 werkzeug==2.3.7 flask-login==0.6.2 pyserial pymodbus==2.5.3
"$VENV_DIR/bin/python3" -m pip install flask==2.3.3 werkzeug==2.3.7 flask-login==0.6.2 pyserial pymodbus==2.5.3 cryptography flask_jwt_extended flask_sqlalchemy python-dotenv
echo ""
echo "[MATIEC COMPILER]"

View File

@@ -1 +1 @@
Dockerfile
blank_program.st

89
webserver/config.py Normal file
View File

@@ -0,0 +1,89 @@
from dotenv import load_dotenv
import os
import re
import secrets
import logging
from pathlib import Path
from dotenv import load_dotenv
# Always resolve .env relative to the repo root to guarantee it is found
ENV_PATH = Path(__file__).resolve().parent.parent / ".env"
DB_PATH = Path(__file__).resolve().parent.parent / "restapi.db"
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.DEBUG, # Minimum level to capture
format='[%(levelname)s] %(asctime)s - %(message)s',
datefmt='%H:%M:%S'
)
# Function to validate environment variable values
def is_valid_env(var_name, value):
if var_name == "SQLALCHEMY_DATABASE_URI":
return value.startswith("sqlite:///")
elif var_name in ("JWT_SECRET_KEY", "PEPPER"):
return bool(re.fullmatch(r"[a-fA-F0-9]{64}", value))
return False
# Function to generate a new .env file with valid defaults
def generate_env_file():
jwt = secrets.token_hex(32)
pepper = secrets.token_hex(32)
uri = "sqlite:///{DB_PATH}"
with open(ENV_PATH, "w") as f:
f.write("FLASK_ENV=development\n")
f.write(f"SQLALCHEMY_DATABASE_URI={uri}\n")
f.write(f"JWT_SECRET_KEY={jwt}\n")
f.write(f"PEPPER={pepper}\n")
os.chmod(ENV_PATH, 0o600)
logger.info(f".env file created at {ENV_PATH}")
# Ensure the database file exists and is writable
# Deletion is required because new secrets will change the database saved hashes
if os.path.exists(DB_PATH):
os.remove(DB_PATH)
logger.info(f"Deleted existing database file: {DB_PATH}")
# Load .env file
if not os.path.isfile(ENV_PATH):
logger.warning(".env file not found, creating one...")
generate_env_file()
load_dotenv(dotenv_path=ENV_PATH, override=False)
# Mandatory settings raise immediately if not provided
try:
for var in ("SQLALCHEMY_DATABASE_URI", "JWT_SECRET_KEY", "PEPPER"):
val = os.getenv(var)
if not val or not is_valid_env(var, val):
raise RuntimeError(f"Environment variable '{var}' is invalid or missing")
except RuntimeError as e:
logger.error(f"{e}")
# Need to regenerate .env file and remove the database as well
response = input("Do you want to regenerate the .env file? This will delete your database. [y/N]: ").strip().lower()
if response == 'y':
logger.info("Regenerating .env with new valid values...")
generate_env_file()
load_dotenv(ENV_PATH)
else:
logger.error("Exiting due to invalid environment configuration.")
exit(1)
class Config:
SQLALCHEMY_DATABASE_URI = os.environ["SQLALCHEMY_DATABASE_URI"]
JWT_SECRET_KEY = os.environ["JWT_SECRET_KEY"]
PEPPER = os.environ["PEPPER"]
class DevConfig(Config):
SQLALCHEMY_TRACK_MODIFICATIONS = False # keep performance parity with prod
DEBUG = True
class ProdConfig(Config):
SQLALCHEMY_TRACK_MODIFICATIONS = False
DEBUG = False
ENV = "production"

View File

@@ -10,31 +10,16 @@ import os
class CertGen():
"""
Generates a self-signed TLS certificate and private key.
Args:
hostname (str): The common name (CN) for the certificate.
ip_addresses (list, optional): A list of IP addresses to include in the SAN extension.
cert_file (str): The filename for the certificate (PEM format).
key_file (str): The filename for the private key (PEM format).
"""
"""Generates a self-signed TLS certificate and private key."""
def __init__(self, hostname, ip_addresses=None):
self.hostname = hostname
self.ip_addresses = ip_addresses
# Certificate validity
self.now = datetime.datetime.utcnow()
# Subject and Issuer
self.subject = self.issuer = x509.Name([
# x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"), # TODO get device country
# x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"openplc"),
# x509.NameAttribute(NameOID.LOCALITY_NAME, u"openplc"),
# x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Autonomy"),
x509.NameAttribute(NameOID.COMMON_NAME, hostname),
])
# Subject Alternative Names (SAN)
self.alt_names = [x509.DNSName(hostname)]
if ip_addresses:
for addr in ip_addresses:
@@ -82,15 +67,7 @@ class CertGen():
# TODO add a function to update the certificate on the client before expiration
def is_certificate_valid(self, cert_file):
"""
Checks if a certificate is valid (not expired and not yet valid).
Args:
cert_file (str): The path to the certificate file (PEM format).
Returns:
bool: True if the certificate is currently valid, False otherwise.
"""
"""Check if the certificate is valid."""
if not os.path.exists(cert_file):
print(f"Certificate file not found: {cert_file}")
return False

Binary file not shown.

View File

@@ -73,6 +73,10 @@ class runtime:
project_file = ""
project_name = ""
project_description = ""
compilation_status_str = ""
compilation_error_str = ""
compilation_object = None
compilation_error = None
runtime_status = "Stopped"
def start_runtime(self):
@@ -109,9 +113,7 @@ class runtime:
self.stop_runtime()
self.is_compiling = True
global compilation_status_str
global compilation_object
compilation_status_str = ""
self.compilation_status_str = ""
# Extract debug information from program
with open('./st_files/' + st_file, "r") as f:
@@ -137,9 +139,9 @@ class runtime:
else:
# No debug info... probably a program generated from the old editor. Use the blank debug info just to compile the program
f = open('./core/debug.blank', "r")
c_debug = f.read()
f.close()
with open('./core/debug.blank', "r") as f:
c_debug = f.read()
f.close()
# Write c_debug file
with open('./core/debug.cpp', "w") as f:
@@ -147,7 +149,8 @@ class runtime:
# Start compilation
a = subprocess.Popen(['./scripts/compile_program.sh', str(st_file)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
compilation_object = NonBlockingStreamReader(a.stdout)
self.compilation_object = NonBlockingStreamReader(a.stdout)
self.compilation_error = NonBlockingStreamReader(a.stderr)
else:
# Debug info was extracted from program
program = '\n'.join(program_lines)
@@ -166,21 +169,29 @@ class runtime:
# Start compilation
a = subprocess.Popen(['./scripts/compile_program.sh', str(st_file)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
compilation_object = NonBlockingStreamReader(a.stdout)
self.compilation_object = NonBlockingStreamReader(a.stdout)
self.compilation_error = NonBlockingStreamReader(a.stderr)
def compilation_status(self):
global compilation_status_str
global compilation_object
while compilation_object != None:
line = compilation_object.readline()
while self.compilation_object != None:
line = self.compilation_object.readline()
if not line: break
compilation_status_str += line
return compilation_status_str
self.compilation_status_str += line
return self.compilation_status_str
def get_compilation_error(self):
while self.compilation_error != None:
line = self.compilation_error.readline()
if not line: break
self.compilation_error_str += line
return self.compilation_error_str
def status(self):
if ('compilation_object' in globals()):
if (compilation_object.end_of_stream == False):
try:
if (self.compilation_object.end_of_stream == False):
return "Compiling"
except Exception as e:
print(f"Error checking compilation status: {e}")
if not self._rpc('exec_time()', 10000):
self.runtime_status = "Stopped"

View File

@@ -1,41 +1,265 @@
# restblueprint.py
from flask import Blueprint, jsonify, request
from flask import Flask, Blueprint, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import create_access_token, current_user, jwt_required, JWTManager, verify_jwt_in_request, get_jwt
from werkzeug.security import generate_password_hash, check_password_hash
import logging
from typing import Callable, Optional
# Define the Blueprint
restapi_bp = Blueprint('restapi_blueprint', __name__)
import config
import os
env = os.getenv("FLASK_ENV", "development")
# Global variable to store the single callback for this blueprint
app_restapi = Flask(__name__)
if env == "production":
app_restapi.config.from_object(config.ProdConfig)
else:
app_restapi.config.from_object(config.DevConfig)
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.DEBUG, # Minimum level to capture
format='[%(levelname)s] %(asctime)s - %(message)s',
datefmt='%H:%M:%S'
)
restapi_bp = Blueprint('restapi_blueprint', __name__)
_handler_callback_get: Optional[Callable[[str, dict], dict]] = None
_handler_callback_post: Optional[Callable[[str, dict], dict]] = None
jwt = JWTManager(app_restapi)
db = SQLAlchemy(app_restapi)
jwt_blacklist = set()
@jwt.token_in_blocklist_loader
def check_if_token_revoked(jwt_header, jwt_payload):
jti = jwt_payload["jti"]
return jti in jwt_blacklist
class User(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
username: str = db.Column(db.Text, nullable=False, unique=True)
password_hash: str = db.Column(db.Text, nullable=False)
# TODO implement roles
# For now, we will just use "user" and "admin"
# In the future, we can implement more roles like "guest", "editor", etc
# and use them to control access to different parts of the API
role: str = db.Column(db.String(20), default="user")
# Use PBKDF2 with SHA256 and 600,000 iterations for password hashing
derivation_method: str = "pbkdf2:sha256:600000"
def set_password(self, password: str) -> str:
password = password + app_restapi.config["PEPPER"]
self.password_hash = generate_password_hash(password,
method=self.derivation_method)
logger.debug(f"Password set for user {self.username} | {self.password_hash}")
return self.password_hash
def check_password(self, password: str) -> bool:
password = password + app_restapi.config["PEPPER"]
return check_password_hash(self.password_hash, password)
def to_dict(self):
return {"id": self.id, "username": self.username, "role": self.role}
@jwt.user_identity_loader
def user_identity_lookup(user):
return str(user.id)
@jwt.user_lookup_loader
def user_lookup_callback(_jwt_header, jwt_data):
identity = jwt_data["sub"]
return User.query.filter_by(id=identity).one_or_none()
def register_callback_get(callback: Callable[[str, dict], dict]):
"""Registers the business logic callback function."""
global _handler_callback_get
_handler_callback_get = callback
print("GET Callback registered successfully for rest_blueprint!")
logger.info("GET Callback registered successfully for rest_blueprint!")
def register_callback_post(callback: Callable[[str, dict], dict]):
"""Registers the business logic callback function."""
global _handler_callback_post
_handler_callback_post = callback
print("POST Callback registered successfully for rest_blueprint!")
logger.info("POST Callback registered successfully for rest_blueprint!")
@restapi_bp.route("/create-user", methods=["POST"])
def create_user():
# check if there are any users in the database
try:
users_exist = User.query.first() is not None
except Exception as e:
logger.error(f"Error checking for users: {e}")
return jsonify({"msg": "User creation error"}), 401
# if there are no users, we don't need to verify JWT
if users_exist and verify_jwt_in_request(optional=True) is None:
return jsonify({"msg": "User already created!"}), 401
data = request.get_json()
username = data.get("username")
password = data.get("password")
role = data.get("role", "user")
if not username or not password:
return jsonify({"msg": "Missing username or password"}), 400
if User.query.filter_by(username=username).first():
return jsonify({"msg": "Username already exists"}), 409
# Create a new user
user = User(username=username, role=role)
user.set_password(password)
db.session.add(user)
db.session.commit()
return jsonify({"msg": "User created", "id": user.id}), 201
# verify existing users individually
@restapi_bp.route("/get-user-info/<int:user_id>", methods=["GET"])
@jwt_required()
def get_user_info(user_id):
try:
user = User.query.get(user_id)
except Exception as e:
logger.error(f"Error retrieving user: {e}")
return jsonify({"msg": "User retrieval error"}), 500
if not user:
return jsonify({"msg": "User not found"}), 404
return jsonify(user.to_dict())
@restapi_bp.route("/get-users-info", methods=["GET"])
def get_users_info():
# If there are no users, we don't need to verify JWT
try:
verify_jwt_in_request()
except Exception as e:
logger.warning("No JWT token provided, checking for users without authentication")
try:
users_exist = User.query.first() is not None
except Exception as e:
logger.error(f"Error checking for users: {e}")
return jsonify({"msg": "User retrieval error"}), 500
if not users_exist:
return jsonify({"msg": "No users found"}), 404
return jsonify({"msg": "Users found"}), 200
try:
users = User.query.all()
except Exception as e:
logger.error(f"Error retrieving users: {e}")
return jsonify({"msg": "User retrieval error"}), 500
return jsonify([user.to_dict() for user in users]), 200
# password change for specific user by any authenticated user
@restapi_bp.route("/password-change/<int:user_id>", methods=["PUT"])
@jwt_required()
def change_password(user_id):
data = request.get_json()
old_password = data.get("old_password")
new_password = data.get("new_password")
if not old_password or not new_password:
return jsonify({"msg": "Both old and new passwords are required"}), 400
try:
user = User.query.get(user_id)
except Exception as e:
logger.error(f"Error retrieving user: {e}")
return jsonify({"msg": "User retrieval error"}), 500
if not user:
return jsonify({"msg": "User not found"}), 404
if not user.check_password(old_password):
return jsonify({"msg": "Old password is incorrect"}), 403
user.set_password(new_password)
db.session.commit()
return jsonify({"msg": f"Password for user {user.username} updated successfully"}), 200
# delete a user by ID
@restapi_bp.route("/delete-user/<int:user_id>", methods=["DELETE"])
@jwt_required()
def delete_user(user_id):
try:
user = User.query.get(user_id)
except Exception as e:
logger.error(f"Error retrieving user: {e}")
return jsonify({"msg": "User retrieval error"}), 500
if not user:
return jsonify({"msg": "User not found"}), 404
db.session.delete(user)
db.session.commit()
revoke_jwt()
return jsonify({"msg": f"User {user.username} deleted successfully"}), 200
# login endpoint
@restapi_bp.route("/login", methods=["POST"])
def login():
username = request.json.get("username", None)
password = request.json.get("password", None)
try:
user = User.query.filter_by(username=username).one_or_none()
logger.debug(f"User found: {user}")
except Exception as e:
logger.error(f"Error retrieving user: {e}")
return jsonify({"msg": "User retrieval error"}), 500
if not user or not user.check_password(password):
return jsonify("Wrong username or password"), 401
access_token = create_access_token(identity=user)
return jsonify(access_token=access_token)
# logout endpoint
@restapi_bp.route("/logout", methods=["POST"])
@jwt_required()
def logout():
revoke_jwt()
return jsonify({"msg": "User logged out successfully"}), 200
def revoke_jwt():
jti = get_jwt()["jti"]
try:
# Add the JWT ID to the blacklist
jwt_blacklist.add(jti)
except Exception as e:
logger.error(f"Error revoking JWT: {e}")
@restapi_bp.route("/<command>", methods=["GET"])
@jwt_required()
def restapi_plc_get(command):
if _handler_callback_get is None:
return jsonify({"error": "No handler registered"}), 500
try:
data = request.args.to_dict()
result = _handler_callback_get(command, data)
return jsonify(result), 200
except Exception as e:
print(f"Error in restapi_plc_get: {e}")
logger.error(f"Error in restapi_plc_get: {e}")
return jsonify({"error": str(e)}), 500
@restapi_bp.route("/<command>", methods=["POST"])
@jwt_required()
def restapi_plc_post(command):
if _handler_callback_post is None:
return jsonify({"error": "No handler registered"}), 500
@@ -47,5 +271,5 @@ def restapi_plc_post(command):
result = _handler_callback_post(command, data)
return jsonify(result), 200
except Exception as e:
print(f"Error in restapi_plc_post: {e}")
logger.error(f"Error in restapi_plc_post: {e}")
return jsonify({"error": str(e)}), 500

View File

@@ -0,0 +1,17 @@
PROGRAM prog0
VAR
var_in : BOOL;
var_out : BOOL;
END_VAR
var_out := var_in;
END_PROGRAM
CONFIGURATION Config0
RESOURCE Res0 ON PLC
TASK Main(INTERVAL := T#50ms,PRIORITY := 0);
PROGRAM Inst0 WITH Main : prog0;
END_RESOURCE
END_CONFIGURATION

View File

@@ -16,25 +16,32 @@ import socket
import mimetypes
import ssl
import threading
import logging
import flask
import flask_login
from credentials import CertGen
from restapi import restapi_bp, register_callback_get, register_callback_post
from restapi import app_restapi, restapi_bp, db, register_callback_get, register_callback_post
app = flask.Flask(__name__)
app.secret_key = str(os.urandom(16))
login_manager = flask_login.LoginManager()
login_manager.init_app(app)
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.DEBUG, # Minimum level to capture
format='[%(levelname)s] %(asctime)s - %(message)s',
datefmt='%H:%M:%S'
)
openplc_runtime = openplc.runtime()
app_restapi = flask.Flask(__name__)
# TODO define best path to store credentials
CERT_FILE = "/etc/ssl/certs/certOPENPLC.pem"
KEY_FILE = "/etc/ssl/private/keyOPENPLC.pem"
from pathlib import Path
BASE_DIR = Path(__file__).parent
CERT_FILE = (BASE_DIR / "certOPENPLC.pem").resolve()
KEY_FILE = (BASE_DIR / "keyOPENPLC.pem").resolve()
HOSTNAME = "localhost"
def restapi_callback_get(argument: str, data: dict) -> dict:
@@ -42,11 +49,11 @@ def restapi_callback_get(argument: str, data: dict) -> dict:
This is the central callback function that handles the logic
based on the 'argument' from the URL and 'data' from the request.
"""
# TODO logging debug level
print(f"GET | [{__name__}] Received argument: {argument}, data: {data}")
logger.debug(f"GET | Received argument: {argument}, data: {data}")
if argument == "start-plc":
openplc_runtime.start_runtime()
configure_runtime()
return {"status": "runtime started"}
elif argument == "stop-plc":
@@ -58,12 +65,28 @@ def restapi_callback_get(argument: str, data: dict) -> dict:
return {"runtime-logs": logs}
elif argument == "compilation-status":
status = openplc_runtime.is_compiling
return {"is-compiling": status}
elif argument == "compilation-logs":
logs = openplc_runtime.compilation_status()
return {"compilation-logs": logs}
try:
logs = openplc_runtime.compilation_status()
_logs = logs
except Exception as e:
logger.error(f"Error retrieving compilation logs: {e}")
_logs = str(e)
status = _logs
if status is not str:
_status = "No compilation in progress"
if "Compilation finished successfully!" in status:
_status = "Success"
_error = "No error"
elif "Compilation finished with errors!" in status:
_status = "Error"
_error = openplc_runtime.get_compilation_error()
else:
_status = "Compiling"
_error = openplc_runtime.get_compilation_error()
logger.debug(f"Compilation status: {_status}, logs: {_logs}", extra={"error": _error})
return {"status": _status, "logs": _logs, "error": _error}
elif argument == "status":
return {"current_status": "operational", "details": data}
@@ -75,37 +98,51 @@ def restapi_callback_get(argument: str, data: dict) -> dict:
# file upload POST handler
def restapi_callback_post(argument: str, data: dict) -> dict:
# TODO logging debug level
print(f"POST | [{__name__}] Received argument: {argument}, data: {data}")
logger.debug(f"POST | Received argument: {argument}, data: {data}")
if argument == "upload-file":
try:
# TODO validate filename, content and size
# validate filename
if 'file' not in flask.request.files:
return {"UploadFileFail": "No file part in the request"}
st_file = flask.request.files['file']
print(st_file.filename)
st_file.save(f"st_files/{st_file.filename}")
return {"UploadFile": "Success"}
# validate file size
if st_file.content_length > 32 * 1024 * 1024: # 32 MB limit
return {"UploadFileFail": "File is too large"}
except:
return {"UploadFile": "Fail"}
elif argument == "compile-program":
if (openplc_runtime.status() == "Compiling"):
return {"RuntimeStatus": "Compiling"}
# replace program file on database
try:
database = "openplc.db"
conn = create_connection(database)
logger.info(f"{database} connected")
if (conn != None):
try:
cur = conn.cursor()
cur.execute("SELECT * FROM Programs WHERE Name = 'webserver_program'")
row = cur.fetchone()
cur.close()
except Exception as e:
return {"UploadFileFail": e}
except Exception as e:
return {"UploadFileFail": f"Error connecting to the database: {e}"}
try:
# TODO return compilation result and validate filename
# st_file = flask.request.args.get('file')
st_file = flask.request.files['file']
# print(f"st_files/{st_file.filename}")
openplc_runtime.compile_program(f"{st_file.filename}")
return {"CompilationStatus": "Program Compiled"}
filename = str(row[3])
st_file.save(f"st_files/{filename}")
except Exception as e:
return {"CompilationStatus": e}
return {"UploadFileFail": e}
if (openplc_runtime.status() == "Compiling"):
return {"RuntimeStatus": "Compiling"}
try:
openplc_runtime.compile_program(f"{filename}")
return {"CompilationStatus": "Starting program compilation"}
except Exception as e:
return {"CompilationStatusFail": e}
else:
return {"PostError": "Unknown argument"}
return {"PostRequestError": "Unknown argument"}
class User(flask_login.UserMixin):
@@ -2578,11 +2615,19 @@ def main():
print("Starting the web interface...")
def run_https():
# rest api register
# rest api register
app_restapi.register_blueprint(restapi_bp, url_prefix='/api')
register_callback_get(restapi_callback_get)
register_callback_post(restapi_callback_post)
with app_restapi.app_context():
try:
db.create_all()
db.session.commit()
print("Database tables created successfully.")
except Exception as e:
print(f"Error creating database tables: {e}")
try:
# CertGen class is used to generate SSL certificates and verify their validity
cert_gen = CertGen(hostname=HOSTNAME, ip_addresses=["127.0.0.1"])