Skip to content

Commit 81f5647

Browse files
committed
parseable plugin for fluentbit
Signed-off-by: AdheipSingh <[email protected]>
1 parent 417d129 commit 81f5647

File tree

5 files changed

+374
-0
lines changed

5 files changed

+374
-0
lines changed

dockerfiles/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ RUN cmake -DFLB_RELEASE=On \
7575
-DFLB_IN_SYSTEMD=On \
7676
-DFLB_OUT_KAFKA=On \
7777
-DFLB_OUT_PGSQL=On \
78+
-DFLB_OUT_PARSEABLE=On \
7879
-DFLB_NIGHTLY_BUILD="$FLB_NIGHTLY_BUILD" \
7980
-DFLB_LOG_NO_CONTROL_CHARS=On \
8081
-DFLB_CHUNK_TRACE="$FLB_CHUNK_TRACE" \

plugins/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ REGISTER_OUT_PLUGIN("out_nrlogs")
316316
REGISTER_OUT_PLUGIN("out_null")
317317
REGISTER_OUT_PLUGIN("out_opensearch")
318318
REGISTER_OUT_PLUGIN("out_oracle_log_analytics")
319+
REGISTER_OUT_PLUGIN("out_parseable")
319320

320321
if (NOT CMAKE_SYSTEM_NAME MATCHES "Windows")
321322
REGISTER_OUT_PLUGIN("out_plot")

plugins/out_parseable/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
set(src
2+
parseable.c)
3+
4+
FLB_PLUGIN(out_parseable "${src}" "")

