-
Notifications
You must be signed in to change notification settings - Fork 307
Closed
Labels
Description
Problem
I would like to use the gen_id feature to avoid duplicate documents in Elasticsearch. Currently, I employ Fluent Bit on my servers to read and parse logs. These logs are then sent to Fluentd, acting as an aggregator, which subsequently pushes them to the Elasticsearch cluster.
However, I've encountered an issue where the id_key doesn't function as expected on the elasticsearch_data_stream. This results in the addition of a _hash field to the documents in Elasticsearch, but it is not being recognized as the _id field.
Steps to replicate
<system>
log_level info
workers 8
</system>
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
<filter data.**>
@type record_transformer
enable_ruby true
<record>
...
</record>
</filter>
<filter data.**>
@type elasticsearch_genid
use_record_as_seed true
record_keys []
use_entire_record true
hash_type sha1
hash_id_key _hash
separator _
inc_time_as_key false
inc_tag_as_key false
</filter>
<filter stream.**>
@type elasticsearch_genid
use_record_as_seed true
record_keys []
use_entire_record true
hash_type sha1
hash_id_key _hash
separator _
inc_time_as_key false
inc_tag_as_key false
</filter>
<match data.**>
@type copy
<store>
@type prometheus
<metric>
name fluentd_output_status_num_records_total
type counter
desc The total number of processed records for data streams
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</store>
<store>
@type elasticsearch_data_stream
host sample_host:9200
scheme https
ssl_verify false
id_key _hash
write_operation create
remove_keys _hash
user sample_user
password sample_pass
include_tag_key true
include_timestamp true
reconnect_on_error true
reload_on_failure true
reload_connections false
data_stream_name data
buffer_type memory
retry_forever true
overflow_action block
<buffer>
@type memory
flush_thread_count 1
flush_interval 10s
chunk_limit_size 4M
total_limit_size 512M
retry_forever true
</buffer>
</store>
</match>
<match stream.**>
@type copy
<store>
@type prometheus
<metric>
name fluentd_output_status_num_records_total
type counter
desc The total number of processed records for data streams
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</store>
<store>
@type elasticsearch_data_stream
hosts sample_host:9200
scheme https
ssl_verify false
id_key _hash
write_operation create
remove_keys _hash
user sample_user
password sample_pass
include_tag_key true
reconnect_on_error true
reload_on_failure true
reload_connections false
data_stream_name stream_log
buffer_type memory
retry_forever true
overflow_action block
<buffer>
@type memory
flush_thread_count 1
flush_interval 10s
chunk_limit_size 4M
total_limit_size 512M
retry_forever true
</buffer>
</store>
</match>
Expected Behavior or What you need to ask
Use _hash as id field for checking the uniqueness of docs in elasticsearch but didn't do this.
Using Fluentd and ES plugin versions
- Debian GNU/Linux 11 (bullseye)
- Bare Metal
- fluent-package 5.0.2 fluentd 1.16.3
- ES plugin :
*** LOCAL GEMS ***
abbrev (default: 0.1.1)
addressable (2.8.5)
async (1.31.0)
async-http (0.61.0)
async-io (1.38.0)
async-pool (0.4.0)
aws-eventstream (1.2.0)
aws-partitions (1.785.0)
aws-sdk-core (3.178.0)
aws-sdk-kms (1.71.0)
aws-sdk-s3 (1.129.0)
aws-sdk-sqs (1.61.0)
aws-sigv4 (1.6.0)
base64 (0.2.0, default: 0.1.1)
benchmark (default: 0.2.1)
bigdecimal (default: 3.1.3)
bindata (2.4.15)
bundler (default: 2.5.3, 2.3.26)
cgi (default: 0.3.6)
cmetrics (0.3.3)
concurrent-ruby (1.2.2)
console (1.23.2)
cool.io (1.8.0)
csv (default: 3.2.6)
date (default: 3.3.3)
debug (1.7.1)
delegate (default: 0.3.0)
did_you_mean (default: 1.6.3)
digest (default: 3.1.1)
digest-crc (0.6.5)
digest-murmurhash (1.1.1)
drb (default: 2.1.1)
elastic-transport (8.3.0)
elasticsearch (8.11.0, 8.8.0)
elasticsearch-api (8.11.0, 8.8.0)
english (default: 0.7.2)
erb (default: 4.0.2)
error_highlight (default: 0.5.1)
etc (default: 1.4.2)
excon (0.108.0, 0.104.0)
faraday (2.7.12)
faraday-excon (2.1.0)
faraday-net_http (3.0.2)
faraday_middleware-aws-sigv4 (1.0.1)
fcntl (default: 1.0.2)
ffi (1.15.5)
fiber-annotation (0.2.0)
fiber-local (1.0.0)
fiddle (default: 1.1.1)
fileutils (1.7.2, default: 1.7.0)
find (default: 0.1.1)
fluent-config-regexp-type (1.0.0)
fluent-diagtool (1.0.3)
fluent-logger (0.9.0)
fluent-plugin-calyptia-monitoring (0.1.3)
fluent-plugin-elasticsearch (5.4.2, 5.4.0)
fluent-plugin-flowcounter-simple (0.1.0)
fluent-plugin-kafka (0.19.2)
fluent-plugin-metrics-cmetrics (0.1.2)
fluent-plugin-opensearch (1.1.4)
fluent-plugin-prometheus (2.1.0)
fluent-plugin-prometheus_pushgateway (0.1.1)
fluent-plugin-record-modifier (2.2.0, 2.1.1)
fluent-plugin-rewrite-tag-filter (2.4.0)
fluent-plugin-s3 (1.7.2)
fluent-plugin-sd-dns (0.1.0)
fluent-plugin-systemd (1.0.5)
fluent-plugin-td (1.2.0)
fluent-plugin-utmpx (0.5.0)
fluent-plugin-webhdfs (1.5.0)
fluentd (1.16.3)
forwardable (default: 1.3.3)
getoptlong (default: 0.2.0)
hirb (0.7.3)
http_parser.rb (0.8.0)
httpclient (2.8.3)
io-console (default: 0.6.0)
io-nonblock (default: 0.2.0)
io-wait (default: 0.3.0)
ipaddr (default: 1.2.5)
irb (default: 1.6.2)
jmespath (1.6.2)
json (default: 2.6.3)
linux-utmpx (0.3.0)
logger (default: 1.5.3)
ltsv (0.1.2)
matrix (0.4.2)
mini_portile2 (2.8.2)
minitest (5.16.3)
msgpack (1.7.2)
multi_json (1.15.0)
mutex_m (default: 0.1.2)
net-ftp (0.2.0)
net-http (default: 0.3.2)
net-imap (0.3.4)
net-pop (0.1.2)
net-protocol (default: 0.2.1)
net-smtp (0.3.3)
nio4r (2.6.1)
nkf (default: 0.1.2)
observer (default: 0.1.1)
oj (3.16.1)
open-uri (default: 0.3.0)
open3 (default: 0.1.2)
opensearch-api (2.2.0)
opensearch-ruby (2.1.0)
opensearch-transport (2.1.0)
openssl (default: 3.1.0)
optparse (default: 0.3.1)
ostruct (default: 0.5.5)
parallel (1.20.1)
pathname (default: 0.2.1)
power_assert (2.0.3)
pp (default: 0.4.0)
prettyprint (default: 0.1.1)
prime (0.1.2)
prometheus-client (2.1.0)
protocol-hpack (1.4.2)
protocol-http (0.25.0)
protocol-http1 (0.16.0)
protocol-http2 (0.15.1)
pstore (default: 0.1.2)
psych (default: 5.0.1)
public_suffix (5.0.4)
racc (default: 1.6.2)
rake (13.1.0, 13.0.6)
rbs (2.8.2)
rdkafka (0.12.0)
rdoc (default: 6.5.0)
readline (default: 0.0.3)
readline-ext (default: 0.1.5)
reline (default: 0.3.2)
resolv (default: 0.2.2)
resolv-replace (default: 0.1.1)
rexml (3.2.6, 3.2.5)
rinda (default: 0.1.1)
rss (0.2.9)
ruby-kafka (1.5.0)
ruby-progressbar (1.13.0)
ruby2_keywords (default: 0.0.5)
rubygems-update (3.5.3)
rubyzip (1.3.0)
securerandom (default: 0.2.2)
serverengine (2.3.2)
set (default: 1.0.3)
shellwords (default: 0.1.0)
sigdump (0.2.5)
singleton (default: 0.1.1)
stringio (default: 3.0.4)
strptime (0.2.5)
strscan (default: 3.0.5)
syntax_suggest (default: 1.0.2)
syslog (default: 0.1.1)
systemd-journal (1.4.2)
td (0.17.1)
td-client (1.0.8)
td-logger (0.3.28)
tempfile (default: 0.1.3)
test-unit (3.5.7)
time (default: 0.2.2)
timeout (default: 0.3.1)
timers (4.3.5)
tmpdir (default: 0.1.3)
traces (0.11.1)
tsort (default: 0.1.1)
typeprof (0.21.3)
tzinfo (2.0.6)
tzinfo-data (1.2023.3)
un (default: 0.2.1)
uri (0.12.2, default: 0.12.1)
weakref (default: 0.1.2)
webhdfs (0.10.2)
webrick (1.8.1)
yajl-ruby (1.4.3)
yaml (default: 0.2.1)
zip-zip (0.3)
zlib (default: 3.0.0)
- ES version 8.11.4
guoard, castorsky, AntonioAlfrz and joseacl