-
-
Notifications
You must be signed in to change notification settings - Fork 141
Fix #691: Add file locking to local results file saves to prevent race conditions #713
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Fix #691: Add file locking to local results file saves to prevent race conditions #713
Conversation
…nt race conditions - Wrap board.save(append=True) with file_lock() in _save() method - Use the same timeout as global saves (global_lock_timeout config) - Add comprehensive error handling for lock timeout scenarios - Add test to verify parallel saves work correctly without data loss This prevents data loss when multiple processes write to the same local results.csv file simultaneously in parallel execution scenarios.
for more information, see https://pre-commit.ci
WalkthroughAdds a file lock around the local results CSV write to prevent concurrent-process append races; lock acquisition includes a timeout and logs exceptions on timeout while allowing the process to continue saving the global results. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Proc as Process
participant FS as Local FS (results.csv)
participant Lock as FileLock
participant Global as Global Scoreboard
Proc->>Lock: acquire(results.csv) with timeout
alt lock acquired
Lock->>FS: append row
FS-->>Lock: ack
Lock->>Proc: release
Proc->>Global: save global board
Global-->>Proc: ack
else timeout
Lock-->>Proc: TimeoutError
Proc->>Proc: log exception (timeout)
Proc->>Global: save global board (continue)
Global-->>Proc: ack
end
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (1)
🧰 Additional context used🧬 Code graph analysis (1)tests/unit/amlb/test_results_race_condition.py (1)
🪛 Ruff (0.14.5)tests/unit/amlb/test_results_race_condition.py113-113: Unused function argument: (ARG001) 113-113: Unused function argument: (ARG001) 114-114: Avoid specifying long messages outside the exception class (TRY003) Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
amlb/benchmark.py(1 hunks)tests/unit/amlb/test_results_race_condition.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/unit/amlb/test_results_race_condition.py (2)
amlb/results.py (5)
Scoreboard(82-349)append(312-327)save(308-310)all(86-87)as_data_frame(189-237)amlb/utils/process.py (1)
file_lock(35-49)
amlb/benchmark.py (3)
amlb/data.py (1)
path(120-121)amlb/results.py (3)
path(330-349)save(308-310)append(312-327)amlb/utils/process.py (1)
file_lock(35-49)
🪛 Ruff (0.14.5)
tests/unit/amlb/test_results_race_condition.py
115-115: Local variable original_file_lock is assigned to but never used
Remove assignment to unused variable original_file_lock
(F841)
117-117: Unused function argument: args
(ARG001)
117-117: Unused function argument: kwargs
(ARG001)
118-118: Avoid specifying long messages outside the exception class
(TRY003)
🔇 Additional comments (1)
amlb/benchmark.py (1)
474-489: Local results locking matches the intended fix; just confirm the timeout exception mappingThe new
_saveimplementation correctly wraps the localboard.save(append=True)infile_lock(local_path, timeout=rconfig().results.global_lock_timeout), mirroring the existing global lock behavior and addressing the race condition described in #691. Swallowing onlyTimeoutErrorand proceeding to_save_global(board)is a reasonable trade‑off: the run is still reflected in the global board, while the log message clearly calls out that local results may be incomplete.One thing to double‑check: ensure that the
filelockversion you depend on still raises an exception type that subclassesTimeoutErrorfor lock‑acquisition failures so that thisexcept TimeoutError:block is reliably triggered. If that ever changes, you may want to catch the library’s specific timeout class instead.
| def test_parallel_save_no_race_condition(): | ||
| """Test that multiple parallel saves don't cause data loss due to race conditions.""" | ||
| # Create a temporary directory for test results | ||
| with tempfile.TemporaryDirectory() as tmpdir: | ||
| scores_dir = Path(tmpdir) / "scores" | ||
| scores_dir.mkdir() | ||
|
|
||
| # Create multiple scoreboards with different data | ||
| num_parallel_saves = 10 | ||
| scoreboards = [] | ||
| for i in range(num_parallel_saves): | ||
| # Create a simple score entry for each iteration | ||
| score_data = { | ||
| "id": f"test_task_{i}", | ||
| "task": f"task_{i}", | ||
| "framework": "test_framework", | ||
| "constraint": "test", | ||
| "fold": i, | ||
| "type": "classification", | ||
| "result": 0.9 + i * 0.001, | ||
| "metric": "accuracy", | ||
| "mode": "local", | ||
| "version": "1.0", | ||
| "params": "", | ||
| "app_version": "1.0", | ||
| "utc": "2025-01-01T00:00:00", | ||
| "duration": 100.0, | ||
| "training_duration": 80.0, | ||
| "predict_duration": 5.0, | ||
| "models_count": 1, | ||
| "seed": i, | ||
| "info": "", | ||
| } | ||
| board = Scoreboard(scores=[score_data], scores_dir=str(scores_dir)) | ||
| scoreboards.append(board) | ||
|
|
||
| # Function to save a scoreboard (will be run in parallel) | ||
| def save_board(board): | ||
| board.save(append=True) | ||
|
|
||
| # Save all scoreboards in parallel using ThreadPoolExecutor | ||
| # This simulates the race condition scenario | ||
| with ThreadPoolExecutor(max_workers=num_parallel_saves) as executor: | ||
| futures = [executor.submit(save_board, board) for board in scoreboards] | ||
| # Wait for all to complete | ||
| for future in futures: | ||
| future.result() | ||
|
|
||
| # Load the results and verify all data was saved | ||
| result_board = Scoreboard.all(scores_dir=str(scores_dir)) | ||
| result_df = result_board.as_data_frame() | ||
|
|
||
| # Check that we have all the expected rows | ||
| assert len(result_df) == num_parallel_saves, ( | ||
| f"Expected {num_parallel_saves} rows, but got {len(result_df)}" | ||
| ) | ||
|
|
||
| # Check that all task IDs are present | ||
| expected_task_ids = {f"test_task_{i}" for i in range(num_parallel_saves)} | ||
| actual_task_ids = set(result_df["id"].values) | ||
| assert expected_task_ids == actual_task_ids, ( | ||
| f"Missing task IDs: {expected_task_ids - actual_task_ids}" | ||
| ) | ||
|
|
||
| # Check that all folds are present and unique | ||
| expected_folds = set(range(num_parallel_saves)) | ||
| actual_folds = set(result_df["fold"].values) | ||
| assert expected_folds == actual_folds, ( | ||
| f"Missing folds: {expected_folds - actual_folds}" | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Parallel‑save test doesn’t exercise the new Benchmark._save locking path
test_parallel_save_no_race_condition is currently calling Scoreboard.save(append=True) directly from multiple threads, without going through Benchmark._save or file_lock. That means:
- The behavior under test is concurrent
Scoreboard.saveon a sharedresults.csv, not the new file‑locking logic added inBenchmark._save. - This test will pass (or fail) independently of the change in
amlb/benchmark.py, so it doesn’t really validate the race‑condition fix for local benchmark results.
If the goal is to guard against regressions for issue #691, consider restructuring this test so it actually exercises the _save path that now wraps board.save with file_lock—for example, by invoking Benchmark._save (possibly through a thin helper or a minimal Benchmark instance) in parallel, or by centralizing the “save local results” behavior in a function that both production code and the test can call.
| def test_save_with_file_lock_timeout(mocker): | ||
| """Test that file lock timeout is handled gracefully.""" | ||
| with tempfile.TemporaryDirectory() as tmpdir: | ||
| scores_dir = Path(tmpdir) / "scores" | ||
| scores_dir.mkdir() | ||
|
|
||
| score_data = { | ||
| "id": "test_task", | ||
| "task": "task", | ||
| "framework": "test_framework", | ||
| "constraint": "test", | ||
| "fold": 0, | ||
| "type": "classification", | ||
| "result": 0.9, | ||
| "metric": "accuracy", | ||
| "mode": "local", | ||
| "version": "1.0", | ||
| "params": "", | ||
| "app_version": "1.0", | ||
| "utc": "2025-01-01T00:00:00", | ||
| "duration": 100.0, | ||
| "training_duration": 80.0, | ||
| "predict_duration": 5.0, | ||
| "models_count": 1, | ||
| "seed": 0, | ||
| "info": "", | ||
| } | ||
| board = Scoreboard(scores=[score_data], scores_dir=str(scores_dir)) | ||
|
|
||
| # Mock file_lock to raise TimeoutError | ||
| from amlb.utils import process | ||
|
|
||
| original_file_lock = process.file_lock | ||
|
|
||
| def mock_file_lock(*args, **kwargs): | ||
| raise TimeoutError("Lock timeout") | ||
|
|
||
| mocker.patch("amlb.utils.process.file_lock", side_effect=mock_file_lock) | ||
|
|
||
| # The save should handle the timeout gracefully (not crash) | ||
| # Note: This tests the Scoreboard.save_df method behavior | ||
| # The actual file lock integration is tested in benchmark._save() | ||
| try: | ||
| # In the actual implementation, this would be wrapped with file_lock | ||
| # Here we're just testing that the code structure allows for error handling | ||
| board.save(append=True) | ||
| except TimeoutError: | ||
| # Expected to potentially raise, but shouldn't crash the process | ||
| pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Timeout test doesn’t hit the lock path and has a few cleanups to address
In test_save_with_file_lock_timeout there are a couple of issues:
-
Lock behavior is not actually exercised:
You patchamlb.utils.process.file_lockto raiseTimeoutError, but then only callboard.save(append=True). Based on the provided snippets,Scoreboard.save/save_dfdo not callfile_lock, so the patched function is never used andTimeoutErroris never raised. The try/except block has no assertions, so the test will pass regardless of how locking or timeout handling is implemented inBenchmark._save.If the intent is to test the new timeout‑handling branch in
Benchmark._save, you likely want to:- Patch the symbol actually used there (e.g.,
amlb.benchmark.file_lock) to raiseTimeoutError, and - Call
Benchmark._save(board)(or a small wrapper) and assert on behavior, e.g., that_save_globalis still invoked (via a mock) and that an appropriate log entry is emitted.
- Patch the symbol actually used there (e.g.,
-
Static‑analysis cleanups:
original_file_lockis assigned but never used and can be removed.mock_file_lock(*args, **kwargs)triggers unused‑arg warnings; you can rename to*_args, **_kwargsor accept no arguments.- The explicit
"Lock timeout"message insideTimeoutErroris fine, but if you want to adhere to TRY003 you could rely on the default message or shorten/standardize it.
You don’t have to wire up all of the logging assertions right now, but it would be good to at least ensure this test meaningfully exercises the timeout path introduced in Benchmark._save rather than only verifying that Scoreboard.save doesn’t crash.
🧰 Tools
🪛 Ruff (0.14.5)
115-115: Local variable original_file_lock is assigned to but never used
Remove assignment to unused variable original_file_lock
(F841)
117-117: Unused function argument: args
(ARG001)
117-117: Unused function argument: kwargs
(ARG001)
118-118: Avoid specifying long messages outside the exception class
(TRY003)
🤖 Prompt for AI Agents
In tests/unit/amlb/test_results_race_condition.py around lines 83-131, the test
patches amlb.utils.process.file_lock but never exercises the code path that uses
that lock (Benchmark._save), so the TimeoutError is never raised and the test is
ineffective; fix by patching the actual symbol used in the save path (e.g.,
patch "amlb.benchmark.file_lock" to raise TimeoutError), invoke the function
that uses the lock (call Benchmark._save(board) or a small wrapper that triggers
the benchmark save path), assert the expected behavior (mock
Benchmark._save_global to verify it was still invoked or that the timeout was
handled, and/or assert a log call), and remove unused variables/unused-arg
warnings (drop original_file_lock and change mock_file_lock signature to accept
*_args, **_kwargs).
Description
This PR fixes issue #691 by adding file locking to local results file writes, preventing race conditions when multiple processes write to the same
results.csvfile simultaneously.Problem
When running AMLB in local mode in parallel (e.g., on a cluster or large machine with NFS), multiple processes could try to append to the same local file (
session_dir/scores/results.csv) at the same time. This caused one of the append operations to be dropped, resulting in data loss.The global results file was already protected with file locking, but the local results file was not.
Solution
file_lock()protection aroundboard.save(append=True)in the_save()methodglobal_lock_timeoutconfiguration (default 5 seconds)Changes
_save()method to wrap local file save with file lockingTesting
The test simulates 10 parallel saves to the same results file and verifies:
Fixes #691
Summary by CodeRabbit
Bug Fixes
Tests