Problem
After upgrading Fluentd from 1.15.2 to 1.17.0, Fluentd (with the Elasticsearch output) hit a memory reclaim issue in complex fault scenarios: memory is not released while Elasticsearch is in a crash-loop state and Fluentd is retrying to push the buffered logs.
Steps to replicate
- env prepare
- start the Elastic cluster, an nginx proxy for Elasticsearch, and Fluentd with the ES output (Fluentd outputs to the nginx IP; nginx reverse-proxies to the ES cluster)
- fault simulation (a scripted sketch of steps a-d follows this list)
- a. configure nginx to return 502 and reload nginx (or stop nginx)
- b. wait for Fluentd to generate at least 2 buffer files (buffer.b..., buffer.qaaa..., buffer.qbbb...)
- c. quickly revert step "a" and tail the nginx log
- d. when Fluentd begins to retry pushing the buffered logs (watch for _bulk HTTP 200 lines in the nginx log): redo step "a"
- e. Fluentd begins emitting a flood of error logs (one per log entry), and the memory of Fluentd v1.17.0 keeps increasing until OOM or container restart (liveness probe: healthcheck failed), while the old version works well under the same sequence.
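For reference, a minimal bash sketch of the fault toggle. It assumes two prepared variants of the nginx config below (one with the ngx.exit(502) Lua lines commented out, one with them active); all paths here are assumptions, not from the report:

#!/usr/bin/env bash
CONF=/etc/nginx/conf.d/es-proxy.conf              # hypothetical live config path
CONF_OK=/etc/nginx/conf.d/es-proxy.conf.ok        # fault lines commented out (healthy)
CONF_502=/etc/nginx/conf.d/es-proxy.conf.502      # fault lines uncommented (returns 502)
BUF_DIR=/var/log/fluent/buf/elasticsearch/fluentd_test_log  # buffer path from the fluentd config

cp "$CONF_502" "$CONF" && nginx -s reload         # step a: proxy now returns 502

# step b: wait until fluentd has queued at least 2 buffer files (path_suffix is .buf)
until [ "$(ls "$BUF_DIR"/buffer.q*.buf 2>/dev/null | wc -l)" -ge 2 ]; do sleep 1; done

cp "$CONF_OK" "$CONF" && nginx -s reload          # step c: revert the fault...
# ...and block until the first retried _bulk gets HTTP 200 (pattern is log-format dependent)
tail -f /var/log/nginx/access.log | grep -m1 --line-buffered '_bulk.* 200 '

cp "$CONF_502" "$CONF" && nginx -s reload         # step d: break the proxy again mid-retry
# step e: observe the error flood and memory growth (see the RSS sampler further below)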
Configs
main nginx (openresty) config
server {
    listen 80;
    server_name _;
    set $xforwaredfor $remote_addr;
    proxy_set_header Host $http_host;
    proxy_set_header X-Forwarded-For $xforwaredfor;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection $connection_upgrade;
    proxy_http_version 1.1;
    proxy_connect_timeout 5s;
    proxy_send_timeout 30s;
    proxy_read_timeout 30s;
    proxy_buffers 64 8k;
    client_max_body_size 100m;

    location / {
        access_by_lua_block {
            ---ngx.sleep(math.random(14.5,16));
            ---ngx.exit(502);
        }
        proxy_pass http://127.0.0.1:9200;
    }

    location /_bulk {
        access_by_lua_block {
            --- ngx.sleep(math.random(14.5,18))
            --- ngx.exit(502);
        }
        proxy_pass http://127.0.0.1:9200;
    }
}
fluentd config
<ROOT>
  <label @FLUENT_LOG>
    <match **>
      @type null
      @id ignore_fluent_logs
    </match>
  </label>
  <source>
    @type gc_stat
    tag "gc.stat"
  </source>
  <match gc.stat>
    @type stdout
  </match>
  <source>
    @type tail
    @id fluentd_test_log
    @label @SOURCE
    path "/path/to/request.log"
    pos_file "/path/to/request.log.pos"
    tag "fluentd_test_log"
    read_from_head true
    follow_inodes true
    emit_unmatched_lines true
    <parse>
      @type "none"
      unmatched_lines true
    </parse>
  </source>
  <label @SOURCE>
    <filter **>
      @type record_transformer
      <record>
        _cluster xx
        _tag ${tag}
        _hostname fluentd-test-xxxx
      </record>
    </filter>
    <filter fluentd_test_log>
      @type grep
      <exclude>
        key "message"
        pattern /api1/
      </exclude>
    </filter>
    <filter fluentd_test_log>
      @type grep
      <exclude>
        key "message"
        pattern /api2/
      </exclude>
    </filter>
    <match **>
      @type relabel
      @label @DISPATCH
    </match>
  </label>
  <label @DISPATCH>
    <filter **>
      @type prometheus
      <metric>
        name fluentd_input_status_num_records_total
        type counter
        desc The total number of incoming records
        <labels>
          tag ${tag}
          hostname ${hostname}
        </labels>
      </metric>
    </filter>
    <match fluentd_test_log>
      @type relabel
      @label @OUTPUT__fluentd_test_log
    </match>
  </label>
  <label @OUTPUT__fluentd_test_log>
    <filter **>
      @type elasticsearch_genid
      hash_id_key "_hash"
    </filter>
    <match **>
      @type copy
      <store>
        @type "elasticsearch"
        hosts "nginx:80"
        path ""
        user "elastic"
        password xxxxxx
        index_name "fluentd-test"
        include_timestamp true
        request_timeout 15s
        bulk_message_request_threshold 20MB
        write_operation "create"
        id_key "_hash"
        remove_keys "_hash"
        <buffer>
          @type "file"
          path "/var/log/fluent/buf/elasticsearch/fluentd_test_log"
          path_suffix ".buf"
          total_limit_size 64GB
          flush_interval 5s
          flush_at_shutdown true
          retry_forever false
          retry_timeout 8h
          retry_max_times 486
          retry_max_interval 60s
        </buffer>
      </store>
    </match>
  </label>
  <source>
    @type prometheus
    @id in_prometheus
    bind "0.0.0.0"
    port 24231
    metrics_path "/metrics"
  </source>
  <source>
    @type prometheus_monitor
    @id in_prometheus_monitor
  </source>
  <source>
    @type prometheus_output_monitor
    @id in_prometheus_output_monitor
  </source>
</ROOT>
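Since the config already exposes prometheus_monitor / prometheus_output_monitor on port 24231, buffer and retry growth can be watched while reproducing. A minimal shell sketch; the metric names are the standard ones exported by fluent-plugin-prometheus and are stated here as an assumption:

# Poll fluentd's own metrics endpoint (port 24231 from the config above) every 5s
while true; do
  date --rfc-3339=seconds
  curl -s http://localhost:24231/metrics \
    | grep -E 'fluentd_output_status_(buffer_queue_length|buffer_total_bytes|retry_count)'
  sleep 5
done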
error logs
# flood output (v1.17.0 + v1.15.2)
2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.084289025 +0800 record={"message"=>"a long log text", "_cluster"=>"yyy", "_tag"=>"fluentd_test_log", "_hostname"=>"fluentd-test-6ddddf5c6-gdxmw", "_node_ip"=>"10.y.y.y", "_pool_name"=>"xxx_fluentd-test-yyy", "@timestamp"=>"2024-10-09T19:46:42.084289025+08:00"}
2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.085269889 +0800 record={"message"=>"2024-10-09 19:46:42.085119371+08:00", "_cluster"=>"yyy", "_tag"=>"fluentd_test_log", "_hostname"=>"fluentd-test-6ddddf5c6-gdxmw", "_node_ip"=>"10.y.y.y", "_pool_name"=>"xxx_fluentd-test-yyy", "@timestamp"=>"2024-10-09T19:46:42.085269889+08:00"}
2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.086607166 +0800 record={"message"=>"a long log text", "_cluster"=>"yyy", "_tag"=>"fluentd_test_log", "_hostname"=>"fluentd-test-6ddddf5c6-gdxmw", "_node_ip"=>"10.y.y.y", "_pool_name"=>"xxx_fluentd-test-yyy", "@timestamp"=>"2024-10-09T19:46:42.086607166+08:00"}
2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.087835513 +0800
^C2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.089250369 +0800 record={"message"=>"a long log text", "_cluster"=>"yyy", "_tag"=>"fluentd_test_log", "_hostname"=>"fluentd-test-6ddddf5c6-gdxmw", "_node_ip"=>"10.y.y.y", "_pool_name"=>"xxx_fluentd-test-yyy", "@timestamp"=>"2024-10-09T19:46:42.089250369+08:00"}
2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.090581442 +0800 record={"message"=>"2024-10-09 19:46:42.090401205+08:00", "_cluster"=>"yyy", "_tag"=>"fluentd_test_log", "_hostname"=>"fluentd-test-6ddddf5c6-gdxmw", "_node_ip"=>"10.y.y.y", "_pool_name"=>"xxx_fluentd-test-yyy", "@timestamp"=>"2024-10-09T19:46:42.090581442+08:00"}
log generator
while true;do date --rfc-3339=ns >> request.log; cat data.txt >> request.log; done &
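To make the memory growth from step "e" visible alongside the generator, a simple RSS sampler can run on the node; a sketch assuming pgrep finds the Fluentd worker (it may match the supervisor first, so adjust the pattern as needed):

# sample the fluentd process's resident set size (KB) every 5 seconds
while true; do
  echo "$(date --rfc-3339=seconds) rss_kb=$(ps -o rss= -p "$(pgrep -f fluentd | head -1)")"
  sleep 5
done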
Expected Behavior or What you need to ask
Why does Fluentd 1.17.0 keep consuming memory in this scenario until OOM, while 1.15.2 releases it and keeps working?
Using Fluentd and ES plugin versions
- OS version:
almalinux 9.4
- Bare Metal or within Docker or Kubernetes or others:
k8s 1.26
- Fluentd version:
fluentd 1.17.0 & fluentd 1.15.2
- ES plugin version:
# with fluentd 1.15.2
elasticsearch (7.17.1)
elasticsearch-api (7.17.1)
elasticsearch-transport (7.17.1)
elasticsearch-xpack (7.17.1)
fluent-plugin-elasticsearch (5.1.5)
# with fluentd 1.17.0
elasticsearch (7.17.11)
elasticsearch-api (7.17.11)
elasticsearch-transport (7.17.11)
elasticsearch-xpack (7.17.11)
fluent-plugin-elasticsearch (5.2.5)
- ES version:
7.17.6
- Pod resource:
requests 256Mi, limits 512Mi