Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var/
*.egg-info/
.installed.cfg
*.egg
.venv

# PyInstaller
# Usually these files are written by a python script from a template
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# CHANGELOG

## 99.6.0

* Improve celery json logging. Include beat with separate log level options and testing.

## 99.5.2

* Make inheritence of annotations on SerialisedModel work on both the class and its instances
Expand Down
37 changes: 16 additions & 21 deletions notifications_utils/logging/celery.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,28 @@
import os
import warnings
from logging.config import dictConfig

from celery.signals import setup_logging

config = None

def set_up_logging(logger):
if logger is None or not hasattr(logger, "warning"):
raise AttributeError("The provided logger object is invalid.")

def custom_showwarning(message, category, filename, lineno, file=None, line=None):
log_entry = {
"level": "WARNING",
"message": str(message),
"category": category.__name__,
"filename": filename,
"lineno": lineno,
}
logger.warning(str(message), log_entry)

warnings.showwarning = custom_showwarning

def set_up_logging(conf):
global config
config = conf
# we connect to the setup_logging signal to configure logging during the worker startup
# and beat startup. If we don't do this and go directly to the setup_logging_connect
# we will not have some of the startup messages.
setup_logging.connect(setup_logging_connect)


def setup_logging_connect(*args, **kwargs):
log_level = os.environ.get("CELERY_LOG_LEVEL", "CRITICAL").upper()
if config is None:
raise ValueError("Configuration object is not set. Please call set_up_logging first.")

worker_log_level = config.get("CELERY_WORKER_LOG_LEVEL", "CRITICAL").upper()
beat_log_level = config.get("CELERY_BEAT_LOG_LEVEL", "INFO").upper()

# Override the default celery logger to use the JSON formatter
# We need to be very careful with the worker logger as it can leak PII data
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
Expand All @@ -39,15 +35,14 @@ def setup_logging_connect(*args, **kwargs):
},
"handlers": {
"default": {
"level": log_level,
"formatter": "generic",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout", # Default is stderr
},
},
"loggers": {
"celery.worker": {"handlers": ["default"], "level": log_level, "propagate": True},
"celery.beat": {"handlers": ["default"], "level": "INFO", "propagate": True},
"celery.worker": {"handlers": ["default"], "level": worker_log_level, "propagate": False},
"celery.beat": {"handlers": ["default"], "level": beat_log_level, "propagate": False},
},
}

Expand Down
2 changes: 1 addition & 1 deletion notifications_utils/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
# - `make version-minor` for new features
# - `make version-patch` for bug fixes

__version__ = "99.5.2" # ab51164dad67bba11aa71960e75e033e
__version__ = "99.6.0" # 1f7738ff685ca1c60ba5172f9d88b628
37 changes: 24 additions & 13 deletions tests/logging/dummy_celery_app.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,44 @@
import logging
import os
import signal
import tempfile
from datetime import timedelta

from celery import Celery
from pythonjsonlogger.json import JsonFormatter

import notifications_utils.logging.celery as celery_logging

logger = logging.getLogger()

logHandler = logging.StreamHandler()
formatter = JsonFormatter()
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
class Config:
CELERY_WORKER_LOG_LEVEL = os.getenv("CELERY_WORKER_LOG_LEVEL", "CRITICAL").upper()
CELERY_BEAT_LOG_LEVEL = os.getenv("CELERY_BEAT_LOG_LEVEL", "INFO").upper()

celery_logging.set_up_logging(logger)
def get(self, key, default=None):
return getattr(self, key, default)


celery_logging.set_up_logging(Config())

temp_dir = tempfile.mkdtemp()
app = Celery("test_app")
app.conf.update(
broker_url="memory://", # deliberate celery failure
broker_url="filesystem://",
broker_transport_options={
"data_folder_in": temp_dir,
"data_folder_out": temp_dir,
"control_folder": os.path.join(temp_dir, "control"),
},
beat_schedule_filename=os.path.join(temp_dir, "celerybeat-schedule.db"),
beat_schedule={
"test-task": {
"task": "test_task",
"schedule": timedelta(seconds=1), # Run every 1 seconds
}
},
)

WORKER_PID = os.getpid()


@app.task
@app.task(name="test_task")
def test_task():
os.kill(WORKER_PID, signal.SIGTERM)


test_task.delay()
118 changes: 66 additions & 52 deletions tests/logging/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,38 @@
import os
import pathlib
import subprocess
import warnings
from unittest.mock import MagicMock, patch
from unittest.mock import patch

import pytest

from notifications_utils.logging.celery import set_up_logging, setup_logging_connect
from notifications_utils.logging.celery import setup_logging_connect


