Skip to content

filter_kubernetes: add entity attribute retrieval logics #10586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions plugins/filter_kubernetes/kube_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins,
flb_plg_info(ctx->ins, "https=%i host=%s port=%i",
ctx->api_https, ctx->api_host, ctx->api_port);
}

ctx->pod_hash_table = flb_hash_table_create_with_ttl(ctx->pod_service_map_ttl,
FLB_HASH_TABLE_EVICT_OLDER,
FLB_HASH_TABLE_SIZE,
FLB_HASH_TABLE_SIZE);
return ctx;
}

Expand All @@ -206,6 +211,10 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
flb_hash_table_destroy(ctx->namespace_hash_table);
}

if (ctx->pod_hash_table) {
flb_hash_table_destroy(ctx->pod_hash_table);
}

if (ctx->merge_log == FLB_TRUE) {
flb_free(ctx->unesc_buf);
}
Expand All @@ -214,6 +223,9 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
if (ctx->parser == NULL && ctx->regex) {
flb_regex_destroy(ctx->regex);
}
if (ctx->deploymentRegex) {
flb_regex_destroy(ctx->deploymentRegex);
}

flb_free(ctx->api_host);
flb_free(ctx->token);
Expand All @@ -228,6 +240,18 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
flb_upstream_destroy(ctx->kube_api_upstream);
}

if(ctx->pod_association_tls) {
flb_tls_destroy(ctx->pod_association_tls);
}

if (ctx->pod_association_upstream) {
flb_upstream_destroy(ctx->pod_association_upstream);
}

if (ctx->platform) {
flb_free(ctx->platform);
}

#ifdef FLB_HAVE_TLS
if (ctx->tls) {
flb_tls_destroy(ctx->tls);
Expand Down
68 changes: 68 additions & 0 deletions plugins/filter_kubernetes/kube_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,40 @@
#define FLB_KUBE_TAG_PREFIX "kube.var.log.containers."
#endif

/*
* Maximum attribute length for Entity's KeyAttributes
* values
* https://docs.aws.amazon.com/applicationsignals/latest/APIReference/API_Service.html#:~:text=Maximum%20length%20of%201024.
*/
#define KEY_ATTRIBUTES_MAX_LEN 1024
#define SERVICE_NAME_SOURCE_MAX_LEN 64

/*
* Configmap used for verifying whether if FluentBit is
* on EKS or native Kubernetes
*/
#define KUBE_SYSTEM_NAMESPACE "kube-system"
#define AWS_AUTH_CONFIG_MAP "aws-auth"

/*
* Possible platform values for Kubernetes plugin
*/
#define NATIVE_KUBERNETES_PLATFORM "k8s"
#define EKS_PLATFORM "eks"

struct kube_meta;

struct service_attributes {
char name[KEY_ATTRIBUTES_MAX_LEN];
int name_len;
char environment[KEY_ATTRIBUTES_MAX_LEN];
int environment_len;
char name_source[SERVICE_NAME_SOURCE_MAX_LEN];
int name_source_len;
int fields;

};

/* Filter context */
struct flb_kube {
/* Configuration parameters */
Expand Down Expand Up @@ -124,6 +156,7 @@ struct flb_kube {

/* Regex context to parse records */
struct flb_regex *regex;
struct flb_regex *deploymentRegex;
struct flb_parser *parser;

/* TLS CA certificate file */
Expand Down Expand Up @@ -165,6 +198,41 @@ struct flb_kube {
int kube_meta_cache_ttl;
int kube_meta_namespace_cache_ttl;

/* Configuration used for enabling pod to service name mapping*/
int use_pod_association;
char *pod_association_host;
char *pod_association_endpoint;
int pod_association_port;

/*
* TTL is used to check how long should the mapped entry
* remain in the hash table
*/
struct flb_hash_table *pod_hash_table;
int pod_service_map_ttl;
int pod_service_map_refresh_interval;
flb_sds_t pod_service_preload_cache_path;
struct flb_upstream *pod_association_upstream;
/*
* This variable holds the Kubernetes platform type
* Current checks for EKS or Native Kuberentes
*/
char *platform;
/*
* This value is used for holding the platform config
* value. Platform will be overriden with this variable
* if it's set
*/
char *set_platform;

Comment on lines +201 to +227
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify thread safety for pod_hash_table access

The pod_hash_table field will likely be accessed from multiple threads (main thread and background fetch thread mentioned in the PR summary). Ensure proper synchronization mechanisms are in place.


🏁 Script executed:

#!/bin/bash
# Search for mutex/lock usage around pod_hash_table access
rg -A 5 -B 5 "pod_hash_table" plugins/filter_kubernetes/

Length of output: 5956


Synchronize pod_hash_table reads in kube_meta.c

While parse_pod_service_map() properly guards writes to ctx->pod_hash_table with pthread_mutex_lock(&metadata_mutex) / pthread_mutex_unlock(&metadata_mutex), the lookups in kube_meta.c occur without any locking. This introduces a data‐race between the background fetch thread (writes) and the pipeline threads (reads).

Please wrap each flb_hash_table_get(ctx->pod_hash_table, …) in plugins/filter_kubernetes/kube_meta.c with the same metadata_mutex lock/unlock pair. For example:

- pod_service_found = flb_hash_table_get(ctx->pod_hash_table,
-                                      meta->podname, meta->podname_len,
-                                      &tmp_service_attributes,
-                                      &tmp_service_attr_size);
+ pthread_mutex_lock(&metadata_mutex);
+ pod_service_found = flb_hash_table_get(ctx->pod_hash_table,
+                                      meta->podname, meta->podname_len,
+                                      &tmp_service_attributes,
+                                      &tmp_service_attr_size);
+ pthread_mutex_unlock(&metadata_mutex);

Locations to update:

  • plugins/filter_kubernetes/kube_meta.c around the two calls to flb_hash_table_get(...) where pod_service_found is assigned.

This change will ensure safe concurrent access to the shared hash table.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In plugins/filter_kubernetes/kube_meta.c around the lines where
pod_service_found is assigned using flb_hash_table_get, the reads from
ctx->pod_hash_table are not protected by the metadata_mutex, causing a data race
with writes in parse_pod_service_map. To fix this, wrap each flb_hash_table_get
call with pthread_mutex_lock(&metadata_mutex) before the call and
pthread_mutex_unlock(&metadata_mutex) immediately after, ensuring thread-safe
concurrent access to the shared pod_hash_table.

//Agent TLS certs
struct flb_tls *pod_association_tls;
char *pod_association_host_server_ca_file;
char *pod_association_host_client_cert_file;
char *pod_association_host_client_key_file;
int pod_association_host_tls_debug;
int pod_association_host_tls_verify;

struct flb_tls *tls;
struct flb_tls *kubelet_tls;

Expand Down
Loading
Loading