Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from flask import Flask, jsonify
import argparse
import traceback

from flask import Flask, jsonify, request
from flask_cors import CORS

from config import Config, limiter
Expand Down Expand Up @@ -56,11 +59,29 @@

@app.errorhandler(Exception)
def handle_exception(e):
return (
jsonify(error="Internal Server Error", message=extract_error_message(str(e))),
500,
)
# If the app is in debug mode, return the full traceback
if app.debug:
return (
jsonify(
error="Internal Server Error",
message=str(e),
type=type(e).__name__,
url=request.url,
traceback=traceback.format_exc().splitlines(),

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.

Copilot Autofix

AI 10 months ago

To fix the problem, we should avoid exposing the stack trace to the end user, even in debug mode. Instead, we can log the stack trace on the server side for debugging purposes and return a generic error message to the user. This way, developers can still access the stack trace for debugging, but it will not be exposed to potential attackers.

  • Modify the handle_exception function to log the stack trace using a logging library and return a generic error message to the user.
  • Add the necessary import for the logging library.
  • Ensure that the stack trace is logged only when the application is in debug mode.
Suggested changeset 1
app.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/app.py b/app.py
--- a/app.py
+++ b/app.py
@@ -2,2 +2,3 @@
 import traceback
+import logging
 
@@ -61,16 +62,7 @@
 def handle_exception(e):
-    # If the app is in debug mode, return the full traceback
+    # Log the full traceback if the app is in debug mode
     if app.debug:
-        return (
-            jsonify(
-                error="Internal Server Error",
-                message=str(e),
-                type=type(e).__name__,
-                url=request.url,
-                traceback=traceback.format_exc().splitlines(),
-            ),
-            500,
-        )
+        app.logger.error("Exception occurred", exc_info=True)
 
-    # Otherwise, return a more user-friendly error message
+    # Return a more user-friendly error message
     error_message = extract_error_message(str(e))
EOF
@@ -2,2 +2,3 @@
import traceback
import logging

@@ -61,16 +62,7 @@
def handle_exception(e):
# If the app is in debug mode, return the full traceback
# Log the full traceback if the app is in debug mode
if app.debug:
return (
jsonify(
error="Internal Server Error",
message=str(e),
type=type(e).__name__,
url=request.url,
traceback=traceback.format_exc().splitlines(),
),
500,
)
app.logger.error("Exception occurred", exc_info=True)

# Otherwise, return a more user-friendly error message
# Return a more user-friendly error message
error_message = extract_error_message(str(e))
Copilot is powered by AI and may make mistakes. Always verify output.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Add logging in a future update

),
500,
)

# Otherwise, return a more user-friendly error message
error_message = extract_error_message(str(e))
return jsonify(error="Internal Server Error", message=error_message), 500


if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
parser = argparse.ArgumentParser(description="Run the Flask application.")
parser.add_argument(
"--debug", action="store_true", help="Run the app in debug mode."
)
args = parser.parse_args()

app.run(host="0.0.0.0", port=5000, debug=args.debug)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
argon2-cffi>=23.1.0
cryptography>=44.0.2
Flask>=3.0.3
Flask-JWT-Extended>=2.8.0
Flask-Limiter>=3.7.0
Expand Down
10 changes: 7 additions & 3 deletions routes/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from pymysql import MySQLError

from config import limiter

