Skip to content

Commit b6f3cf8

Browse files
authored
Merge pull request #1237 from alphagov/BC-celery-logging-mk-2
Celery Logging Update
2 parents 962c6e1 + 1ac276b commit b6f3cf8

File tree

6 files changed

+112
-87
lines changed

6 files changed

+112
-87
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ var/
2222
*.egg-info/
2323
.installed.cfg
2424
*.egg
25+
.venv
2526

2627
# PyInstaller
2728
# Usually these files are written by a python script from a template

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# CHANGELOG
22

3+
## 99.6.0
4+
5+
* Improve celery json logging. Include beat with separate log level options and testing.
6+
37
## 99.5.2
48

59
* Make inheritence of annotations on SerialisedModel work on both the class and its instances

notifications_utils/logging/celery.py

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,28 @@
1-
import os
2-
import warnings
31
from logging.config import dictConfig
42

53
from celery.signals import setup_logging
64

5+
config = None
76

8-
def set_up_logging(logger):
9-
if logger is None or not hasattr(logger, "warning"):
10-
raise AttributeError("The provided logger object is invalid.")
11-
12-
def custom_showwarning(message, category, filename, lineno, file=None, line=None):
13-
log_entry = {
14-
"level": "WARNING",
15-
"message": str(message),
16-
"category": category.__name__,
17-
"filename": filename,
18-
"lineno": lineno,
19-
}
20-
logger.warning(str(message), log_entry)
21-
22-
warnings.showwarning = custom_showwarning
237

8+
def set_up_logging(conf):
9+
global config
10+
config = conf
11+
# we connect to the setup_logging signal to configure logging during the worker startup
12+
# and beat startup. If we don't do this and go directly to the setup_logging_connect
13+
# we will not have some of the startup messages.
2414
setup_logging.connect(setup_logging_connect)
2515

2616

2717
def setup_logging_connect(*args, **kwargs):
28-
log_level = os.environ.get("CELERY_LOG_LEVEL", "CRITICAL").upper()
18+
if config is None:
19+
raise ValueError("Configuration object is not set. Please call set_up_logging first.")
20+
21+
worker_log_level = config.get("CELERY_WORKER_LOG_LEVEL", "CRITICAL").upper()
22+
beat_log_level = config.get("CELERY_BEAT_LOG_LEVEL", "INFO").upper()
2923

24+
# Override the default celery logger to use the JSON formatter
25+
# We need to be very careful with the worker logger as it can leak PII data
3026
LOGGING_CONFIG = {
3127
"version": 1,
3228
"disable_existing_loggers": False,
@@ -39,15 +35,14 @@ def setup_logging_connect(*args, **kwargs):
3935
},
4036
"handlers": {
4137
"default": {
42-
"level": log_level,
4338
"formatter": "generic",
4439
"class": "logging.StreamHandler",
4540
"stream": "ext://sys.stdout", # Default is stderr
4641
},
4742
},
4843
"loggers": {
49-
"celery.worker": {"handlers": ["default"], "level": log_level, "propagate": True},
50-
"celery.beat": {"handlers": ["default"], "level": "INFO", "propagate": True},
44+
"celery.worker": {"handlers": ["default"], "level": worker_log_level, "propagate": False},
45+
"celery.beat": {"handlers": ["default"], "level": beat_log_level, "propagate": False},
5146
},
5247
}
5348

notifications_utils/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@
55
# - `make version-minor` for new features
66
# - `make version-patch` for bug fixes
77

8-
__version__ = "99.5.2" # ab51164dad67bba11aa71960e75e033e
8+
__version__ = "99.6.0" # 1f7738ff685ca1c60ba5172f9d88b628

tests/logging/dummy_celery_app.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,44 @@
1-
import logging
21
import os
32
import signal
3+
import tempfile
4+
from datetime import timedelta
45

56
from celery import Celery
6-
from pythonjsonlogger.json import JsonFormatter
77

88
import notifications_utils.logging.celery as celery_logging
99

10-
logger = logging.getLogger()
1110