def test_set_up_logging_success():
"""Test that set_up_logging correctly overrides warnings.showwarning."""
logger = MagicMock()
set_up_logging(logger)
class Config:
CELERY_WORKER_LOG_LEVEL = "CRITICAL"
CELERY_BEAT_LOG_LEVEL = "INFO"

# Assert that warnings.showwarning is overridden
assert warnings.showwarning != warnings._showwarning_orig

# Simulate a warning and check if it logs correctly
warnings.showwarning("Test message", UserWarning, "test_file.py", 42)
logger.warning.assert_called_once_with(
"Test message",
{
"level": "WARNING",
"message": "Test message",
"category": "UserWarning",
"filename": "test_file.py",
"lineno": 42,
},
)


def test_set_up_logging_missing_logger():
logger = None

with pytest.raises(AttributeError, match="The provided logger object is invalid."):
set_up_logging(logger)
def get(self, key, default=None):
return getattr(self, key, default)


@patch("notifications_utils.logging.celery.dictConfig")
@patch("notifications_utils.logging.celery.config", Config())
def test_setup_logging_connect_success(mock_dict_config):
"""Test that setup_logging_connect successfully configures logging."""

setup_logging_connect()

# Assert that dictConfig was called
mock_dict_config.assert_called_once()


def test_celery_dummy_logs(tmp_path):
command = [
"celery",
"--quiet",
"-A",
"dummy_celery_app",
"worker",
"--loglevel=INFO",
]

# Set the environment variable
env = os.environ.copy()
env["CELERY_LOG_LEVEL"] = "INFO"
def assert_command_has_outputs(tmp_path, command, filename, expected_messages, unexpected_messages=None, env=None):
if unexpected_messages is None:
unexpected_messages = []

# assemble an example celery app directory, able to access notifications_utils
# via a cwd link
(tmp_path / "notifications_utils").symlink_to(pathlib.Path(__file__).parent.parent.parent / "notifications_utils")
(tmp_path / "dummy_celery_app.py").symlink_to(pathlib.Path(__file__).parent / "dummy_celery_app.py")
(tmp_path / filename).symlink_to(pathlib.Path(__file__).parent / filename)

try:
# Start the Celery worker process
Expand All @@ -74,19 +42,13 @@ def test_celery_dummy_logs(tmp_path):
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=env,
cwd=tmp_path,
env=env,
)

stdout, stderr = process.communicate(timeout=10)
logs = stdout + stderr

expected_messages = [
"No hostname was supplied. Reverting to default 'localhost'",
"Connected to memory://localhost//",
"Task dummy_celery_app.test_task",
]

# Parse the logs as JSON and check the messages field contains the expected messages
for log_line in logs.splitlines():
try:
Expand All @@ -95,6 +57,10 @@ def test_celery_dummy_logs(tmp_path):
for message in expected_messages:
if message in log_message:
expected_messages.remove(message)
for bad_message in unexpected_messages:
assert bad_message not in log_message, (
f"Unexpected message found in logs: '{bad_message}'. Logs are:\n{logs}"
)
except json.JSONDecodeError:
continue

Expand All @@ -107,3 +73,51 @@ def test_celery_dummy_logs(tmp_path):
pytest.fail("Celery command not found. Ensure Celery is installed and in PATH.")
except Exception as e:
pytest.fail(f"Unexpected error occurred: {e}")


@pytest.mark.slow
def test_celery_dummy_logs(tmp_path):
command = ["celery", "--quiet", "-A", "dummy_celery_app", "worker", "-B"]

expected_messages = [
"Connected to filesystem://localhost//",
"beat: Starting...",
"Task test_task",
"Scheduler: Sending due task test-task",
"beat: Shutting down...",
]

env = os.environ.copy()
env["CELERY_WORKER_LOG_LEVEL"] = "INFO"
env["CELERY_BEAT_LOG_LEVEL"] = "INFO"
assert_command_has_outputs(tmp_path, command, "dummy_celery_app.py", expected_messages, env=env)


@pytest.mark.slow
def test_celery_worker_logs_absent(tmp_path):
command = [
"celery",
"--quiet",
"-A",
"dummy_celery_app",
"worker",
"-B",
]

expected_messages = [
"beat: Starting...",
"Scheduler: Sending due task test-task",
"beat: Shutting down...",
]

unexpected_messages = ["Connected to filesystem://localhost//", "Task test_task"]

env = os.environ.copy()
# test we aren't leaking celery worker logs when set to CRITICAL. They could contain PII
# or other sensitive data.
env["CELERY_WORKER_LOG_LEVEL"] = "CRITICAL"
env["CELERY_BEAT_LOG_LEVEL"] = "INFO"

assert_command_has_outputs(
tmp_path, command, "dummy_celery_app.py", expected_messages, unexpected_messages, env=env
)