
out_s3: added blob handling feature #9907


Open
wants to merge 19 commits into master

Conversation

leonardo-albertovich
Collaborator

No description provided.


@swapneils swapneils left a comment


Added some comments that I'd prefer to clarify and/or change before merging this.

I'm also curious why the Ubuntu unit tests are failing.


    /* A match against "$TAG[" indicates an invalid or out of bounds tag part. */
    if (strstr(s3_key, tmp)){
        flb_warn("[s3_key] Invalid / Out of bounds tag part: At most 10 tag parts "


Nit: Is going above 10 tag parts the most likely cause of this issue? The "first part" of the error message implies that tags in the wrong place also trigger this check, in which case the "second part" should mention that in more detail as well to avoid confusing operators.
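
For reference, a standalone sketch of how I read this check (illustrative only, not the plugin's actual code): once the $TAG[n] placeholders have been substituted, any leftover "$TAG[" literal in the rendered key means the format referenced a tag part that isn't available, so a warning is emitted.

#include <stdio.h>
#include <string.h>

static void check_tag_parts(const char *s3_key)
{
    const char *tmp = "$TAG[";   /* marker that should be gone after substitution */

    if (strstr(s3_key, tmp) != NULL) {
        fprintf(stderr,
                "[s3_key] invalid / out of bounds tag part in key: %s\n",
                s3_key);
    }
}

int main(void)
{
    check_tag_parts("logs/app/file");       /* fully substituted: no warning   */
    check_tag_parts("logs/$TAG[11]/file");  /* unresolved part: prints warning */
    return 0;
}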

Collaborator Author


This part of the code is faithful to the original; I'd raise that point with the code owner, who would be much more qualified to address it than myself.


valid_blob_path = (char *) blob_path;

while (*valid_blob_path == '.' ||


Can this value ever use the ../ format? If so the semantics here would ignore that motion, which is incorrect.

Collaborator Author


Yes, ignoring that part would be correct. Regardless, what we want is the internal structure, so ./a/b/c/d/file.txt would be the same as ../../../../a/b/c/d/file.txt, because what we want is a/b/c/d/file.txt.
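
To make that concrete, here's a standalone sketch of the normalization (illustrative only; the helper name is made up, and I'm assuming the truncated condition above also skips '/'):

#include <stdio.h>

static const char *skip_relative_prefix(const char *blob_path)
{
    const char *valid_blob_path = blob_path;

    /* skip any leading '.' and '/' characters */
    while (*valid_blob_path == '.' || *valid_blob_path == '/') {
        valid_blob_path++;
    }

    return valid_blob_path;
}

int main(void)
{
    /* both lines print "a/b/c/d/file.txt" */
    printf("%s\n", skip_relative_prefix("./a/b/c/d/file.txt"));
    printf("%s\n", skip_relative_prefix("../../../../a/b/c/d/file.txt"));
    return 0;
}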


Got it. Do we have a way to deal with duplicates in this case, e.g. ./logs/1.txt and ../../logs/1.txt both being sent to the same bucket? With the current implementation it seems like these would overwrite each other, which is unintuitive since from the user side these are two different filepaths.

Alternatively, can we gate this filtering behind a blob-specific flag of some sort? (Assuming the above concern is valid, that is, we shouldn't be introducing flags unless they provide customer benefit.)
I think it's probably reasonable for the current behavior to be the default, to minimize the chance of too-long S3 keys, but users should be able to see this in documentation and opt-out if their specific filenames are incompatible with the assumption that the internal directory structure is a unique key.

Collaborator Author


I think there's been a misunderstanding here. Firstly, this is already gated, as it's in flb_get_s3_blob_key, which is only used for blobs. Also, there should be no way for a blob to have a path that includes ../.

struct multipart_upload *m_upload,
char *pre_signed_url);

int abort_multipart_upload(struct flb_s3 *ctx,


Why are we creating this function? I see we're using it to abort multipart blob uploads in s3.c, but why is that needed here?

Collaborator Author


I don't think I'm following you; it's there because it's a prototype. Could you please clarify the point?


Nvm, I see from here that we set this status to 1 in sqlite on exit or retry exhaustion, and given it's only called within the upload function itself there shouldn't be issues with race conditions making us retry when we shouldn't.


int flb_blob_db_lock(struct flb_blob_db *context)
{
    return flb_lock_acquire(&context->global_lock,


As I understand it, every blob publish operation depends on manipulating the same sqlite table. Do we know how this affects performance when parallelizing publishes? Is there a way to get a narrower lock here?

Collaborator Author


I tried to narrow the scope of the locks as much as possible; as it is, the only parts that take the lock are the ones that query or modify the database. There are very few external calls in that part of the code (aborting an upload, obtaining the pre-signed URL, committing an upload), and to be honest I'd rather err on the safe side than risk introducing a bug in this case.
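
To illustrate the pattern, here's a standalone sketch (a pthread mutex stands in for the flb_lock API and the helper names are made up): the lock is held only around the database access, while external calls such as the upload run unlocked.

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t blob_db_lock = PTHREAD_MUTEX_INITIALIZER;

static int query_next_part(int *part_id)
{
    /* placeholder for the SQLite query that picks the next pending part */
    *part_id = 42;
    return 0;
}

static int upload_part(int part_id)
{
    /* placeholder for the external S3 call; runs without the lock held */
    printf("uploading part %d\n", part_id);
    return 0;
}

int process_one_part(void)
{
    int part_id;
    int ret;

    pthread_mutex_lock(&blob_db_lock);
    ret = query_next_part(&part_id);      /* DB access under the lock */
    pthread_mutex_unlock(&blob_db_lock);

    if (ret != 0) {
        return ret;
    }

    return upload_part(part_id);          /* network I/O outside the lock */
}

int main(void)
{
    return process_one_part();
}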


int flb_blob_db_unlock(struct flb_blob_db *context)
{
    return flb_lock_release(&context->global_lock,


Nit: out of curiosity, why are we using global_lock here but db_lock in azure_blob_db.c?

Collaborator Author


azure_blob_db.c was the first version of this code which in turn was derived from another piece of code. When I took over this implementation I decided to abstract the blob database management into a globally available component to prevent future code duplication.

Back then the plan was to refactor azure_blob to use this component but that slipped through the cracks.


    s3_client = ctx->s3_client;
    if (s3_plugin_under_test() == FLB_TRUE) {
        /* c = mock_s3_call("TEST_ABORT_MULTIPART_UPLOAD_ERROR", "AbortMultipartUpload"); */


Nit: Do we need this line?

Collaborator Author


Yes, we do, because it's key to the testing system. However, I left it commented out because mock_s3_call does not have a code path for AbortMultipartUpload, which I meant to add but forgot about.

I'll add that and uncomment the line.

time_t upload_parts_freshness_threshold;
int file_delivery_attempt_limit;
int part_delivery_attempt_limit;
flb_sds_t authorization_endpoint_url;


How are we using this authorization endpoint? Isn't the process of publishing binary data to S3 equivalent to string data, +/- the declared body type in the HTTP requests?

Collaborator Author


The authorization endpoint is an internal requirement; @edsiper might be able to explain it better. It's basically a service that provides pre-signed URLs.

sched = flb_sched_ctx_get();

/* convert from seconds to milliseconds (scheduler needs ms) */
ms = ctx->upload_parts_timeout * 1000;


Is this the correct semantics?

From the description of upload_parts_timeout below it seems like a maximum age to retry blob publishing, past which we will drop a blob instead of retrying.
As I understand the code, here we're instead inserting blob parts into the database and then attempting to publish them every upload_parts_timeout seconds, dropping them on the first failure.

Collaborator Author


It's a naming error; the maximum age is dictated by upload_part_freshness_limit.

As for the dropping part, that's dictated by file_delivery_attempt_limit and part_delivery_attempt_limit which default to 1.
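
Roughly, the relationship between those settings looks like this (standalone sketch; the decision logic below is an assumption for illustration, not the plugin's actual code). The scheduler interval only controls how often pending parts are re-checked; dropping is driven by the freshness limit and the attempt limits.

#include <stdbool.h>
#include <stdio.h>
#include <time.h>

struct part_state {
    time_t created;
    int delivery_attempts;
};

static bool should_drop_part(const struct part_state *part,
                             time_t now,
                             time_t upload_part_freshness_limit,
                             int part_delivery_attempt_limit)
{
    if (now - part->created > upload_part_freshness_limit) {
        return true;   /* part is too old to keep retrying */
    }

    if (part->delivery_attempts >= part_delivery_attempt_limit) {
        return true;   /* attempt limit (default 1) exhausted */
    }

    return false;
}

int main(void)
{
    struct part_state part = { .created = time(NULL), .delivery_attempts = 1 };

    /* with the default attempt limit of 1, this part would be dropped */
    printf("%d\n", should_drop_part(&part, time(NULL), 3600, 1));
    return 0;
}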

@@ -732,6 +732,203 @@ char* strtok_concurrent(
#endif
}

/* Constructs S3 object key as per the blob format. */
flb_sds_t flb_get_s3_blob_key(const char *format,

A majority of this is duplicated in flb_get_s3_key(...); does it make sense to try and introduce two helpers?

This could avoid possible divergence between the two methods over time.

Collaborator Author


There are a few key differences between the two, and since I'm not the code owner I'm not comfortable making heavy changes to the original, so I'd rather not.

char *tag,
char *source,
char *destination,
char *path,

There are other instances where we are using cfl_sds_t in place of char *; should we be consistent and migrate all char * here to cfl_sds_t?

Collaborator Author


No, those are output parameters. The blob db component always returns cfl_sds_t strings (which is not just a typedef for char *), but it cannot force the client code to provide them for input parameters, as all it needs are NULL-terminated strings. Otherwise we'd force client code to unnecessarily duplicate strings, which would make the API very cumbersome and thus reduce its usage.
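
To make the convention concrete, here's a small standalone sketch (the function name and the strdup-based stand-in for cfl_sds_t are made up for illustration; the real type lives in the cfl library):

#include <stdlib.h>
#include <string.h>

/* Stand-in only: the real cfl_sds_t is also declared as char *, but it
 * points just past a hidden length header and is released with
 * cfl_sds_destroy(). The point here is the convention: inputs are plain
 * NUL-terminated strings owned by the caller, outputs are allocated by
 * the blob db layer and handed back as cfl_sds_t. */
typedef char *cfl_sds_t;
struct flb_blob_db;               /* opaque context, defined elsewhere */

int blob_db_file_lookup(struct flb_blob_db *context,
                        const char *path,            /* input parameter  */
                        cfl_sds_t *out_source,       /* output parameter */
                        cfl_sds_t *out_destination)  /* output parameter */
{
    (void) context;
    (void) path;

    /* stand-in for the real allocation done by the blob db component */
    *out_source = strdup("blob source");
    *out_destination = strdup("blob destination");

    return 0;
}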


/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors

Should the copyright block include our current year? 🫣

Collaborator Author


Good catch

}


/* file destination update */

Should this comment include remote-id (similar to file part below)?

Collaborator Author


Yes, copy & paste betrayed me

return FLB_BLOB_DB_ERROR_PREPARING_STATEMENT_GET_NEXT_FILE_PART;
}

result = sqlite3_prepare_v2(context->db->handler,

Should these next results contain a comment above (similar to prior prepared statement blocks)?

Collaborator Author


I'll add them just to be consistent, but TBH the constant name and text are almost the same; IIRC that's why I left the original comments but didn't add new ones.


static int flb_blob_db_file_reset_part_upload_states(struct flb_blob_db *context,
uint64_t id,
char *path)

Similar to above comments on unused parameters

Collaborator Author

@leonardo-albertovich leonardo-albertovich Apr 24, 2025


Thank you, I'll fix it.


int flb_blob_db_file_reset_upload_states(struct flb_blob_db *context,
uint64_t id,
char *path)

Similar to above comments on unused parameters

Collaborator Author

@leonardo-albertovich leonardo-albertovich Apr 24, 2025


Thank you, I'll fix it.

uint64_t part_id,
size_t offset_start,
size_t offset_end,
int64_t *out_id)

Similar to above comments on unused parameters

Collaborator Author


Thank you. I think this one is different from the others; it seems I forgot to set an output parameter. I'll fix it.


flb_blob_db_lock(&ctx->blob_db);

while (1) {

Collaborator Author


Sure, I've addressed it, thank you.

return ret;
}

static int blob_fetch_multipart_complete_pre_signed_url(struct flb_s3 *context,

These fetch_*_pre_signed_url methods are near duplicates with minor changes to the tmp (path) variable; can we consider using a helper method for getting the pre-signed URL and having each method handle its own tmp/path generation, to reduce code?

Collaborator Author


Makes sense, I'll address it.


If it's a big lift, we can push it to a later tech-debt PR that is more about code-shrink than feature-build
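
For the record, the shape being suggested is roughly this (standalone sketch; the signatures, the stub body and the path format are all made up for illustration, not the plugin's real code): one shared helper talks to the authorization endpoint, and each fetch_*_pre_signed_url variant only builds its own path.

#include <stdio.h>

struct flb_s3 { int dummy; };  /* stand-in for the plugin context */

/* hypothetical shared helper: one place that would perform the
 * authorization-endpoint exchange for an already-built path */
static int blob_fetch_pre_signed_url(struct flb_s3 *context,
                                     const char *path,
                                     char *url_out, size_t url_size)
{
    (void) context;
    /* stub: the real helper would perform the HTTP request here */
    snprintf(url_out, url_size, "https://example.invalid/presigned%s", path);
    return 0;
}

/* each variant then shrinks to path construction plus one call */
static int fetch_multipart_complete_url_example(struct flb_s3 *context,
                                                const char *bucket,
                                                const char *key,
                                                const char *upload_id,
                                                char *url_out,
                                                size_t url_size)
{
    char tmp[1024];

    snprintf(tmp, sizeof(tmp), "/%s/%s?uploadId=%s", bucket, key, upload_id);

    return blob_fetch_pre_signed_url(context, tmp, url_out, url_size);
}

int main(void)
{
    struct flb_s3 ctx = {0};
    char url[2048];

    fetch_multipart_complete_url_example(&ctx, "my-bucket", "blob/key",
                                         "upload-123", url, sizeof(url));
    printf("%s\n", url);
    return 0;
}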

@@ -334,7 +334,7 @@ static int complete_multipart_upload_payload(struct flb_s3 *ctx,
     int offset = 0;
     flb_sds_t etag;
     size_t size = COMPLETE_MULTIPART_UPLOAD_BASE_LEN;
-    char part_num[7];
+    char part_num[11];

Are we not limited to numbers 1..10000 per docs - https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html

Part numbers can be any number from 1 to 10,000, inclusive.

Collaborator Author


AWS might reject the request if the part number is higher than 10000, but there's an sprintf call which writes whatever 32-bit signed integer it gets in the part number field into that buffer. Considering how memory alignment works, I'm not worried about spending 8 more bytes if that means that in such a fringe case we get a nice error rather than a stack overflow.
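
For reference, the sizing argument in standalone form (illustration only, not the plugin code): "%d" for a non-negative 32-bit integer needs at most 10 digits plus the terminating NUL, so an 11-byte buffer holds any valid part number, while the old 7-byte buffer only covers values up to 6 digits. Using a bounded write keeps even unexpected values from running past the end.

#include <limits.h>
#include <stdio.h>

int main(void)
{
    char part_num[11];

    snprintf(part_num, sizeof(part_num), "%d", 10000);    /* normal case */
    printf("%s\n", part_num);

    snprintf(part_num, sizeof(part_num), "%d", INT_MAX);  /* 10 digits, still fits */
    printf("%s\n", part_num);

    return 0;
}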

@leonardo-albertovich
Collaborator Author

Please let me know if there's anything I missed; thanks for taking the time to review this PR.

@edsiper
Member

edsiper commented May 16, 2025

@leonardo-albertovich this is the last item to address: #9907 (comment)

@edsiper
Member

edsiper commented Jun 27, 2025

PR looks good and I ran some extra tests. However, I need to do a manual rebase since GH is not allowing it.