12-
logHandler = logging.StreamHandler()
13-
formatter = JsonFormatter()
14-
logHandler.setFormatter(formatter)
15-
logger.addHandler(logHandler)
11+
class Config:
12+
CELERY_WORKER_LOG_LEVEL = os.getenv("CELERY_WORKER_LOG_LEVEL", "CRITICAL").upper()
13+
CELERY_BEAT_LOG_LEVEL = os.getenv("CELERY_BEAT_LOG_LEVEL", "INFO").upper()
1614

17-
celery_logging.set_up_logging(logger)
15+
def get(self, key, default=None):
16+
return getattr(self, key, default)
1817

1918

19+
celery_logging.set_up_logging(Config())
20+
21+
temp_dir = tempfile.mkdtemp()
2022
app = Celery("test_app")
2123
app.conf.update(
22-
broker_url="memory://", # deliberate celery failure
24+
broker_url="filesystem://",
25+
broker_transport_options={
26+
"data_folder_in": temp_dir,
27+
"data_folder_out": temp_dir,
28+
"control_folder": os.path.join(temp_dir, "control"),
29+
},
30+
beat_schedule_filename=os.path.join(temp_dir, "celerybeat-schedule.db"),
31+
beat_schedule={
32+
"test-task": {
33+
"task": "test_task",
34+
"schedule": timedelta(seconds=1), # Run every 1 seconds
35+
}
36+
},
2337
)
2438

2539
WORKER_PID = os.getpid()
2640

2741

28-
@app.task
42+
@app.task(name="test_task")
2943
def test_task():
3044
os.kill(WORKER_PID, signal.SIGTERM)
31-
32-
33-
test_task.delay()

tests/logging/test_celery.py

Lines changed: 66 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,70 +2,38 @@
22
import os
33
import pathlib
44
import subprocess
5-
import warnings
6-
from unittest.mock import MagicMock, patch
5+
from unittest.mock import patch
76

87
import pytest
98

10-
from notifications_utils.logging.celery import set_up_logging, setup_logging_connect
9+
from notifications_utils.logging.celery import setup_logging_connect
1110

1211

13-
def test_set_up_logging_success():
14-
"""Test that set_up_logging correctly overrides warnings.showwarning."""
15-
logger = MagicMock()
16-
set_up_logging(logger)
12+
class Config:
13+
CELERY_WORKER_LOG_LEVEL = "CRITICAL"
14+
CELERY_BEAT_LOG_LEVEL = "INFO"
1715

18-
# Assert that warnings.showwarning is overridden
19-
assert warnings.showwarning != warnings._showwarning_orig
20-
21-
# Simulate a warning and check if it logs correctly
22-
warnings.showwarning("Test message", UserWarning, "test_file.py", 42)
23-
logger.warning.assert_called_once_with(
24-
"Test message",
25-
{
26-
"level": "WARNING",
27-
"message": "Test message",
28-
"category": "UserWarning",
29-
"filename": "test_file.py",
30-
"lineno": 42,
31-
},
32-
)
33-
34-
35-
def test_set_up_logging_missing_logger():
36-
logger = None
37-
38-
with pytest.raises(AttributeError, match="The provided logger object is invalid."):
39-
set_up_logging(logger)
16+
def get(self, key, default=None):
17+
return getattr(self, key, default)
4018

4119

4220
@patch("notifications_utils.logging.celery.dictConfig")
21+
@patch("notifications_utils.logging.celery.config", Config())
4322
def test_setup_logging_connect_success(mock_dict_config):
4423
"""Test that setup_logging_connect successfully configures logging."""
24+
4525
setup_logging_connect()
4626

4727
# Assert that dictConfig was called
4828
mock_dict_config.assert_called_once()
4929

5030

51-
def test_celery_dummy_logs(tmp_path):
52-
command = [
53-
"celery",
54-
"--quiet",
55-
"-A",
56-
"dummy_celery_app",
57-
"worker",
58-
"--loglevel=INFO",
59-
]
60-
61-
# Set the environment variable
62-
env = os.environ.copy()
63-
env["CELERY_LOG_LEVEL"] = "INFO"
31+
def assert_command_has_outputs(tmp_path, command, filename, expected_messages, unexpected_messages=None, env=None):
32+
if unexpected_messages is None:
33+
unexpected_messages = []
6434

