out_s3: add Blob handling support (rebase of #9907) #10675
base: master
Conversation
Signed-off-by: Leonardo Alminana <[email protected]>
Walkthrough
This update introduces comprehensive support for blob storage and multipart uploads with pre-signed URLs in the S3 output plugin. It adds a new blob database backend, enhances thread safety in the SQLite wrapper, updates S3 multipart upload functions to accept pre-signed URLs, and integrates new configuration options and worker lifecycle callbacks. Supporting header and build files are updated accordingly.
Sequence Diagram(s)
sequenceDiagram
participant Worker as S3 Worker
participant BlobDB as Blob Database
participant AuthEP as Auth Endpoint
participant S3 as S3 Service
Worker->>BlobDB: Register blob file & parts
Worker->>AuthEP: Request pre-signed URL (PutObject/Multipart)
AuthEP-->>Worker: Return pre-signed URL
Worker->>S3: Upload part/object using pre-signed URL
S3-->>Worker: Upload response
Worker->>BlobDB: Update part/file state
alt All parts uploaded
Worker->>AuthEP: Request complete pre-signed URL
AuthEP-->>Worker: Return complete URL
Worker->>S3: Complete multipart upload
S3-->>Worker: Completion response
Worker->>BlobDB: Mark file as uploaded
else Abort needed
Worker->>AuthEP: Request abort pre-signed URL
AuthEP-->>Worker: Return abort URL
Worker->>S3: Abort multipart upload
S3-->>Worker: Abort response
Worker->>BlobDB: Mark file as aborted
end
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~90 minutes
Actionable comments posted: 12
🔭 Outside diff range comments (2)
src/flb_sqldb.c (2)
63-86
: Potential race condition in shared database lookup
The database lookup and user counter increment (lines 63-86) are not protected by locks. Multiple threads could potentially find the same database and increment the users counter simultaneously, leading to incorrect reference counting.
Consider protecting the shared database lookup with appropriate locking:
+ /* Lock before searching the shared database list */
+ flb_lock_acquire(&some_global_lock, FLB_LOCK_INFINITE_RETRY_LIMIT, FLB_LOCK_DEFAULT_RETRY_DELAY);
+
  mk_list_foreach(head, &config->sqldb_list) {
      db_temp = mk_list_entry(head, struct flb_sqldb, _head);
      // ... existing lookup code ...
  }

  /* Found a database that can be shared */
  if (db_temp) {
      /* Increase users counter */
      db_temp->users++;
      // ... rest of the code ...
  }
+
+ flb_lock_release(&some_global_lock, FLB_LOCK_INFINITE_RETRY_LIMIT, FLB_LOCK_DEFAULT_RETRY_DELAY);
133-140
: Thread safety issue with shared database user counter
The decrement of parent->users at line 135 is not thread-safe. Multiple threads could decrement this counter simultaneously, potentially leading to incorrect reference counting and premature database closure. The parent database's lock should be acquired before modifying the users counter:
 if (db->shared == FLB_TRUE) {
     parent = db->parent;
+    flb_sqldb_lock(parent);
     parent->users--;
+    flb_sqldb_unlock(parent);
 }
🧹 Nitpick comments (7)
plugins/out_s3/s3_multipart.c (1)
495-499
: Duplicate error message
The error message at line 497 is identical to the one in complete_multipart_upload. Consider making it specific to the abort operation.
- flb_plg_error(ctx->ins, "Cannot complete multipart upload for key %s: "
+ flb_plg_error(ctx->ins, "Cannot abort multipart upload for key %s: "
                "upload ID is unset ", m_upload->s3_key);

plugins/out_s3/s3.h (1)
127-140
: Consider struct field organization for better memory alignment
The new fields mix different data types (pointers, sizes, times) which might lead to suboptimal memory alignment and padding.
Consider grouping fields by type for better memory alignment:
+ /* Blob database fields */
  struct flb_blob_db blob_db;
  flb_sds_t blob_database_file;
+
+ /* Size and time fields */
  size_t part_size;
  time_t upload_parts_timeout;
  time_t upload_parts_freshness_threshold;
+
+ /* Delivery attempt limits */
  int file_delivery_attempt_limit;
  int part_delivery_attempt_limit;
+
+ /* Authorization endpoint fields */
  flb_sds_t authorization_endpoint_url;
  flb_sds_t authorization_endpoint_username;
  flb_sds_t authorization_endpoint_password;
  flb_sds_t authorization_endpoint_bearer_token;
  struct flb_upstream *authorization_endpoint_upstream;
  struct flb_tls *authorization_endpoint_tls_context;

src/flb_blob_db.c (2)
359-384
: Lock held during entire database operation
The function holds the database lock for the entire duration of the SQLite operation. While this ensures thread safety, it might impact performance under high concurrency.
Consider using SQLite's built-in thread-safe mode (SQLITE_OPEN_FULLMUTEX) instead of manual locking for better granularity. This would allow multiple readers to access the database simultaneously.
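For illustration, a minimal sketch of opening a database in serialized (FULLMUTEX) mode; this uses the standard sqlite3 C API rather than the fluent-bit wrapper, so treat it only as a sketch of the suggestion:

#include <sqlite3.h>

/* Sketch: open the database with SQLITE_OPEN_FULLMUTEX so SQLite itself
 * serializes concurrent access to this connection.
 */
static int open_serialized(const char *path, sqlite3 **handle)
{
    int flags = SQLITE_OPEN_READWRITE |
                SQLITE_OPEN_CREATE    |
                SQLITE_OPEN_FULLMUTEX;

    return sqlite3_open_v2(path, handle, flags, NULL);
}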
630-658
: Complex error handling with multiple allocations
The function has complex error handling with multiple string allocations that need cleanup. The current implementation is correct but could be simplified.
Consider using a cleanup label pattern to reduce code duplication:
+cleanup:
+    if (exists == -1) {
+        if (*path != NULL) {
+            cfl_sds_destroy(*path);
+            *path = NULL;
+        }
+        if (*remote_id != NULL) {
+            cfl_sds_destroy(*remote_id);
+            *remote_id = NULL;
+        }
+        if (*tag != NULL) {
+            cfl_sds_destroy(*tag);
+            *tag = NULL;
+        }
+        *id = 0;
+    }
+
     flb_sqldb_unlock(context->db);
-
-    if (exists == -1) {
-        // ... existing cleanup code ...
-    }
-
     return exists;

include/fluent-bit/flb_blob_db.h (3)
27-40
: Consider adding indexes for query performance.
The blob_files table schema is well-structured. However, based on the query patterns in the file (e.g., queries filtering by aborted, last_delivery_attempt, created), consider adding indexes to improve query performance. Add these indexes after the table creation:
CREATE INDEX IF NOT EXISTS idx_blob_files_aborted ON blob_files(aborted);
CREATE INDEX IF NOT EXISTS idx_blob_files_created ON blob_files(created);
CREATE INDEX IF NOT EXISTS idx_blob_files_last_delivery ON blob_files(last_delivery_attempt);
CREATE INDEX IF NOT EXISTS idx_blob_files_path ON blob_files(path);
173-183
: Fix typo and formatting in comment.
The comment has a typo and formatting issues.
 /*
  * Query to retrieve the oldest file which all it parts are mark as uploaded, this
  * query will group the results in a single record, e.g:
  *
-*  path              part_ids
+*  path              part_ids
  *  ----------------  ----------  ------------------------------------------------------------
  *  /.../alice29.txt  1726423769  0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,
  *                                19,20,21,22,23,24,25,26,27,28,29,30
  *
- * this query is used to compose
+ * This query is used to compose the complete multipart upload request.
  */
242-244
: Fix typo in comment.
-/* These errors are highly speciifc and thus client code should be able to
+/* These errors are highly specific and thus client code should be able to
  * range check them. */
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
include/fluent-bit/flb_aws_util.h (1 hunks)
include/fluent-bit/flb_blob_db.h (1 hunks)
include/fluent-bit/flb_sqldb.h (3 hunks)
plugins/in_blob/blob_db.c (1 hunks)
plugins/out_s3/s3.c (18 hunks)
plugins/out_s3/s3.h (4 hunks)
plugins/out_s3/s3_multipart.c (7 hunks)
src/CMakeLists.txt (1 hunks)
src/aws/flb_aws_util.c (1 hunks)
src/flb_blob_db.c (1 hunks)
src/flb_signv4.c (2 hunks)
src/flb_sqldb.c (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (28)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
- GitHub Check: pr-compile-centos-7
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
- GitHub Check: PR - fuzzing test
🔇 Additional comments (23)
plugins/in_blob/blob_db.c (1)
5-5
: LGTM! Copyright year updated appropriately.
The copyright year has been correctly updated to include 2025, which is appropriate for code being added in the current year.
src/CMakeLists.txt (1)
95-95
: LGTM! Properly integrated new blob database source file.
The addition of flb_blob_db.c to the core source files is correctly placed and follows the existing organizational pattern in the CMake configuration.

include/fluent-bit/flb_aws_util.h (2)
195-195
: LGTM! Minor formatting improvement.
The comment formatting has been cleaned up appropriately.
199-201
: LGTM! Well-designed blob key generation function.
The new flb_get_s3_blob_key function declaration follows consistent naming conventions and provides appropriate parameters for blob-specific S3 key generation. This complements the existing flb_get_s3_key function for time-based keys.

src/flb_signv4.c (2)
437-437
: LGTM! Minor whitespace cleanup.
Removed unnecessary trailing whitespace.
625-627
: LGTM! Proper HTTP DELETE method support added.
The DELETE method has been correctly implemented in the canonical request generation, following the same pattern as other HTTP methods. This supports multipart upload operations that may require DELETE requests for aborting uploads.
include/fluent-bit/flb_sqldb.h (3)
25-25
: LGTM! Proper include for thread safety.
Adding the flb_lock.h include is necessary to support the new locking mechanism.
34-34
: LGTM! Thread safety enhancement.
Adding a lock member to the flb_sqldb structure is a good design choice for ensuring thread-safe access to SQLite databases.
48-50
: LGTM! Consistent lock management API.
The flb_sqldb_lock and flb_sqldb_unlock function declarations provide a clean API for managing database access synchronization. This follows standard locking patterns and will enable safe concurrent access to SQLite databases.

src/flb_sqldb.c (1)
38-44
: Good use of zero-initialization
The change from flb_malloc to flb_calloc ensures all fields are zero-initialized, which is a defensive programming practice that prevents uninitialized memory issues.

src/aws/flb_aws_util.c (2)
988-1007
: Comprehensive error handling
The error handling block properly cleans up all allocated resources with appropriate NULL checks. This is a good example of defensive programming.
833-841
: Memory leak on early return
The function allocates tmp_tag but doesn't free it if the allocation fails. The error path should be used instead of direct return.
 tmp_tag = flb_sds_create_len(tag, strlen(tag));
 if(!tmp_tag){
-    goto error;
+    flb_errno();
+    return NULL;
 }
Actually, the current code is correct - it uses goto error which will handle cleanup. No change needed.

plugins/out_s3/s3_multipart.c (2)
337-337
: Sufficient buffer size for part numbers
The buffer size of 11 characters is adequate for storing the maximum part number (10,000) as a string, including the null terminator.
430-437
: No memory leak: flb_sds_copy correctly grows and reuses the buffer
The flb_sds_copy implementation in src/flb_sds.c automatically calls flb_sds_increase when the existing capacity is insufficient, returns the (possibly reallocated) SDS pointer, and the caller assigns and later destroys it. In plugins/out_s3/s3_multipart.c, the code:
- Allocates uri with flb_sds_create_size(...)
- Calls tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url));
- Checks !tmp and cleans up on error
- Assigns uri = tmp and eventually calls flb_sds_destroy(uri)
This ensures no buffers are leaked or overwritten.
You can safely ignore the original memory-leak concern.
Likely an incorrect or invalid review comment.
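For reference, a minimal sketch of that allocate/copy/destroy pattern (the helper name is hypothetical; only the flb_sds calls cited above are assumed):

#include <string.h>
#include <fluent-bit/flb_sds.h>

/* Sketch of the pattern described above: flb_sds_copy() may reallocate,
 * so the caller keeps the returned pointer and destroys that handle.
 */
static flb_sds_t build_uri(const char *pre_signed_url)
{
    flb_sds_t uri;
    flb_sds_t tmp;

    uri = flb_sds_create_size(256);
    if (uri == NULL) {
        return NULL;
    }

    tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url));
    if (tmp == NULL) {
        flb_sds_destroy(uri);
        return NULL;
    }

    /* flb_sds_copy may have grown the buffer; keep the returned pointer */
    uri = tmp;

    return uri;
}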
plugins/out_s3/s3.h (1)
47-48
: MAX_FILE_SIZE_PUT_OBJECT constant formatting
The constant value would be more readable with underscores for thousand separators, consistent with other size constants in the file.
-#define MAX_FILE_SIZE_PUT_OBJECT 1000000000
+#define MAX_FILE_SIZE_PUT_OBJECT 1_000_000_000
Actually, C doesn't support digit separators. The current format is fine.
src/flb_blob_db.c (2)
1400-1561
: Stub implementation for non-SQLite builds
The stub implementations correctly return error codes when SQLite is not available. This provides good compile-time compatibility.
1308-1356
: Potential buffer overflow in remote_id_list iteration
The cleanup loop in the error path has an off-by-one error. When remote_id_index equals the size, the first check while (remote_id_index > 0) would skip index 0, but then it's cleaned up separately. However, if we break at remote_id_index == 0, the separate cleanup would cause a double-free. Fix the cleanup logic:
 if (result == -1) {
-    while (remote_id_index > 0) {
-        if (remote_id_list[remote_id_index] != NULL) {
-            flb_sds_destroy(remote_id_list[remote_id_index]);
-        }
-        remote_id_index--;
-    }
-
-    if (remote_id_list[remote_id_index] != NULL) {
-        flb_sds_destroy(remote_id_list[remote_id_index]);
+    for (size_t i = 0; i < remote_id_index; i++) {
+        if (remote_id_list[i] != NULL) {
+            flb_sds_destroy(remote_id_list[i]);
+        }
     }

     memset(remote_id_list, 0, sizeof(cfl_sds_t) * remote_id_list_size);
 }
Likely an incorrect or invalid review comment.
include/fluent-bit/flb_blob_db.h (5)
20-21
: LGTM!
Header guard is properly defined using standard naming convention.
87-97
: LGTM! Well-designed query with proper concurrency handling.
The query correctly ensures that aborted files are only selected when no parts are currently in progress, preventing race conditions during multipart uploads.
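As an illustration of that guard (not the exact query text from the header; column names are taken from the blob_files/blob_parts schema shown in this PR):

-- Pick an aborted file only when none of its parts are still in flight
SELECT bf.id, bf.path
  FROM blob_files bf
 WHERE bf.aborted = 1
   AND NOT EXISTS (SELECT 1
                     FROM blob_parts bp
                    WHERE bp.file_id = bf.id
                      AND bp.in_progress = 1)
 ORDER BY bf.created
 LIMIT 1;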
144-170
: LGTM! Excellent query design for ordered part processing.
The query correctly enforces that part 0 must be uploaded before other parts can be processed, which is a good design for ensuring metadata or header information is uploaded first.
294-335
: LGTM! Well-structured database context.
The flb_blob_db struct properly encapsulates all database operations with prepared statements and thread safety via the global lock. The conditional compilation for SQLite types is handled correctly.
337-451
: LGTM! Comprehensive and well-designed API.
The function declarations provide a complete interface for blob database operations with consistent naming conventions and appropriate parameter types. The API properly handles thread safety through lock/unlock functions.
plugins/out_s3/s3.c (1)
1889-1987
: Fix memory leaks in URL parsing error paths
The function has memory leaks when URL parsing fails. The scheme and port variables are not freed in all error paths.
 static int blob_initialize_authorization_endpoint_upstream(struct flb_s3 *context)
 {
     int ret;
     struct flb_upstream *upstream;
     struct flb_tls *tls_context;
     char *scheme = NULL;
     char *host = NULL;
     char *port = NULL;
     char *uri = NULL;
     int upstream_flags;

     context->authorization_endpoint_upstream = NULL;
     context->authorization_endpoint_tls_context = NULL;

     /* Parse and split URL */
     ret = flb_utils_url_split(context->authorization_endpoint_url,
                               &scheme, &host, &port, &uri);
     if (ret == -1) {
         flb_plg_error(context->ins, "Invalid URL: %s",
                       context->authorization_endpoint_url);
         return -1;
     }

-    if (scheme != NULL) {
-        flb_free(scheme);
-
-        scheme = NULL;
-    }
-
-    if (port != NULL) {
-        flb_free(port);
-
-        port = NULL;
-    }
-
     if (host == NULL || uri == NULL) {
         flb_plg_error(context->ins, "Invalid URL: %s",
                       context->authorization_endpoint_url);
+        if (scheme != NULL) {
+            flb_free(scheme);
+        }
+        if (port != NULL) {
+            flb_free(port);
+        }
         if (host != NULL) {
             flb_free(host);
         }
-
         if (uri != NULL) {
             flb_free(uri);
         }

         return -2;
     }
+
+    if (scheme != NULL) {
+        flb_free(scheme);
+        scheme = NULL;
+    }
+
+    if (port != NULL) {
+        flb_free(port);
+        port = NULL;
+    }
Likely an incorrect or invalid review comment.
#define SQL_CREATE_BLOB_PARTS \
"CREATE TABLE IF NOT EXISTS blob_parts (" \
" id INTEGER PRIMARY KEY," \
" file_id INTEGER NOT NULL," \
" part_id INTEGER NOT NULL," \
" remote_id TEXT NOT NULL DEFAULT ''," \
" uploaded INTEGER DEFAULT 0," \
" in_progress INTEGER DEFAULT 0," \
" offset_start INTEGER," \
" offset_end INTEGER," \
" delivery_attempts INTEGER DEFAULT 0," \
" FOREIGN KEY (file_id) REFERENCES blob_files(id) " \
" ON DELETE CASCADE" \
");"
🛠️ Refactor suggestion
Add uniqueness constraint and indexes for blob_parts table.
The blob_parts table should have a uniqueness constraint on the combination of file_id and part_id to prevent duplicate parts. Also, add indexes based on the query patterns.
Modify the table definition:
#define SQL_CREATE_BLOB_PARTS \
"CREATE TABLE IF NOT EXISTS blob_parts (" \
" id INTEGER PRIMARY KEY," \
" file_id INTEGER NOT NULL," \
" part_id INTEGER NOT NULL," \
" remote_id TEXT NOT NULL DEFAULT ''," \
" uploaded INTEGER DEFAULT 0," \
" in_progress INTEGER DEFAULT 0," \
" offset_start INTEGER," \
" offset_end INTEGER," \
" delivery_attempts INTEGER DEFAULT 0," \
+ " UNIQUE(file_id, part_id)," \
" FOREIGN KEY (file_id) REFERENCES blob_files(id) " \
" ON DELETE CASCADE" \
");"
And add these indexes:
CREATE INDEX IF NOT EXISTS idx_blob_parts_file_id ON blob_parts(file_id);
CREATE INDEX IF NOT EXISTS idx_blob_parts_upload_state ON blob_parts(uploaded, in_progress);
🤖 Prompt for AI Agents
In include/fluent-bit/flb_blob_db.h lines 42 to 55, add a uniqueness constraint
on the combination of file_id and part_id columns in the blob_parts table
definition to prevent duplicate entries. Additionally, after the table creation,
add two indexes: one on file_id and another composite index on uploaded and
in_progress columns to optimize query performance based on these fields.
flb_plg_info(ctx->ins, "Successfully completed multipart upload " | ||
"for %s, UploadId=%s", m_upload->s3_key, | ||
m_upload->upload_id); |
Incorrect success message in abort function
The success message says "Successfully completed multipart upload" but this is the abort function, not complete.
- flb_plg_info(ctx->ins, "Successfully completed multipart upload "
+ flb_plg_info(ctx->ins, "Successfully aborted multipart upload "
"for %s, UploadId=%s", m_upload->s3_key,
m_upload->upload_id);
🤖 Prompt for AI Agents
In plugins/out_s3/s3_multipart.c around lines 537 to 539, the log message
incorrectly states "Successfully completed multipart upload" in the abort
function. Update the message to accurately reflect the abort action, such as
"Successfully aborted multipart upload" to correctly indicate the operation
performed.
#ifdef FLB_SYSTEM_WINDOWS
static int setenv(const char *name, const char *value, int overwrite)
{
    return SetEnvironmentVariableA(name, value);
}
#endif
Fix Windows setenv implementation
The Windows setenv implementation has several issues:
- Ignores the overwrite parameter
- Incorrect return value handling (SetEnvironmentVariableA returns BOOL, not int)
- No error handling
static int setenv(const char *name, const char *value, int overwrite)
{
- return SetEnvironmentVariableA(name, value);
+ if (!overwrite && GetEnvironmentVariableA(name, NULL, 0) > 0) {
+ return 0;
+ }
+ if (SetEnvironmentVariableA(name, value)) {
+ return 0;
+ }
+ return -1;
}
🤖 Prompt for AI Agents
In plugins/out_s3/s3.c around lines 56 to 61, the Windows setenv function
incorrectly ignores the overwrite parameter, does not handle the BOOL return
type properly, and lacks error handling. Update the function to check if the
environment variable exists when overwrite is zero, return 0 on success and -1
on failure, and use GetLastError or equivalent to handle errors appropriately.
struct worker_info *info;
struct flb_s3 *ctx = data;

flb_plg_info(ctx->ins, "initializing worker");
Fix incorrect log message in worker exit callback
The log message says "initializing worker" but this is the exit/cleanup function.
- flb_plg_info(ctx->ins, "initializing worker");
+ flb_plg_info(ctx->ins, "cleaning up worker");
🤖 Prompt for AI Agents
In plugins/out_s3/s3.c at line 1086, the log message incorrectly states
"initializing worker" in the worker exit or cleanup callback. Change the log
message to accurately reflect the function's purpose, such as "exiting worker"
or "cleaning up worker," to correctly indicate that this is the worker exit
callback.
if (port != NULL) {
    port_as_short = (uint16_t) strtoul(port, NULL, 10);
}
else {
    if (scheme != NULL) {
        if (strcasecmp(scheme, "https") == 0) {
            port_as_short = 443;
        }
        else {
            port_as_short = 80;
        }
    }
}
|
Fix uninitialized variable usage
The variable port_as_short may be used uninitialized if both port and scheme are NULL.
uint16_t port_as_short;
flb_sds_t tmp;
+ port_as_short = 80; // Default value
+
/* Parse and split URL */
ret = flb_utils_url_split(url,
&scheme, &host, &port, &uri);
🤖 Prompt for AI Agents
In plugins/out_s3/s3.c around lines 2015 to 2028, the variable port_as_short can
be used uninitialized if both port and scheme are NULL. To fix this, initialize
port_as_short to a default value (e.g., 0) before the conditional checks, or add
an else clause that assigns a safe default value when both port and scheme are
NULL to ensure port_as_short is always initialized before use.
while (1) {
    ret = flb_blob_db_file_get_next_stale(&ctx->blob_db,
                                          &file_id,
                                          &file_path,
                                          ctx->upload_parts_freshness_threshold,
                                          &file_remote_id,
                                          &file_tag,
                                          &part_count);

    if (ret == 1) {
        if (part_count > 1) {
            ret = abort_blob_upload(ctx, file_tag, file_path, file_remote_id);

            if (ret != 0) {
                cfl_sds_destroy(file_tag);
                cfl_sds_destroy(file_path);
                cfl_sds_destroy(file_remote_id);
                cfl_sds_destroy(file_destination);

                flb_blob_db_unlock(&ctx->blob_db);

                return -1;
            }
        }

        flb_blob_file_update_remote_id(&ctx->blob_db, file_id, "");
        flb_blob_db_file_reset_upload_states(&ctx->blob_db, file_id);
        flb_blob_db_file_set_aborted_state(&ctx->blob_db, file_id, 0);

        cfl_sds_destroy(file_remote_id);
        cfl_sds_destroy(file_path);
        cfl_sds_destroy(source);

        file_remote_id = NULL;
        file_path = NULL;
        source = NULL;
    }
    else {
        break;
    }
}
🛠️ Refactor suggestion
Add iteration limits to prevent infinite loops
The while loops for processing stale files could run indefinitely if the database keeps returning files. Add iteration limits for safety.
+ int max_iterations = 1000;
+ int iterations = 0;
+
- while (1) {
+ while (iterations < max_iterations) {
+ iterations++;
ret = flb_blob_db_file_get_next_stale(&ctx->blob_db,
&file_id,
&file_path,
ctx->upload_parts_freshness_threshold,
&file_remote_id,
&file_tag,
&part_count);
🤖 Prompt for AI Agents
In plugins/out_s3/s3.c around lines 2640 to 2680, the while(1) loop processing
stale files can run indefinitely if the database continuously returns files. To
fix this, add a maximum iteration count limit to the loop and break out once the
limit is reached to prevent infinite looping and potential resource exhaustion.
while (*valid_blob_path == '.' ||
       *valid_blob_path == '/') {
    valid_blob_path++;
}
Path traversal vulnerability in blob path handling
The blob path sanitization only removes leading dots and slashes, but doesn't prevent path traversal attacks using sequences like ../../../etc/passwd embedded within the path.
Consider implementing comprehensive path validation:
valid_blob_path = (char *) blob_path;
while (*valid_blob_path == '.' ||
*valid_blob_path == '/') {
valid_blob_path++;
}
+
+/* Check for path traversal sequences */
+if (strstr(valid_blob_path, "../") != NULL ||
+ strstr(valid_blob_path, "..\\") != NULL) {
+ flb_warn("[s3_key] Invalid blob path: contains path traversal sequence");
+ goto error;
+}
🤖 Prompt for AI Agents
In src/aws/flb_aws_util.c around lines 939 to 943, the current code only strips
leading dots and slashes from the blob path but does not prevent path traversal
sequences like ../../../ embedded within the path. To fix this, implement
comprehensive path validation that scans the entire path string to detect and
reject any path traversal patterns such as ".." segments or absolute path
indicators. Ensure the validation logic sanitizes or rejects paths containing
these sequences before further processing.
random_alphanumeric = flb_sts_session_name();
if (!random_alphanumeric) {
    goto error;
}

/* only use 8 chars of the random string */
random_alphanumeric[8] = '\0';

tmp_key = replace_uri_tokens(s3_key, RANDOM_STRING, random_alphanumeric);
if (!tmp_key) {
    flb_free(random_alphanumeric);
    goto error;
}
Potential buffer overflow with random string manipulation
The code modifies the random string by setting index 8 to null terminator, but doesn't verify that the allocated string is at least 9 characters long.
Add a length check before modifying the string:
random_alphanumeric = flb_sts_session_name();
if (!random_alphanumeric) {
goto error;
}
+/* Ensure the random string is long enough */
+if (strlen(random_alphanumeric) < 8) {
+ flb_free(random_alphanumeric);
+ goto error;
+}
/* only use 8 chars of the random string */
random_alphanumeric[8] = '\0';
🤖 Prompt for AI Agents
In src/aws/flb_aws_util.c around lines 958 to 968, the code sets
random_alphanumeric[8] = '\0' without verifying the string length, risking
buffer overflow. Add a check to ensure the length of random_alphanumeric is at
least 9 characters before setting the null terminator at index 8. If the length
is insufficient, handle the error appropriately to avoid unsafe memory access.
result = flb_sqldb_query(db, SQL_CREATE_BLOB_FILES, NULL, NULL);

if (result != FLB_OK) {
    flb_sqldb_close(db);

    return FLB_BLOB_DB_ERROR_FILE_TABLE_CREATION;
}
💡 Verification agent
🧩 Analysis chain
SQL foreign key constraint enablement
The code enables foreign keys after creating tables. If the tables already exist from a previous run, foreign keys might not be enforced for existing data.
Consider checking if foreign keys are already enabled:
🏁 Script executed:
#!/bin/bash
# Check if there's a way to verify foreign key state in fluent-bit codebase
rg -A 5 "PRAGMA foreign_keys" --type c
Length of output: 1205
🏁 Script executed:
#!/bin/bash
set -e
echo "1) Looking for PRAGMA invocation via macro in flb_blob_db.c"
rg -n "SQL_PRAGMA_FOREIGN_KEYS" -A5 src/flb_blob_db.c || echo "No occurrences of SQL_PRAGMA_FOREIGN_KEYS"
echo
echo "2) Looking for any direct PRAGMA foreign_keys usage in flb_blob_db.c"
rg -n "PRAGMA foreign_keys" -n src/flb_blob_db.c || echo "No direct PRAGMA foreign_keys calls"
echo
echo "3) Inspecting table creation sequence"
rg -n "SQL_CREATE_BLOB_FILES" -A2 src/flb_blob_db.c
rg -n "SQL_CREATE_BLOB_PARTS" -A2 src/flb_blob_db.c
Length of output: 1105
🏁 Script executed:
#!/bin/bash
set -e
echo "Displaying lines 230-300 of src/flb_blob_db.c for context"
sed -n '230,300p' src/flb_blob_db.c
Length of output: 1644
Move PRAGMA foreign_keys = ON before creating tables
To ensure foreign-key enforcement is active for all subsequent operations (including any DDL that may depend on it) invoke the PRAGMA immediately after opening the database, not only after creating the tables.
• File: src/flb_blob_db.c, in flb_blob_db_open, call:
result = flb_sqldb_query(db, SQL_PRAGMA_FOREIGN_KEYS, NULL, NULL);
if (result != FLB_OK) {
flb_sqldb_close(db);
return FLB_BLOB_DB_ERROR_SQLDB_FK_INIT_FAILURE;
}
immediately after flb_sqldb_open(...) and before any SQL_CREATE_* statements.
• Then proceed with:
result = flb_sqldb_query(db, SQL_CREATE_BLOB_FILES, NULL, NULL);
…
result = flb_sqldb_query(db, SQL_CREATE_BLOB_PARTS, NULL, NULL);
This guarantees foreign-key enforcement is enabled for all table creation and later DML.
🤖 Prompt for AI Agents
In src/flb_blob_db.c around lines 243 to 249, move the execution of the PRAGMA
foreign_keys = ON statement to immediately after opening the database with
flb_sqldb_open and before any table creation queries. Add a call to
flb_sqldb_query with SQL_PRAGMA_FOREIGN_KEYS right after opening the DB, check
for errors, and if it fails, close the DB and return
FLB_BLOB_DB_ERROR_SQLDB_FK_INIT_FAILURE. Then continue with the existing table
creation queries to ensure foreign key enforcement is active for all subsequent
operations.
int flb_sqldb_lock(struct flb_sqldb *db)
{
    return flb_lock_acquire(&db->lock,
                            FLB_LOCK_INFINITE_RETRY_LIMIT,
                            FLB_LOCK_DEFAULT_RETRY_DELAY);
}

int flb_sqldb_unlock(struct flb_sqldb *db)
{
    return flb_lock_release(&db->lock,
                            FLB_LOCK_INFINITE_RETRY_LIMIT,
                            FLB_LOCK_DEFAULT_RETRY_DELAY);
}
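For context, a minimal caller-side sketch of this lock/unlock API (illustrative only; the helper name is hypothetical and it assumes the flb_sqldb_query wrapper used elsewhere in this PR):

/* Sketch: serialize one statement against a shared handle using the
 * new lock/unlock helpers.
 */
static int run_locked(struct flb_sqldb *db, const char *sql)
{
    int ret;

    if (flb_sqldb_lock(db) != 0) {
        return -1;
    }

    ret = flb_sqldb_query(db, sql, NULL, NULL);

    flb_sqldb_unlock(db);

    return ret;
}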
🛠️ Refactor suggestion
Consider adding NULL checks in lock/unlock functions
The lock/unlock functions don't validate the db
parameter. Adding NULL checks would make these functions more robust against programming errors.
int flb_sqldb_lock(struct flb_sqldb *db)
{
+ if (!db) {
+ return -1;
+ }
return flb_lock_acquire(&db->lock,
FLB_LOCK_INFINITE_RETRY_LIMIT,
FLB_LOCK_DEFAULT_RETRY_DELAY);
}
int flb_sqldb_unlock(struct flb_sqldb *db)
{
+ if (!db) {
+ return -1;
+ }
return flb_lock_release(&db->lock,
FLB_LOCK_INFINITE_RETRY_LIMIT,
FLB_LOCK_DEFAULT_RETRY_DELAY);
}
🤖 Prompt for AI Agents
In src/flb_sqldb.c around lines 173 to 185, the flb_sqldb_lock and
flb_sqldb_unlock functions do not check if the db pointer is NULL before
accessing its lock member. Add NULL checks at the start of both functions to
return an error or failure code if db is NULL, preventing potential segmentation
faults and improving robustness.
This is a rebased version of PR #9907
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Improvements
Bug Fixes
Chores