storage: config: engine: Introduce dead letter queue #11000
Note: Other AI code review bot(s) detected
CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
Walkthrough
Adds filesystem-backed Dead-Letter Queue (DLQ) support: new config fields and keys, a storage API to quarantine rejected chunks, engine calls that invoke quarantine on specific output failure paths, a filesystem implementation that writes DLQ files, and unit tests covering DLQ scenarios.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Engine as Engine (handle_output_event)
participant Task as Output Task / retry logic
participant StorageAPI as flb_storage_quarantine_chunk
participant CIO as CIO (source chunk)
participant Rej as Rejected Stream (DLQ FS)
rect rgb(245,250,255)
note over Engine,Task: Output failure detected (drop / retry create/schedule fail / error)
Engine->>Task: identify rejected chunk & input instance
alt DLQ enabled & FS backend available & chunk available
Engine->>StorageAPI: quarantine_chunk(ctx, ch, tag, status, out_name)
StorageAPI->>CIO: bring chunk up if needed / read content
StorageAPI->>Rej: get_or_create_rejected_stream()
StorageAPI->>Rej: open DLQ chunk (sanitized name), write, fsync, close
StorageAPI-->>Engine: return success / failure
else DLQ disabled or FS unavailable
Engine-->>Engine: skip quarantine (no-op)
end
end
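The gating shown in the diagram can be sketched in isolation as follows. This is a minimal illustration, not the code from this pull request: `struct dlq_env`, `enum dlq_result`, `dlq_try_quarantine`, and the two sample callbacks are hypothetical names, and the write callback stands in for the real "open DLQ chunk, write, fsync, close" sequence.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the engine state; not Fluent Bit's real types. */
struct dlq_env {
    bool dlq_enabled;   /* DLQ turned on in the configuration */
    bool fs_backend;    /* filesystem storage backend is available */
};

enum dlq_result {
    DLQ_SKIPPED = 0,    /* quarantine not attempted (no-op) */
    DLQ_STORED  = 1,    /* chunk copied to the rejected stream */
    DLQ_FAILED  = -1    /* quarantine attempted but the write failed */
};

/* Hypothetical writer callback: returns 0 on success, non-zero on failure. */
typedef int (*dlq_write_fn)(const void *buf, size_t len);

/* Sample callbacks used only to exercise both outcomes. */
int dlq_write_ok(const void *buf, size_t len)
{
    (void) buf;
    (void) len;
    return 0;
}

int dlq_write_fail(const void *buf, size_t len)
{
    (void) buf;
    (void) len;
    return -1;
}

/* Quarantine only when DLQ is enabled, the FS backend exists, and a chunk is present. */
enum dlq_result dlq_try_quarantine(const struct dlq_env *env,
                                   const void *chunk, size_t len,
                                   dlq_write_fn write_cb)
{
    if (!env->dlq_enabled || !env->fs_backend || chunk == NULL) {
        return DLQ_SKIPPED;
    }
    return write_cb(chunk, len) == 0 ? DLQ_STORED : DLQ_FAILED;
}
```

Returning a distinct "skipped" value keeps the disabled path a cheap no-op while still letting the caller tell a skipped quarantine apart from a failed one.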
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
e91922d to 4530987
Actionable comments posted: 2
🧹 Nitpick comments (3)
src/flb_storage.c (1)
750-785: Guard rejected-stream lazy creation against races; consider pre-creating at init
This function lazily creates and caches ctx->storage_rejected_stream with no synchronization. Concurrent calls (from multiple output workers) can race, leading to duplicate create attempts or a torn write to the cached pointer.
- Prefer pre-creating the rejected stream during flb_storage_create when storage_keep_rejected is enabled.
- If you keep lazy creation, add a lock or other synchronization around get/create/cache.
Also, sanitize the configured name: storage_rejected_path may contain path separators; pass a single safe stream name to ChunkIO.
If ChunkIO stream creation is guaranteed to be thread-safe and idempotent, please confirm; otherwise, adopt one of the above.
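The locking option can be sketched as below, assuming a POSIX threads environment. This is only an illustration of the get/create/cache pattern under a mutex: `struct rejected_stream`, `rejected_stream_get_locked`, and the globals are hypothetical and do not correspond to ChunkIO or Fluent Bit types.

```c
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-in for the cached rejected stream; not ChunkIO's type. */
struct rejected_stream {
    const char *name;
};

static struct rejected_stream g_stream_storage;
static struct rejected_stream *g_stream = NULL;
static pthread_mutex_t g_stream_lock = PTHREAD_MUTEX_INITIALIZER;

/* Counts how many times the "create" branch ran; for demonstration only. */
int g_create_calls = 0;

/* Return the cached stream, creating it at most once across all threads. */
struct rejected_stream *rejected_stream_get_locked(const char *name)
{
    struct rejected_stream *st;

    pthread_mutex_lock(&g_stream_lock);
    if (g_stream == NULL) {
        /* First caller creates and caches; later callers reuse the pointer. */
        g_stream_storage.name = name;
        g_stream = &g_stream_storage;
        g_create_calls++;
    }
    st = g_stream;
    pthread_mutex_unlock(&g_stream_lock);

    return st;
}
```

Holding the lock across both the check and the assignment is what prevents two workers from each seeing a NULL pointer and creating the stream twice. Pre-creating the stream at init time would avoid the lock entirely.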
Example minimal sanitization within this function:
-    struct cio_stream *st;
-    const char *name;
+    struct cio_stream *st;
+    const char *raw;
+    char name[256];
+    size_t i;
@@
-    name = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected";
+    raw = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected";
+    snprintf(name, sizeof(name), "%s", raw);
+    name[sizeof(name) - 1] = '\0';
+    for (i = 0; name[i] != '\0'; i++) {
+        if (name[i] == '/' || name[i] == '\\') {
+            name[i] = '_';
+        }
+    }
tests/internal/storage_dlq.c (2)
262-274: Use join_path for portability (Windows path separators)
Hardcoding "/" may fail on Windows. Reuse join_path.
-    snprintf(path, sizeof(path), "%s/%s", root, stream_name);
-    path[sizeof(path)-1] = '\0';
+    join_path(path, sizeof(path), root, stream_name);
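The body of the test file's join_path is not shown in this conversation, so the following is only a guess at what such a helper might look like. `join_path_sketch`, `is_sep`, and `PATH_SEP` are hypothetical names, and the return convention (0 on success, -1 on truncation) is an assumption.

```c
#include <stdio.h>
#include <string.h>

#ifdef _WIN32
#define PATH_SEP '\\'
#else
#define PATH_SEP '/'
#endif

/* True if c is a path separator on the current platform. */
static int is_sep(char c)
{
#ifdef _WIN32
    return c == '/' || c == '\\';
#else
    return c == '/';
#endif
}

/*
 * Hypothetical helper: join a and b with the platform separator.
 * Returns 0 on success, -1 if the result would not fit in out.
 */
int join_path_sketch(char *out, size_t cap, const char *a, const char *b)
{
    size_t alen = strlen(a);
    int n;

    if (alen > 0 && !is_sep(a[alen - 1])) {
        n = snprintf(out, cap, "%s%c%s", a, PATH_SEP, b);
    }
    else {
        /* a is empty or already ends with a separator */
        n = snprintf(out, cap, "%s%s", a, b);
    }

    return (n < 0 || (size_t) n >= cap) ? -1 : 0;
}
```

Checking the snprintf return value reports truncation to the caller instead of silently producing a shortened path.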
458-463: Avoid non-portable basename; implement a portable helper
basename() isn’t available on Windows and requires libgen.h on Unix. Use a small portable helper.
Apply this helper near other utilities:
+static const char *base_name_portable(const char *p)
+{
+    const char *s1 = strrchr(p, '/');
+#ifdef _WIN32
+    const char *s2 = strrchr(p, '\\');
+    const char *b = (s1 && s2) ? (s1 > s2 ? s1 : s2) : (s1 ? s1 : s2);
+#else
+    const char *b = s1;
+#endif
+    return b ? b + 1 : p;
+}
Then replace usage:
-    /* get just the filename (basename) */
-    strncpy(latest_copy, latest, sizeof(latest_copy)-1);
-    latest_copy[sizeof(latest_copy)-1] = '\0';
-    base = basename(latest_copy);
+    /* get just the filename (basename) */
+    base = base_name_portable(latest);
And remove the now-unused latest_copy declaration:
-    char latest_copy[1024];
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
include/fluent-bit/flb_config.h (2 hunks)
include/fluent-bit/flb_storage.h (2 hunks)
src/flb_config.c (3 hunks)
src/flb_engine.c (5 hunks)
src/flb_storage.c (1 hunks)
tests/internal/CMakeLists.txt (1 hunks)
tests/internal/storage_dlq.c (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
src/flb_storage.c (1)
include/fluent-bit/flb_mem.h (1)
flb_free(126-128)
include/fluent-bit/flb_storage.h (1)
src/flb_storage.c (1)
flb_storage_quarantine_chunk(787-859)
src/flb_engine.c (5)
src/flb_input_chunk.c (1)
flb_input_chunk_get_tag(2174-2200)
src/flb_sds.c (2)
flb_sds_create_len(58-76)
flb_sds_destroy(389-399)
src/flb_input.c (1)
flb_input_name(790-797)
src/flb_output.c (1)
flb_output_name(1087-1094)
src/flb_storage.c (1)
flb_storage_quarantine_chunk(787-859)
src/flb_config.c (1)
include/fluent-bit/flb_mem.h (1)
flb_free(126-128)
tests/internal/storage_dlq.c (2)
src/flb_config.c (2)
flb_config_exit(448-628)
flb_config_init(232-446)
src/flb_storage.c (1)
flb_storage_quarantine_chunk(787-859)
🔇 Additional comments (2)
src/flb_engine.c (1)
234-276: Solid defensive gating on DLQ handling.
Short-circuiting when DLQ is disabled or the task lacks a chunk keeps us off the hot path unless quarantine is actually configured, which is exactly what we need here.
tests/internal/storage_dlq.c (1)
386-397: Confirm cio_qsort comparator usage
Passing NULL as comparator assumes ChunkIO provides a sensible default. If not guaranteed, pass a comparator or drop the call.
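One way to avoid depending on a default comparator is to pass an explicit one. The sketch below uses the C standard library's qsort rather than cio_qsort, whose signature is not shown in this conversation. `cmp_names` and `latest_name` are hypothetical, and the sketch assumes that DLQ file names sort lexicographically in creation order, which the test would need to guarantee.

```c
#include <stdlib.h>
#include <string.h>

/* Comparator for qsort() over an array of `const char *` file names. */
int cmp_names(const void *a, const void *b)
{
    const char *const *sa = a;
    const char *const *sb = b;

    return strcmp(*sa, *sb);
}

/*
 * Sort names in place and return the lexicographically last entry,
 * or NULL when the array is empty.
 */
const char *latest_name(const char **names, size_t count)
{
    if (count == 0) {
        return NULL;
    }

    qsort(names, count, sizeof(names[0]), cmp_names);

    return names[count - 1];
}
```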
8874e0d to de96d72
Signed-off-by: Hiroshi Hatake <[email protected]>
caca209 to e7a599b
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/flb_engine.c (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/flb_engine.c (5)
src/flb_input_chunk.c (1)
flb_input_chunk_get_tag(3236-3253)
src/flb_sds.c (2)
flb_sds_create_len(58-76)
flb_sds_destroy(389-399)
src/flb_input.c (1)
flb_input_name(790-797)
src/flb_output.c (1)
flb_output_name(1099-1106)
src/flb_storage.c (1)
flb_storage_quarantine_chunk(846-923)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (31)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COMPILER_STRICT_POINTER_TYPES=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
- GitHub Check: PR - fuzzing test
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-without-cxx (3.31.6)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
- GitHub Check: pr-compile-centos-7
Signed-off-by: Hiroshi Hatake <[email protected]>
1f91cf5 to 90acba1
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/flb_engine.c (2)
257-259: Redundant null check for ic.
At line 257, the check !ic is redundant because line 251 already validates !task->ic and returns early if null. Since ic is assigned directly from task->ic at line 255, it cannot be null at line 257. The check for !ic->chunk is necessary and should be kept.
Apply this diff to remove the redundant check:
-    if (!ic || !ic->chunk) {
+    if (!ic->chunk) {
         return;
     }
416-416: Consider passing meaningful status codes to DLQ.
All four call sites pass status_code = 0 to handle_dlq_if_available, which loses diagnostic context. Consider passing distinct codes to help operators differentiate rejection reasons:
- Line 416: No retry configured
- Line 464: Retry creation failed
- Line 518: Retry scheduling failed
- Line 556: General output error
This would enhance DLQ analysis and troubleshooting.
Also applies to: 464-464, 518-518, 556-556
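As a rough sketch of this suggestion, the four call sites could each pass a distinct reason code. The enum, its values, and `dlq_reason_str` below are hypothetical names invented for illustration. They are not part of this pull request or of Fluent Bit.

```c
/* Hypothetical rejection reasons; names and values are illustrative only. */
enum dlq_reason {
    DLQ_REASON_NO_RETRY          = 1,  /* no retry configured */
    DLQ_REASON_RETRY_CREATE_FAIL = 2,  /* retry creation failed */
    DLQ_REASON_RETRY_SCHED_FAIL  = 3,  /* retry scheduling failed */
    DLQ_REASON_OUTPUT_ERROR      = 4   /* general output error */
};

/* Map a reason code to a short label suitable for logs or DLQ metadata. */
const char *dlq_reason_str(int code)
{
    switch (code) {
    case DLQ_REASON_NO_RETRY:
        return "no retry configured";
    case DLQ_REASON_RETRY_CREATE_FAIL:
        return "retry creation failed";
    case DLQ_REASON_RETRY_SCHED_FAIL:
        return "retry scheduling failed";
    case DLQ_REASON_OUTPUT_ERROR:
        return "general output error";
    default:
        return "unknown";
    }
}
```

A call site would then pass, for example, DLQ_REASON_NO_RETRY instead of 0, so the reason a chunk was quarantined can be recovered later.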
🔇 Additional comments (1)
src/flb_engine.c (1)
235-280: Overall implementation looks solid; past critical issue resolved.
The logic at line 262 correctly calls flb_input_chunk_get_tag first and then validates the outputs, addressing the critical issue flagged in past reviews. The function properly handles resource cleanup and provides appropriate fallbacks.
This change is needed because Fluent Bit currently has no mechanism for preserving chunks that are rejected when an output sends them over the network.
When that happens, the chunks are simply deleted, leaving users with no clues for diagnosing the problem.
Related to #9363.
Enter [N/A] in the box, if an item is not applicable to your change.
Testing
Before we can approve your change; please submit the following in a comment:
For example, this feature will be enabled with:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-test label to test for all targets (requires maintainer to do).
Documentation
fluent/fluent-bit-docs#2166
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Tests