Skip to content

Commit 9472157

Browse files
leonardo-albertovichsparrc
authored andcommitted
in_forward: fix connection release on pause memory corruption
This change fixes a use after free issue related to connection disposal which caused the event handler to access invalid memory when the memory limits were exceeded during ingestion. In order to overcome this issue we track the plugin instances state and delay the connection cleanup process. Signed-off-by: Leonardo Albertovich <[email protected]>
1 parent 8579d63 commit 9472157

File tree

3 files changed

+66
-6
lines changed

3 files changed

+66
-6
lines changed

plugins/in_forward/fw.c

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,28 +124,37 @@ static int fw_unix_create(struct flb_in_fw_config *ctx)
124124
static int in_fw_collect(struct flb_input_instance *ins,
125125
struct flb_config *config, void *in_context)
126126
{
127+
int state_backup;
127128
struct flb_connection *connection;
128129
struct fw_conn *conn;
129130
struct flb_in_fw_config *ctx;
130131

131132
ctx = in_context;
132133

134+
state_backup = ctx->state;
135+
ctx->state = FW_INSTANCE_STATE_ACCEPTING_CLIENT;
136+
133137
connection = flb_downstream_conn_get(ctx->downstream);
134138

135139
if (connection == NULL) {
136140
flb_plg_error(ctx->ins, "could not accept new connection");
141+
ctx->state = state_backup;
137142

138143
return -1;
139144
}
140145

141146
if (!config->is_ingestion_active) {
142147
flb_downstream_conn_release(connection);
148+
ctx->state = state_backup;
149+
143150
return -1;
144151
}
145152

146153
if(ctx->is_paused) {
147154
flb_downstream_conn_release(connection);
148155
flb_plg_trace(ins, "TCP connection will be closed FD=%i", connection->fd);
156+
ctx->state = state_backup;
157+
149158
return -1;
150159
}
151160

@@ -154,9 +163,17 @@ static int in_fw_collect(struct flb_input_instance *ins,
154163
conn = fw_conn_add(connection, ctx);
155164
if (!conn) {
156165
flb_downstream_conn_release(connection);
166+
ctx->state = state_backup;
167+
157168
return -1;
158169
}
159170

171+
ctx->state = state_backup;
172+
173+
if (ctx->state == FW_INSTANCE_STATE_PAUSED) {
174+
fw_conn_del_all(ctx);
175+
}
176+
160177
return 0;
161178
}
162179

@@ -263,6 +280,7 @@ static int in_fw_init(struct flb_input_instance *ins,
263280
return -1;
264281
}
265282

283+
ctx->state = FW_INSTANCE_STATE_RUNNING;
266284
ctx->coll_fd = -1;
267285
ctx->ins = ins;
268286
mk_list_init(&ctx->connections);
@@ -386,7 +404,10 @@ static void in_fw_pause(void *data, struct flb_config *config)
386404
return;
387405
}
388406

389-
fw_conn_del_all(ctx);
407+
if (ctx->state == FW_INSTANCE_STATE_RUNNING) {
408+
fw_conn_del_all(ctx);
409+
}
410+
390411
ctx->is_paused = FLB_TRUE;
391412
ret = pthread_mutex_unlock(&ctx->conn_mutex);
392413
if (ret != 0) {
@@ -406,6 +427,8 @@ static void in_fw_pause(void *data, struct flb_config *config)
406427
if (config->is_ingestion_active == FLB_FALSE) {
407428
fw_conn_del_all(ctx);
408429
}
430+
431+
ctx->state = FW_INSTANCE_STATE_PAUSED;
409432
}
410433

411434
static void in_fw_resume(void *data, struct flb_config *config) {
@@ -427,6 +450,8 @@ static void in_fw_resume(void *data, struct flb_config *config) {
427450
flb_plg_error(ctx->ins, "cannot unlock collector mutex");
428451
return;
429452
}
453+
454+
ctx->state = FW_INSTANCE_STATE_RUNNING;
430455
}
431456
}
432457

plugins/in_forward/fw.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@
2525
#include <fluent-bit/flb_log_event_decoder.h>
2626
#include <fluent-bit/flb_log_event_encoder.h>
2727

28+
#define FW_INSTANCE_STATE_RUNNING 0
29+
#define FW_INSTANCE_STATE_ACCEPTING_CLIENT 1
30+
#define FW_INSTANCE_STATE_PROCESSING_PACKET 2
31+
#define FW_INSTANCE_STATE_PAUSED 3
32+
33+
2834
enum {
2935
FW_HANDSHAKE_HELO = 1,
3036
FW_HANDSHAKE_PINGPONG = 2,
@@ -76,6 +82,8 @@ struct flb_in_fw_config {
7682

7783
pthread_mutex_t conn_mutex;
7884

85+
int state;
86+
7987
/* Plugin is paused */
8088
int is_paused;
8189
};

plugins/in_forward/fw_conn.c

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@
2828
#include "fw_prot.h"
2929
#include "fw_conn.h"
3030

31-
/* Callback invoked every time an event is triggered for a connection */
32-
int fw_conn_event(void *data)
31+
static int fw_conn_event_internal(struct flb_connection *connection)
3332
{
3433
int ret;
3534
int bytes;
@@ -39,9 +38,6 @@ int fw_conn_event(void *data)
3938
struct fw_conn *conn;
4039
struct mk_event *event;
4140
struct flb_in_fw_config *ctx;
42-
struct flb_connection *connection;
43-
44-
connection = (struct flb_connection *) data;
4541

4642
conn = connection->user_data;
4743

@@ -127,6 +123,37 @@ int fw_conn_event(void *data)
127123
return 0;
128124
}
129125

126+
/* Callback invoked every time an event is triggered for a connection */
127+
int fw_conn_event(void *data)
128+
{
129+
struct flb_in_fw_config *ctx;
130+
struct fw_conn *conn;
131+
int result;
132+
struct flb_connection *connection;
133+
int state_backup;
134+
135+
connection = (struct flb_connection *) data;
136+
137+
conn = connection->user_data;
138+
139+
ctx = conn->ctx;
140+
141+
state_backup = ctx->state;
142+
143+
ctx->state = FW_INSTANCE_STATE_PROCESSING_PACKET;
144+
145+
result = fw_conn_event_internal(connection);
146+
147+
if (ctx->state == FW_INSTANCE_STATE_PROCESSING_PACKET) {
148+
ctx->state = state_backup;
149+
}
150+
else if (ctx->state == FW_INSTANCE_STATE_PAUSED) {
151+
fw_conn_del_all(ctx);
152+
}
153+
154+
return result;
155+
}
156+
130157
/* Create a new Forward request instance */
131158
struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_config *ctx)
132159
{

0 commit comments

Comments
 (0)