65-
# assemble an example celery app directory, able to access notifications_utils
66-
# via a cwd link
6735
(tmp_path / "notifications_utils").symlink_to(pathlib.Path(__file__).parent.parent.parent / "notifications_utils")
68-
(tmp_path / "dummy_celery_app.py").symlink_to(pathlib.Path(__file__).parent / "dummy_celery_app.py")
36+
(tmp_path / filename).symlink_to(pathlib.Path(__file__).parent / filename)
6937

7038
try:
7139
# Start the Celery worker process
@@ -74,19 +42,13 @@ def test_celery_dummy_logs(tmp_path):
7442
stdout=subprocess.PIPE,
7543
stderr=subprocess.PIPE,
7644
text=True,
77-
env=env,
7845
cwd=tmp_path,
46+
env=env,
7947
)
8048

8149
stdout, stderr = process.communicate(timeout=10)
8250
logs = stdout + stderr
8351

84-
expected_messages = [
85-
"No hostname was supplied. Reverting to default 'localhost'",
86-
"Connected to memory://localhost//",
87-
"Task dummy_celery_app.test_task",
88-
]
89-
9052
# Parse the logs as JSON and check the messages field contains the expected messages
9153
for log_line in logs.splitlines():
9254
try:
@@ -95,6 +57,10 @@ def test_celery_dummy_logs(tmp_path):
9557
for message in expected_messages:
9658
if message in log_message:
9759
expected_messages.remove(message)
60+
for bad_message in unexpected_messages:
61+
assert bad_message not in log_message, (
62+
f"Unexpected message found in logs: '{bad_message}'. Logs are:\n{logs}"
63+
)
9864
except json.JSONDecodeError:
9965
continue
10066

@@ -107,3 +73,51 @@ def test_celery_dummy_logs(tmp_path):
10773
pytest.fail("Celery command not found. Ensure Celery is installed and in PATH.")
10874
except Exception as e:
10975
pytest.fail(f"Unexpected error occurred: {e}")
76+
77+
78+
@pytest.mark.slow
79+
def test_celery_dummy_logs(tmp_path):
80+
command = ["celery", "--quiet", "-A", "dummy_celery_app", "worker", "-B"]
81+
82+
expected_messages = [
83+
"Connected to filesystem://localhost//",
84+
"beat: Starting...",
85+
"Task test_task",
86+
"Scheduler: Sending due task test-task",
87+
"beat: Shutting down...",
88+
]
89+
90+
env = os.environ.copy()
91+
env["CELERY_WORKER_LOG_LEVEL"] = "INFO"
92+
env["CELERY_BEAT_LOG_LEVEL"] = "INFO"
93+
assert_command_has_outputs(tmp_path, command, "dummy_celery_app.py", expected_messages, env=env)
94+
95+
96+
@pytest.mark.slow
97+
def test_celery_worker_logs_absent(tmp_path):
98+
command = [
99+
"celery",
100+
"--quiet",
101+
"-A",
102+
"dummy_celery_app",
103+
"worker",
104+
"-B",
105+
]
106+
107+
expected_messages = [
108+
"beat: Starting...",
109+
"Scheduler: Sending due task test-task",
110+
"beat: Shutting down...",
111+
]
112+
113+
unexpected_messages = ["Connected to filesystem://localhost//", "Task test_task"]
114+
115+
env = os.environ.copy()
116+
# test we aren't leaking celery worker logs when set to CRITICAL. They could contain PII
117+
# or other sensitive data.
118+
env["CELERY_WORKER_LOG_LEVEL"] = "CRITICAL"
119+
env["CELERY_BEAT_LOG_LEVEL"] = "INFO"
120+
121+
assert_command_has_outputs(
122+
tmp_path, command, "dummy_celery_app.py", expected_messages, unexpected_messages, env=env
123+
)

0 commit comments

Comments
 (0)