mirror of
https://github.com/thiagoralves/OpenPLC_v3.git
synced 2025-10-14 02:19:43 +08:00
[RTOP-26] PR Review Fix
This commit is contained in:
@@ -332,7 +332,7 @@ elif [ "$1" == "win_msys2" ]; then
|
||||
#Setting up venv
|
||||
python3 -m venv "$VENV_DIR"
|
||||
"$VENV_DIR/bin/python3" get-pip3.py
|
||||
"$VENV_DIR/bin/python3" -m pip install flask==2.3.3 werkzeug==2.3.7 flask-login==0.6.2 pyserial pymodbus==2.5.3
|
||||
"$VENV_DIR/bin/python3" -m pip install flask==2.3.3 werkzeug==2.3.7 flask-login==0.6.2 pyserial pymodbus==2.5.3 cryptography flask_jwt_extended flask_sqlalchemy python-dotenv
|
||||
|
||||
echo ""
|
||||
echo "[MATIEC COMPILER]"
|
||||
|
@@ -4,8 +4,10 @@ from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_jwt_extended import create_access_token, current_user, jwt_required, JWTManager, verify_jwt_in_request, get_jwt
|
||||
from werkzeug.security import generate_password_hash, check_password_hash
|
||||
|
||||
import logging
|
||||
from typing import Callable, Optional
|
||||
import config
|
||||
|
||||
import os
|
||||
env = os.getenv("FLASK_ENV", "development")
|
||||
|
||||
@@ -16,6 +18,7 @@ if env == "production":
|
||||
else:
|
||||
app_restapi.config.from_object(config.DevConfig)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
restapi_bp = Blueprint('restapi_blueprint', __name__)
|
||||
_handler_callback_get: Optional[Callable[[str, dict], dict]] = None
|
||||
_handler_callback_post: Optional[Callable[[str, dict], dict]] = None
|
||||
@@ -47,12 +50,12 @@ class User(db.Model):
|
||||
password = password + app_restapi.config["PEPPER"]
|
||||
self.password_hash = generate_password_hash(password,
|
||||
method=self.derivation_method)
|
||||
# print(f"Password set for user {self.username} | {self.password_hash}")
|
||||
logger.debug(f"Password set for user {self.username} | {self.password_hash}")
|
||||
return self.password_hash
|
||||
|
||||
def check_password(self, password: str) -> bool:
|
||||
password = password + app_restapi.config["PEPPER"]
|
||||
# print(f"Checking password {self.password_hash} | {password}")
|
||||
logger.debug(f"Checking password {self.password_hash} | {password}")
|
||||
return check_password_hash(self.password_hash, password)
|
||||
|
||||
def to_dict(self):
|
||||
@@ -78,22 +81,14 @@ def register_callback_post(callback: Callable[[str, dict], dict]):
|
||||
_handler_callback_post = callback
|
||||
print("POST Callback registered successfully for rest_blueprint!")
|
||||
|
||||
# TODO implement role-based access control
|
||||
# For now, we will just check if the user is an admin
|
||||
# def is_admin():
|
||||
# return current_user.role == "admin"
|
||||
|
||||
@restapi_bp.route("/", methods=["POST"])
|
||||
def create_user():
|
||||
# TODO implement role-based access control
|
||||
# check if there are any users in the database
|
||||
try:
|
||||
users_exist = User.query.first() is not None
|
||||
except Exception as e:
|
||||
print(f"Error checking for users: {e}")
|
||||
return jsonify({"msg": "User creation error"}), 401
|
||||
# if users_exist and (not current_user or not is_admin()):
|
||||
# return jsonify({"msg": "Admin privileges required"}), 403
|
||||
|
||||
# if there are no users, we don't need to verify JWT
|
||||
if users_exist and verify_jwt_in_request(optional=True) is None:
|
||||
@@ -123,18 +118,12 @@ def create_user():
|
||||
@restapi_bp.route("/<int:user_id>", methods=["GET"])
|
||||
@jwt_required()
|
||||
def update_user(user_id):
|
||||
# TODO implement role-based access control
|
||||
# For now, we will just check if the user is an admin
|
||||
# if not is_admin():
|
||||
# return jsonify({"msg": "Admin privileges required"}), 403
|
||||
try:
|
||||
user = User.query.get(user_id)
|
||||
except Exception as e:
|
||||
print(f"Error retrieving user: {e}")
|
||||
return jsonify({"msg": "User retrieval error"}), 500
|
||||
|
||||
# if users_exist and (not current_user or not is_admin()):
|
||||
# return jsonify({"msg": "Admin privileges required"}), 403
|
||||
if not user:
|
||||
return jsonify({"msg": "User not found"}), 404
|
||||
|
||||
@@ -143,11 +132,6 @@ def update_user(user_id):
|
||||
# TODO List all users (Admin only)
|
||||
@restapi_bp.route("/", methods=["GET"])
|
||||
def list_users():
|
||||
# TODO implement role-based access control
|
||||
# For now, we will just check if the user is an admin
|
||||
# if not is_admin():
|
||||
# return jsonify({"msg": "Admin privileges required"}), 403
|
||||
#
|
||||
# If there are no users, we don't need to verify JWT
|
||||
try:
|
||||
verify_jwt_in_request()
|
||||
@@ -201,20 +185,13 @@ def change_password(user_id):
|
||||
return jsonify({"msg": f"Password for user {user.username} updated successfully"}), 200
|
||||
|
||||
# delete a user by ID
|
||||
# only authenticated users can delete a user
|
||||
# if user does not exist, return 404 Not Found
|
||||
# if user exists, delete the user and return 200 OK with a success message
|
||||
@restapi_bp.route("/<int:user_id>", methods=["DELETE"])
|
||||
@jwt_required()
|
||||
def delete_user(user_id):
|
||||
try:
|
||||
# TODO implement role-based access control
|
||||
# For now, we will just check if the user is an admin
|
||||
# if not is_admin():
|
||||
# return jsonify({"msg": "Admin privileges required"}), 403
|
||||
user = User.query.get(user_id)
|
||||
except Exception as e:
|
||||
print(f"Error retrieving user: {e}")
|
||||
logger.error(f"Error retrieving user: {e}")
|
||||
return jsonify({"msg": "User retrieval error"}), 500
|
||||
|
||||
if not user:
|
||||
@@ -222,9 +199,7 @@ def delete_user(user_id):
|
||||
|
||||
db.session.delete(user)
|
||||
db.session.commit()
|
||||
|
||||
revoke_jwt()
|
||||
|
||||
return jsonify({"msg": f"User {user.username} deleted successfully"}), 200
|
||||
|
||||
|
||||
@@ -240,15 +215,14 @@ def login():
|
||||
|
||||
try:
|
||||
user = User.query.filter_by(username=username).one_or_none()
|
||||
print(f"User found: {user}")
|
||||
logger.debug(f"User found: {user}")
|
||||
except Exception as e:
|
||||
print(f"Error retrieving user: {e}")
|
||||
logger.error(f"Error retrieving user: {e}")
|
||||
return jsonify({"msg": "User retrieval error"}), 500
|
||||
|
||||
if not user or not user.check_password(password):
|
||||
return jsonify("Wrong username or password"), 401
|
||||
|
||||
# TODO check to use as cookie
|
||||
access_token = create_access_token(identity=user)
|
||||
return jsonify(access_token=access_token)
|
||||
|
||||
@@ -257,9 +231,17 @@ def login():
|
||||
@jwt_required()
|
||||
def logout():
|
||||
revoke_jwt()
|
||||
|
||||
return jsonify({"msg": "User logged out successfully"}), 200
|
||||
|
||||
def revoke_jwt():
|
||||
jti = get_jwt()["jti"]
|
||||
try:
|
||||
# Add the JWT ID to the blacklist
|
||||
jwt_blacklist.add(jti)
|
||||
except Exception as e:
|
||||
logger.error(f"Error revoking JWT: {e}")
|
||||
|
||||
|
||||
@restapi_bp.route("/<command>", methods=["GET"])
|
||||
@jwt_required()
|
||||
def restapi_plc_get(command):
|
||||
@@ -268,11 +250,11 @@ def restapi_plc_get(command):
|
||||
|
||||
try:
|
||||
data = request.args.to_dict()
|
||||
|
||||
result = _handler_callback_get(command, data)
|
||||
return jsonify(result), 200
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error in restapi_plc_get: {e}")
|
||||
logger.error(f"Error in restapi_plc_get: {e}")
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
|
||||
@@ -289,13 +271,5 @@ def restapi_plc_post(command):
|
||||
result = _handler_callback_post(command, data)
|
||||
return jsonify(result), 200
|
||||
except Exception as e:
|
||||
print(f"Error in restapi_plc_post: {e}")
|
||||
logger.error(f"Error in restapi_plc_post: {e}")
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
def revoke_jwt():
|
||||
jti = get_jwt()["jti"]
|
||||
try:
|
||||
# Add the JWT ID to the blacklist
|
||||
jwt_blacklist.add(jti)
|
||||
except Exception as e:
|
||||
print(f"Error revoking JWT: {e}")
|
||||
|
@@ -69,10 +69,10 @@ def restapi_callback_get(argument: str, data: dict) -> dict:
|
||||
status = _logs
|
||||
if status is not str:
|
||||
_status = "No compilation in progress"
|
||||
if all(word in status for word in ["finished", "successfully"]):
|
||||
if "Compilation finished successfully!" in status:
|
||||
_status = "Success"
|
||||
_error = "No error"
|
||||
elif any(word in status for word in ["error", "Exception", "failed"]):
|
||||
elif "Compilation finished with errors!" in status:
|
||||
_status = "Error"
|
||||
_error = openplc_runtime.get_compilation_error()
|
||||
else:
|
||||
@@ -104,13 +104,11 @@ def restapi_callback_post(argument: str, data: dict) -> dict:
|
||||
if st_file.content_length > 32 * 1024 * 1024: # 32 MB limit
|
||||
return {"UploadFileFail": "File is too large"}
|
||||
|
||||
print(f"{st_file.filename}")
|
||||
|
||||
# replace program file on database
|
||||
try:
|
||||
database = "openplc.db"
|
||||
conn = create_connection(database)
|
||||
print(f"{database} connected")
|
||||
logger.info(f"{database} connected")
|
||||
if (conn != None):
|
||||
try:
|
||||
cur = conn.cursor()
|
||||
@@ -123,18 +121,13 @@ def restapi_callback_post(argument: str, data: dict) -> dict:
|
||||
return {"UploadFileFail": f"Error connecting to the database: {e}"}
|
||||
|
||||
filename = str(row[3])
|
||||
print(f"{filename} filename")
|
||||
|
||||
st_file.save(f"st_files/{filename}")
|
||||
print(f"{filename} saved")
|
||||
|
||||
|
||||
except Exception as e:
|
||||
return {"UploadFileFail": e}
|
||||
|
||||
# TODO Check if the runtime is compiling
|
||||
# if (openplc_runtime.status() == "Compiling"):
|
||||
# return {"RuntimeStatus": "Compiling"}
|
||||
if (openplc_runtime.status() == "Compiling"):
|
||||
return {"RuntimeStatus": "Compiling"}
|
||||
|
||||
try:
|
||||
openplc_runtime.compile_program(f"{filename}")
|
||||
@@ -180,7 +173,7 @@ def is_allowed_file(file):
|
||||
return False
|
||||
|
||||
def configure_runtime():
|
||||
# global openplc_runtime
|
||||
global openplc_runtime
|
||||
database = "openplc.db"
|
||||
conn = create_connection(database)
|
||||
if (conn != None):
|
||||
|
Reference in New Issue
Block a user