plugins/out_parseable/parseable.c

Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
#include <fluent-bit/flb_output_plugin.h>
2+
#include <fluent-bit/flb_utils.h>
3+
#include <fluent-bit/flb_slist.h>
4+
#include <fluent-bit/flb_time.h>
5+
#include <fluent-bit/flb_pack.h>
6+
#include <fluent-bit/flb_config_map.h>
7+
#include <fluent-bit/flb_metrics.h>
8+
#include <fluent-bit/flb_log_event_decoder.h>
9+
#include <fluent-bit/flb_event.h>
10+
#include <fluent-bit/flb_record_accessor.h>
11+
12+
13+
#include "parseable.h"
14+
15+
static int cb_parseable_init(struct flb_output_instance *ins,
16+
struct flb_config *config, void *data)
17+
{
18+
int ret;
19+
struct flb_out_parseable *ctx = NULL;
20+
(void) ins;
21+
(void) config;
22+
(void) data;
23+
24+
ctx = flb_calloc(1, sizeof(struct flb_out_parseable));
25+
if (!ctx) {
26+
flb_errno();
27+
return -1;
28+
}
29+
ctx->ins = ins;
30+
31+
/* Read in config values */
32+
ret = flb_output_config_map_set(ins, (void *) ctx);
33+
if (ret == -1) {
34+
flb_free(ctx);
35+
return -1;
36+
}
37+
38+
flb_plg_info(ctx->ins, "Configured port: %d", ctx->server_port);
39+
40+
ctx->upstream = flb_upstream_create(config,
41+
ctx->server_host,
42+
ctx->server_port,
43+
FLB_IO_TCP,
44+
NULL);
45+
46+
if (!ctx->upstream) {
47+
flb_free(ctx);
48+
return -1;
49+
}
50+
51+
/* Export context */
52+
flb_output_set_context(ins, ctx);
53+
54+
return 0;
55+
}
56+
57+
/* Main flush callback */
58+
static void cb_parseable_flush(struct flb_event_chunk *event_chunk,
59+
struct flb_output_flush *out_flush,
60+
struct flb_input_instance *i_ins,
61+
void *out_context,
62+
struct flb_config *config)
63+
{
64+
struct flb_out_parseable *ctx = out_context;
65+
struct flb_log_event_decoder log_decoder;
66+
struct flb_log_event log_event;
67+
struct flb_record_accessor *ra = NULL;
68+
struct flb_record_accessor *ns_ra = NULL; // For checking namespace
69+
(void) config;
70+
struct flb_http_client *client;
71+
struct flb_connection *u_conn;
72+
flb_sds_t body;
73+
flb_sds_t x_p_stream_value = NULL;
74+
int ret;
75+
int i;
76+
size_t b_sent;
77+
msgpack_sbuffer sbuf;
78+
msgpack_packer pk;
79+
80+
/* Initialize event decoder */
81+
flb_plg_info(ctx->ins, "Initializing event decoder...");
82+
ret = flb_log_event_decoder_init(&log_decoder, (char *) event_chunk->data, event_chunk->size);
83+
if (ret != FLB_EVENT_DECODER_SUCCESS) {
84+
flb_plg_error(ctx->ins, "Failed to initialize event decoder");
85+
FLB_OUTPUT_RETURN(FLB_ERROR);
86+
}
87+
88+
/* Create record accessor if stream is set to $NAMESPACE */
89+
if (ctx->stream && strcmp(ctx->stream, "$NAMESPACE") == 0) {
90+
ra = flb_ra_create("$kubernetes['namespace_name']", FLB_TRUE);
91+
if (!ra) {
92+
flb_plg_error(ctx->ins, "Failed to create record accessor");
93+
flb_log_event_decoder_destroy(&log_decoder);
94+
FLB_OUTPUT_RETURN(FLB_ERROR);
95+
}
96+
}
97+
98+
/* Create record accessor for namespace exclusion check */
99+
if (ctx->exclude_namespaces) {
100+
ns_ra = flb_ra_create("$kubernetes['namespace_name']", FLB_TRUE);
101+
if (!ns_ra) {
102+
flb_plg_error(ctx->ins, "Failed to create namespace record accessor");
103+
if (ra) {
104+
flb_ra_destroy(ra);
105+
}
106+
flb_log_event_decoder_destroy(&log_decoder);
107+
FLB_OUTPUT_RETURN(FLB_ERROR);
108+
}
109+
}
110+
111+
/* Process each event */
112+
flb_plg_info(ctx->ins, "Processing events...");
113+
while (flb_log_event_decoder_next(&log_decoder, &log_event) == FLB_EVENT_DECODER_SUCCESS) {
114+
/* Check if namespace is in exclusion list */
115+
if (ns_ra && ctx->exclude_namespaces) {
116+
flb_sds_t current_ns = flb_ra_translate(ns_ra, NULL, -1, *log_event.body, NULL);
117+
if (current_ns) {
118+
struct cfl_list *head;
119+
struct flb_slist_entry *entry;
120+
int skip = 0;
121+
122+
cfl_list_foreach(head, ctx->exclude_namespaces) {
123+
entry = cfl_list_entry(head, struct flb_slist_entry, _head);
124+
if (strcmp(current_ns, entry->str) == 0) {
125+
flb_plg_debug(ctx->ins, "Skipping excluded namespace: %s", current_ns);
126+
skip = 1;
127+
break;
128+
}
129+
}
130+
131+
flb_sds_destroy(current_ns);
132+
if (skip) {
133+
continue;
134+
}
135+
}
136+
}
137+
138+
/* Initialize the packer and buffer */
139+
msgpack_sbuffer_init(&sbuf);
140+
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
141+
142+
/* Pack the map with one additional field */
143+
msgpack_pack_map(&pk, log_event.body->via.map.size + 1);
144+
145+
/* Pack original map content */
146+
for (i = 0; i < log_event.body->via.map.size; i++) {
147+
msgpack_pack_object(&pk, log_event.body->via.map.ptr[i].key);
148+
msgpack_pack_object(&pk, log_event.body->via.map.ptr[i].val);
149+
}
150+
151+
/* Add source field */
152+
msgpack_pack_str_with_body(&pk, "source", 6);
153+
msgpack_pack_str_with_body(&pk, "fluent bit parseable plugin", 25);
154+
155+
/* Convert to JSON */
156+
body = flb_msgpack_raw_to_json_sds(sbuf.data, sbuf.size);
157+
if (!body) {
158+
flb_plg_error(ctx->ins, "Failed to convert msgpack to JSON");
159+
msgpack_sbuffer_destroy(&sbuf);
160+
if (ra) {
161+
flb_ra_destroy(ra);
162+
}
163+
if (ns_ra) {
164+
flb_ra_destroy(ns_ra);
165+
}
166+
flb_log_event_decoder_destroy(&log_decoder);
167+
FLB_OUTPUT_RETURN(FLB_ERROR);
168+
}
169+
170+
/* Determine X-P-Stream value */
171+
if (ra) {
172+
/* Use record accessor to get namespace_name */
173+
flb_sds_t ns = flb_ra_translate(ra, NULL, -1, *log_event.body, NULL);
174+
if (!ns) {
175+
flb_plg_error(ctx->ins, "Failed to extract namespace_name using record accessor");
176+
flb_sds_destroy(body);
177+
msgpack_sbuffer_destroy(&sbuf);
178+
flb_ra_destroy(ra);
179+
if (ns_ra) {
180+
flb_ra_destroy(ns_ra);
181+
}
182+
flb_log_event_decoder_destroy(&log_decoder);
183+
FLB_OUTPUT_RETURN(FLB_ERROR);
184+
}
185+
x_p_stream_value = ns;
186+
}
187+
else if (ctx->stream) {
188+
x_p_stream_value = flb_sds_create(ctx->stream);
189+
if (!x_p_stream_value) {
190+
flb_plg_error(ctx->ins, "Failed to set X-P-Stream header");
191+
flb_sds_destroy(body);
192+
msgpack_sbuffer_destroy(&sbuf);
193+
if (ra) {
194+
flb_ra_destroy(ra);
195+
}
196+
if (ns_ra) {
197+
flb_ra_destroy(ns_ra);
198+
}
199+
flb_log_event_decoder_destroy(&log_decoder);
200+
FLB_OUTPUT_RETURN(FLB_ERROR);
201+
}
202+
}
203+
else {
204+
flb_plg_error(ctx->ins, "Stream is not set");
205+
flb_sds_destroy(body);
206+
msgpack_sbuffer_destroy(&sbuf);
207+
if (ra) {
208+
flb_ra_destroy(ra);
209+
}
210+
if (ns_ra) {
211+
flb_ra_destroy(ns_ra);
212+
}
213+
flb_log_event_decoder_destroy(&log_decoder);
214+
FLB_OUTPUT_RETURN(FLB_ERROR);
215+
}
216+
217+
/* Get upstream connection */
218+
u_conn = flb_upstream_conn_get(ctx->upstream);
219+
if (!u_conn) {
220+
flb_plg_error(ctx->ins, "Connection initialization error");
221+
flb_sds_destroy(body);
222+
flb_sds_destroy(x_p_stream_value);
223+
msgpack_sbuffer_destroy(&sbuf);
224+
if (ra) {
225+
flb_ra_destroy(ra);
226+
}
227+
if (ns_ra) {
228+
flb_ra_destroy(ns_ra);
229+
}
230+
flb_log_event_decoder_destroy(&log_decoder);
231+
FLB_OUTPUT_RETURN(FLB_ERROR);
232+
}
233+
234+
/* Create HTTP client */
235+
client = flb_http_client(u_conn,
236+
FLB_HTTP_POST, "/api/v1/ingest",
237+
body, flb_sds_len(body),
238+
ctx->server_host, ctx->server_port,
239+
NULL, 0);
240+
if (!client) {
241+
flb_plg_error(ctx->ins, "Could not create HTTP client");
242+
flb_sds_destroy(body);
243+
flb_sds_destroy(x_p_stream_value);
244+
msgpack_sbuffer_destroy(&sbuf);
245+
flb_upstream_conn_release(u_conn);
246+
if (ra) {
247+
flb_ra_destroy(ra);
248+
}
249+
if (ns_ra) {
250+
flb_ra_destroy(ns_ra);
251+
}
252+
flb_log_event_decoder_destroy(&log_decoder);
253+
FLB_OUTPUT_RETURN(FLB_ERROR);
254+
}
255+
256+
/* Set headers */
257+
flb_http_add_header(client, "Content-Type", 12, "application/json", 16);
258+
flb_http_add_header(client, "X-P-Stream", 10, x_p_stream_value, flb_sds_len(x_p_stream_value));
259+
flb_http_basic_auth(client, ctx->username, ctx->password);
260+
261+
/* Perform request */
262+
ret = flb_http_do(client, &b_sent);
263+
flb_plg_info(ctx->ins, "HTTP request sent. Status=%i", client->resp.status);
264+
265+
/* Clean up resources for this iteration */
266+
flb_sds_destroy(body);
267+
flb_sds_destroy(x_p_stream_value);
268+
flb_http_client_destroy(client);
269+
flb_upstream_conn_release(u_conn);
270+
msgpack_sbuffer_destroy(&sbuf);
271+
}
272+
273+
/* Final cleanup */
274+
if (ra) {
275+
flb_ra_destroy(ra);
276+
}
277+
if (ns_ra) {
278+
flb_ra_destroy(ns_ra);
279+
}
280+
flb_log_event_decoder_destroy(&log_decoder);
281+
FLB_OUTPUT_RETURN(FLB_OK);
282+
}
283+
284+
static int cb_parseable_exit(void *data, struct flb_config *config)
285+
{
286+
struct flb_out_parseable *ctx = data;
287+
288+
if (!ctx) {
289+
return 0;
290+
}
291+
292+
if (ctx->exclude_namespaces) {
293+
flb_slist_destroy((struct mk_list *)ctx->exclude_namespaces);
294+
}
295+
296+
/* Free up resources */
297+
if (ctx->upstream) {
298+
flb_upstream_destroy(ctx->upstream);
299+
}
300+
flb_free(ctx);
301+
return 0;
302+
}
303+
304+
/* Configuration properties map */
305+
static struct flb_config_map config_map[] = {
306+
{
307+
FLB_CONFIG_MAP_STR, "server_host", NULL,
308+
0, FLB_TRUE, offsetof(struct flb_out_parseable, server_host),
309+
"The host of the server to send logs to."
310+
},
311+
{
312+
FLB_CONFIG_MAP_STR, "username", NULL,
313+
0, FLB_TRUE, offsetof(struct flb_out_parseable, username),
314+
"The parseable server username."
315+
},
316+
{
317+
FLB_CONFIG_MAP_STR, "password", NULL,
318+
0, FLB_TRUE, offsetof(struct flb_out_parseable, password),
319+
"The parseable server password."
320+
},
321+
{
322+
FLB_CONFIG_MAP_STR, "stream", NULL,
323+
0, FLB_TRUE, offsetof(struct flb_out_parseable, stream),
324+
"The stream name to send logs to. Using $NAMESPACE will dynamically create a namespace."
325+
},
326+
{
327+
FLB_CONFIG_MAP_INT, "server_port", NULL,
328+
0, FLB_TRUE, offsetof(struct flb_out_parseable, server_port),
329+
"The port on the host to send logs to."
330+
},
331+
{
332+
FLB_CONFIG_MAP_CLIST, "Exclude_Namespaces", NULL,
333+
0, FLB_TRUE, offsetof(struct flb_out_parseable, exclude_namespaces),
334+
"A space-separated list of Kubernetes namespaces to exclude from log forwarding."
335+
},
336+
/* EOF */
337+
{0}
338+
};
339+
340+
/* Plugin registration */
341+
struct flb_output_plugin out_parseable_plugin = {
342+
.name = "parseable",
343+
.description = "Sends events to a HTTP server",
344+
.cb_init = cb_parseable_init,
345+
.cb_flush = cb_parseable_flush,
346+
.cb_exit = cb_parseable_exit,
347+
.flags = 0,
348+
.event_type = FLB_OUTPUT_LOGS,
349+
.config_map = config_map
350+
};

plugins/out_parseable/parseable.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#ifndef FLB_OUT_PARSEABLE_H
2+
#define FLB_OUT_PARSEABLE_H
3+
4+
#include <fluent-bit/flb_output_plugin.h>
5+
#include <fluent-bit/flb_sds.h>
6+
7+
struct flb_out_parseable {
8+
flb_sds_t server_host;
9+
int server_port;
10+
flb_sds_t username;
11+
flb_sds_t password;
12+
flb_sds_t stream;
13+
struct cfl_list *exclude_namespaces; // Use cfl_list for namespace exclusion
14+
struct flb_upstream *upstream;
15+
struct flb_output_instance *ins;
16+
};
17+
18+
#endif

0 commit comments

Comments
 (0)