from jwt_helper import (
TokenError,
extract_token_from_header,
Expand All @@ -13,6 +12,8 @@
)
from utility import (
database_cursor,
encrypt_email,
hash_email,
hash_password,
validate_password,
verify_password,
Expand All @@ -22,8 +23,10 @@


def login_person_by_email(email):
email_hash = hash_email(email)

with database_cursor() as cursor:
cursor.callproc("login_person_by_email", (email,))
cursor.callproc("login_person_by_email", (email_hash,))
return cursor.fetchone()


Expand All @@ -48,11 +51,12 @@ def register():
return jsonify(message="Password does not meet security requirements"), 400

hashed_password = hash_password(password)
email = hash_email(email), encrypt_email(email)

try:
with database_cursor() as cursor:
cursor.callproc(
"register_person", (name, email, hashed_password, language_code)
"register_person", (name, *email, hashed_password, language_code)
)
except MySQLError as e:
if "User name already exists" in str(e):
Expand Down
34 changes: 32 additions & 2 deletions routes/person.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,49 @@

from utility import (
database_cursor,
decrypt_email,
encrypt_email,
hash_email,
hash_password,
mask_email,
validate_password,
verify_password,
)

person_blueprint = Blueprint("person", __name__)


def login_person_by_id(person_id):
def login_person_by_id(person_id: int) -> dict:
with database_cursor() as cursor:
cursor.callproc("login_person_by_id", (person_id,))
return cursor.fetchone()


def mask_person_email(person: dict) -> None:
"""Mask the email address of a person safely."""
encrypted_email = person.get("encrypted_email")

if not encrypted_email:
person["email"] = "Unknown"
return

try:
person["email"] = mask_email(decrypt_email(encrypted_email))
except Exception:
person["email"] = "Decryption Error"

# Remove unreadable fields
person.pop("encrypted_email", None)
person.pop("hashed_password", None)


def update_person_in_db(person_id, name, email, hashed_password, locale_code):
email = hash_email(email), encrypt_email(email)

with database_cursor() as cursor:
cursor.callproc(
"update_person",
(person_id, name, email, hashed_password, locale_code),
(person_id, name, *email, hashed_password, locale_code),
)


Expand All @@ -30,6 +54,10 @@ def get_all_persons():
with database_cursor() as cursor:
cursor.callproc("get_all_persons")
persons = cursor.fetchall()

for person in persons:
mask_person_email(person)

return jsonify(persons)


Expand All @@ -38,6 +66,8 @@ def get_person_by_id(person_id):
with database_cursor() as cursor:
cursor.callproc("get_person_by_id", (person_id,))
person = cursor.fetchone()

mask_person_email(person)
return jsonify(person)


Expand Down
59 changes: 58 additions & 1 deletion utility.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import base64
import hashlib
import os
import re
from contextlib import contextmanager
from re import match

from argon2 import PasswordHasher
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from dotenv import load_dotenv

from db import get_db_connection

load_dotenv()
ph = PasswordHasher()
AES_KEY = bytes.fromhex(os.getenv("AES_SECRET_KEY", os.urandom(32).hex()))
PEPPER = os.getenv("PEPPER", "SuperSecretPepper").encode("utf-8")


Expand All @@ -27,18 +33,69 @@ def database_cursor():
db.close()


def decrypt_email(encrypted_email: str) -> str:
"""Decrypts an AES-256 encrypted email."""
encrypted_data = base64.b64decode(encrypted_email)
iv, ciphertext = encrypted_data[:16], encrypted_data[16:]

cipher = Cipher(algorithms.AES(AES_KEY), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
decrypted_email = decryptor.update(ciphertext) + decryptor.finalize()

return decrypted_email.strip().decode()


def encrypt_email(email: str) -> str:
"""Encrypts an email using AES-256."""
iv = os.urandom(16) # Generate a random IV
cipher = Cipher(algorithms.AES(AES_KEY), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()

# Pad email to 16-byte blocks
padded_email = email + (16 - len(email) % 16) * " "
ciphertext = encryptor.update(padded_email.encode()) + encryptor.finalize()

# Store IV + ciphertext (Base64 encoded)
return base64.b64encode(iv + ciphertext).decode()


def extract_error_message(message):
try:
return message.split(", ")[1].strip("()'")
cleaner_message = message.split(", ")[1].strip("()'")
return cleaner_message if "SQL" not in cleaner_message else "Database error"
except IndexError:
return "An unknown error occurred"


def hash_email(email: str) -> str:
"""Generate a SHA-256 hash of the email (used for fast lookup)."""
return hashlib.sha256(email.encode()).hexdigest()


def hash_password(password: str) -> tuple[str, bytes]:
peppered_password = password.encode("utf-8") + PEPPER
return ph.hash(peppered_password) # Argon2 applies salt automatically


def mask_email(email: str) -> str:
"""Masks the email address to protect user privacy."""
match = re.match(r"^([\w.+-]+)@([\w-]+)\.([a-zA-Z]{2,})$", email)
if not match:
raise ValueError("Invalid email format")

local_part, domain_name, domain_extension = match.groups()

# Mask the local part
if len(local_part) > 2:
local_part = local_part[0] + "*" * (len(local_part) - 2) + local_part[-1]

# Mask the domain name
if len(domain_name) > 2:
domain_name = domain_name[0] + "*" * (len(domain_name) - 2) + domain_name[-1]

return f"{local_part}@{domain_name}.{domain_extension}"


def validate_password(password):
"""
Validates a password based on the following criteria:
Expand Down