Conversation

@Hipska
Contributor

@Hipska Hipska commented Oct 9, 2025

Summary

NATS JetStream supports message acknowledgement, so make use of it.

Checklist

  • No AI generated code was used in this PR

Related issues

resolves #17791

@telegraf-tiger telegraf-tiger bot added the feat and plugin/input labels Oct 9, 2025
Hipska added 2 commits October 9, 2025 13:17
# Conflicts:
#	plugins/inputs/nats_consumer/nats_consumer.go
Member

@srebhan srebhan left a comment

Thanks @Hipska for the PR, however, there are major issues!

  1. The n.undelivered list of in-flight metrics is never written to!
  2. Usually I would expect a separate onDelivery function that ACKs the message and removes it from the sem channel, instead of mixing this into the receiver function. The onDelivery function should then just run in its own goroutine.
  3. The n.undelivered list of in-flight metrics requires locking...
  4. Unit-tests are missing!
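
Points 1 and 3 can be illustrated with a minimal sketch of a mutex-guarded lookup of in-flight messages. This is not the plugin's code: `trackingID`, `message`, and `inflight` are hypothetical stand-ins for `telegraf.TrackingID`, `*nats.Msg`, and the `n.undelivered` map, chosen so the example runs on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// trackingID and message are hypothetical stand-ins for
// telegraf.TrackingID and *nats.Msg.
type trackingID uint64

type message struct{ subject string }

// inflight keeps messages that were handed to the accumulator but have
// not been reported as delivered yet. All access goes through the mutex.
type inflight struct {
	mu   sync.Mutex
	msgs map[trackingID]*message
}

func newInflight(capacity int) *inflight {
	return &inflight{msgs: make(map[trackingID]*message, capacity)}
}

// add records a message under its tracking ID (the write that was missing).
func (f *inflight) add(id trackingID, m *message) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.msgs[id] = m
}

// remove returns the message for the ID, if any, and forgets it.
func (f *inflight) remove(id trackingID) (*message, bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	m, ok := f.msgs[id]
	delete(f.msgs, id)
	return m, ok
}

func main() {
	store := newInflight(4)
	store.add(1, &message{subject: "telegraf.metrics"})
	_, found := store.remove(1)
	_, again := store.remove(1)
	fmt.Println(found, again)
}
```

The receiver goroutine would call `add` and the delivery goroutine `remove`, which is why the lock is needed once the two run concurrently.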

 JsStream string `toml:"jetstream_stream"`
 PendingMessageLimit int `toml:"pending_message_limit"`
-PendingBytesLimit int `toml:"pending_bytes_limit"`
+PendingBytesLimit int `toml:"pending_bytes_limit" deprecated:"1.37.0;1.40.0;unused"`
Member

Hmmm actually this setting is used in line 160, isn't it?

Contributor Author

Whoops, that is the result of a bad merge. Will fix it.

Member

Hmmm, this is still not fixed I think, so I'm unresolving my comment...

Contributor Author

Where do you still see it used in current code?

Member

Well, you are removing this feature from the code and then deprecating the option without giving a reason for the removal! Furthermore, even if there is a good reason for the removal, it does not belong in this PR!

@srebhan srebhan self-assigned this Oct 10, 2025
@Hipska Hipska marked this pull request as draft October 10, 2025 09:21
@Hipska
Contributor Author

Hipska commented Oct 10, 2025

I based the onDelivery function on how it's implemented in inputs.amqp_consumer:

case track := <-acc.Delivered():
	if a.onDelivery(track) {
		<-sem
	}

func (a *AMQPConsumer) onDelivery(track telegraf.DeliveryInfo) bool {
	delivery, ok := a.deliveries[track.ID()]
	if !ok {
		// Added by a previous connection
		return false
	}
	if track.Delivered() {
		err := delivery.Ack(false)
		if err != nil {
			a.Log.Errorf("Unable to ack written delivery: %d: %v", delivery.DeliveryTag, err)
			a.conn.Close()
		}
	} else {
		err := delivery.Reject(false)
		if err != nil {
			a.Log.Errorf("Unable to reject failed delivery: %d: %v", delivery.DeliveryTag, err)
			a.conn.Close()
		}
	}
	delete(a.deliveries, track.ID())
	return true
}

I added locking as requested, but I see that the above-mentioned code doesn't have locking either...

@Hipska Hipska marked this pull request as ready for review October 10, 2025 12:29
@Hipska Hipska force-pushed the feat/inputs/nats_consumer/ack_on_delivery branch from 1d31e7e to 935a5eb Compare October 10, 2025 15:05
Member

@srebhan srebhan left a comment

Thanks @Hipska! The code reads much better with your update. Please drop the removal of the pending bytes limit feature, as it does NOT belong in this PR. Furthermore, there are some corner cases to consider, see my comments in the code...

func (n *NatsConsumer) Start(acc telegraf.Accumulator) error {
	n.sem = make(semaphore, n.MaxUndeliveredMessages)
	n.acc = acc.WithTracking(n.MaxUndeliveredMessages)
	n.undelivered = make(map[telegraf.TrackingID]*nats.Msg)
Member

How about

Suggested change
-n.undelivered = make(map[telegraf.TrackingID]*nats.Msg)
+n.undelivered = make(map[telegraf.TrackingID]*nats.Msg, n.PendingMessageLimit)

Contributor Author

@Hipska Hipska Oct 14, 2025

I'm not so sure; I would use MaxUndeliveredMessages instead if needed.

And I'm starting to think the two actually mean the same thing...

Member

Well, I also wonder whether those two parameters mean the same thing. But yeah, if they mean different things, I agree to use MaxUndeliveredMessages, as this is what is meant here.

Comment on lines 256 to 292
func (n *NatsConsumer) waitForDelivery(parentCtx context.Context) {
	for {
		select {
		case <-parentCtx.Done():
			return
		case track := <-n.acc.Delivered():
			<-n.sem
			msg := n.removeDelivered(track.ID())

			if msg != nil {
				if track.Delivered() {
					err := msg.Ack()
					if err != nil {
						n.Log.Errorf("Failed to Ack message on subject %s: %v", msg.Subject, err)
					}
				} else {
					err := msg.Nak()
					if err != nil {
						n.Log.Errorf("Failed to Nak message on subject %s: %v", msg.Subject, err)
					}
				}
			}
		}
	}
}

func (n *NatsConsumer) removeDelivered(id telegraf.TrackingID) *nats.Msg {
	n.Lock()
	defer n.Unlock()

	msg, ok := n.undelivered[id]
	if !ok {
		return nil
	}
	delete(n.undelivered, id)
	return msg
}
Member

How about

Suggested change
-func (n *NatsConsumer) waitForDelivery(parentCtx context.Context) {
-	for {
-		select {
-		case <-parentCtx.Done():
-			return
-		case track := <-n.acc.Delivered():
-			<-n.sem
-			msg := n.removeDelivered(track.ID())
-			if msg != nil {
-				if track.Delivered() {
-					err := msg.Ack()
-					if err != nil {
-						n.Log.Errorf("Failed to Ack message on subject %s: %v", msg.Subject, err)
-					}
-				} else {
-					err := msg.Nak()
-					if err != nil {
-						n.Log.Errorf("Failed to Nak message on subject %s: %v", msg.Subject, err)
-					}
-				}
-			}
-		}
-	}
-}
-func (n *NatsConsumer) removeDelivered(id telegraf.TrackingID) *nats.Msg {
-	n.Lock()
-	defer n.Unlock()
-	msg, ok := n.undelivered[id]
-	if !ok {
-		return nil
-	}
-	delete(n.undelivered, id)
-	return msg
-}
+func (n *NatsConsumer) handleDelivery(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			// Plugin is stopping
+			return
+		case track := <-n.acc.Delivered():
+			// Get the delivered message and remove it from the internal tracking
+			// mechanism. Unlock directly instead of deferring, as a defer inside
+			// the loop would only run when the function returns.
+			id := track.ID()
+			n.Lock()
+			msg, found := n.undelivered[id]
+			delete(n.undelivered, id)
+			n.Unlock()
+			// Make space for a new message to be received despite any error case
+			<-n.sem
+			if !found {
+				n.Log.Errorf("received delivery event for unknown message %v", id)
+				continue
+			}
+			// Acknowledge the message
+			if track.Delivered() {
+				if err := msg.Ack(); err != nil {
+					n.Log.Errorf("Failed to Ack message on subject %s: %v", msg.Subject, err)
+				}
+			} else {
+				if err := msg.Nak(); err != nil {
+					n.Log.Errorf("Failed to Nak message on subject %s: %v", msg.Subject, err)
+				}
+			}
+		}
+	}
+}

To be honest, I'm not sure we should NAK the message here if this means the message is re-queued. The background is that if only a single metric is rejected (for whatever reason), the whole NATS message is NAK'ed. If the underlying reason for rejecting the metric is permanent, this may trigger an infinite loop for the message, as it will always be NAK'ed, and thus may render the input dysfunctional if enough of those messages are consumed (i.e. more than the allowed max pending).

Contributor Author

Isn't that the whole purpose of tracking metrics?

Member

@srebhan srebhan Oct 20, 2025

I don't think it is. The purpose of tracking metrics is to allow an input to know if a message was delivered or is "in flight". If you receive a delivery event it means that the metric was processed by the endpoint (it might be a processor as well, just for clarity). The Delivered function tells you if all messages were delivered successfully.
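
The difference between "a delivery event arrived" and "`Delivered()` is true" can be sketched with a simplified model of a tracked metric group. This is an illustration under assumptions, not Telegraf's tracking implementation: the `group` type and its `settle` method are hypothetical names, and the model only captures the idea that one rejected metric marks the whole group as not delivered.

```go
package main

import "fmt"

// group is a hypothetical model of a tracked metric group: it counts the
// metrics that are still in flight and remembers whether any was rejected.
type group struct {
	pending  int
	rejected bool
}

// settle records the outcome for one metric. done reports whether the
// whole group has been processed (the point where a delivery event would
// fire); delivered reports whether no metric was rejected so far.
func (g *group) settle(accepted bool) (done, delivered bool) {
	if !accepted {
		g.rejected = true
	}
	g.pending--
	return g.pending == 0, !g.rejected
}

func main() {
	g := &group{pending: 3}
	g.settle(true)
	g.settle(false) // a single rejected metric
	done, delivered := g.settle(true)
	fmt.Println(done, delivered)
}
```

In this model the event fires once per group, and a single rejected metric is enough for the whole source message to count as failed.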

Now there are different ways to handle "unsuccessful" messages. In PR #15796 we implemented NAK'ing messages for AMQP if delivery fails, ensuring that the messages are not re-enqueued in the topic.

Coming back to this PR: If delivery fails, i.e. the metric could not be handled by the output plugin (e.g. serialization fails, the format is invalid, or the endpoint got the metric but something went wrong), and you NAK the message, you must make sure that this message is not sent again. Otherwise you end up in an infinite loop of getting the message -> delivery fails -> NAK message -> back to square one. Now if there is not just one such metric but they appear sporadically, you will end up with a dysfunctional input that is only busy with NAK'ed messages...

So please either make sure the NAK'ed messages are not re-enqueued in the topic or do not NAK those messages!
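
One possible way to meet this requirement is to bound redelivery and terminate the message once the bound is reached. The sketch below is only an illustration: to my knowledge nats.go's `Msg` offers `Term` next to `Ack` and `Nak`, and message metadata carries a delivery count, but the `settler` interface, the `recorder` fake, and the `maxAttempts` parameter are hypothetical and not part of the plugin or this PR.

```go
package main

import "fmt"

// settler is a hypothetical, simplified view of the acknowledgement
// methods of a JetStream message (the real methods take options).
type settler interface {
	Ack() error  // processed successfully
	Nak() error  // failed, ask for redelivery
	Term() error // failed, ask the server not to redeliver
}

// recorder is a fake message that remembers the last call made on it.
type recorder struct{ last string }

func (r *recorder) Ack() error  { r.last = "ack"; return nil }
func (r *recorder) Nak() error  { r.last = "nak"; return nil }
func (r *recorder) Term() error { r.last = "term"; return nil }

// settle acknowledges delivered messages, retries failed ones while the
// attempt count is below maxAttempts, and terminates them afterwards so
// a permanently failing message cannot loop forever.
func settle(m settler, delivered bool, attempt, maxAttempts uint64) error {
	switch {
	case delivered:
		return m.Ack()
	case attempt < maxAttempts:
		return m.Nak()
	default:
		return m.Term()
	}
}

func main() {
	r := &recorder{}
	for attempt := uint64(1); attempt <= 3; attempt++ {
		if err := settle(r, false, attempt, 3); err != nil {
			fmt.Println("settle failed:", err)
		}
		fmt.Println(attempt, r.last)
	}
}
```

A server-side limit on deliveries in the consumer configuration would be an alternative to counting attempts in the plugin; which of the two fits better is left open here.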

@Hipska Hipska requested a review from srebhan October 20, 2025 08:48
Member

@srebhan srebhan left a comment

Thanks @Hipska! There are still some issues here:

  1. Why do you remove the PendingBytesLimit option? You did not provide any reason for doing so and this does not belong to this PR. Please keep the option!
  2. You are leaking messages which are not acknowledged if they are non-Jetstream.
  3. NAK'ing messages might re-queue the failed message ending up in an infinite loop blocking the plugin.

Please address those issues!

func (n *NatsConsumer) Start(acc telegraf.Accumulator) error {
	n.sem = make(semaphore, n.MaxUndeliveredMessages)
	n.acc = acc.WithTracking(n.MaxUndeliveredMessages)
	n.undelivered = make(map[telegraf.TrackingID]*nats.Msg)
Member

Suggested change
-n.undelivered = make(map[telegraf.TrackingID]*nats.Msg)
+n.undelivered = make(map[telegraf.TrackingID]*nats.Msg, n.MaxUndeliveredMessages)

Comment on lines -137 to -138
// set the subscription pending limits
err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit)
Member

Why do you remove this feature?

Comment on lines +228 to 233
for _, s := range n.jsSubs {
	if msg.Sub == s {
		n.handleJetstreamMessage(msg)
		break L
	}
}
Member

Why do you need this? It wasn't required before... This reads like yet another thing folded into this PR!

Comment on lines +295 to +298
err := msg.Nak()
if err != nil {
	n.Log.Errorf("Failed to Nak JetStream message on subject %s: %v", msg.Subject, err)
}
Member

My concerns won't go away if you do not address them. :-) How can we make sure that messages with permanent errors are not re-enqueued/re-sent by NATS?

Comment on lines +253 to +256
for _, m := range metrics {
	m.AddTag("subject", msg.Subject)
}
return n.acc.AddTrackingMetricGroup(metrics), nil
Member

You are not adding the tracking ID to the n.undelivered lookup so you will never be able to ACK this message!
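
A minimal sketch of the missing step, registering the message under the tracking ID it was given, could look like this. The names are hypothetical stand-ins so the example is self-contained: `addTracked` plays the role of `acc.AddTrackingMetricGroup`, and `trackingID` and `message` replace `telegraf.TrackingID` and `*nats.Msg`.

```go
package main

import (
	"fmt"
	"sync"
)

// trackingID and message are hypothetical stand-ins for
// telegraf.TrackingID and *nats.Msg.
type trackingID uint64

type message struct{ subject string }

type consumer struct {
	mu          sync.Mutex
	nextID      trackingID
	undelivered map[trackingID]*message
}

// addTracked stands in for acc.AddTrackingMetricGroup and only hands
// out a fresh ID.
func (c *consumer) addTracked() trackingID {
	c.nextID++
	return c.nextID
}

// handle registers the message under its tracking ID so that a later
// delivery event can look it up and acknowledge it.
func (c *consumer) handle(m *message) trackingID {
	c.mu.Lock()
	defer c.mu.Unlock()
	id := c.addTracked()
	c.undelivered[id] = m
	return id
}

func main() {
	c := &consumer{undelivered: make(map[trackingID]*message)}
	id := c.handle(&message{subject: "telegraf.metrics"})
	fmt.Println(c.undelivered[id].subject)
}
```

Taking the lock before the metrics are handed over is one way to keep a delivery event from being looked up before the entry exists; whether that ordering matters in the plugin depends on how the delivery goroutine is scheduled.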

@srebhan srebhan added the waiting for response label Oct 23, 2025
Labels

feat, plugin/input, waiting for response

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[[inputs.nats_consumer]] Don't ack messages not delivered to output successfully

2 participants