Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/23872_buffer_counter_underflowed.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fix buffer counter underflowed, caused by the counter has not been updated(increase) timely when new event is coming.

authors: sialais
30 changes: 14 additions & 16 deletions lib/vector-buffers/src/topology/channel/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,16 @@ impl<T: Bufferable> BufferSender<T> {
.as_ref()
.map(|_| (item.event_count(), item.size_of()));

let mut sent_to_base = true;
let mut was_dropped = false;

if let Some(instrumentation) = self.instrumentation.as_ref()
&& let Some((item_count, item_size)) = item_sizing
{
instrumentation.increment_received_event_count_and_byte_size(
item_count as u64,
item_size as u64,
);
}
match self.when_full {
WhenFull::Block => self.base.send(item).await?,
WhenFull::DropNewest => {
Expand All @@ -213,7 +221,7 @@ impl<T: Bufferable> BufferSender<T> {
}
WhenFull::Overflow => {
if let Some(item) = self.base.try_send(item).await? {
sent_to_base = false;
was_dropped = true;
self.overflow
.as_mut()
.unwrap_or_else(|| unreachable!("overflow must exist"))
Expand All @@ -223,23 +231,9 @@ impl<T: Bufferable> BufferSender<T> {
}
}

if (sent_to_base || was_dropped)
&& let (Some(send_duration), Some(send_reference)) =
(self.send_duration.as_ref(), send_reference)
{
send_duration.emit(send_reference.elapsed());
}

if let Some(instrumentation) = self.instrumentation.as_ref()
&& let Some((item_count, item_size)) = item_sizing
{
if sent_to_base {
instrumentation.increment_received_event_count_and_byte_size(
item_count as u64,
item_size as u64,
);
}

if was_dropped {
instrumentation.increment_dropped_event_count_and_byte_size(
item_count as u64,
Expand All @@ -248,6 +242,10 @@ impl<T: Bufferable> BufferSender<T> {
);
}
}
if let Some(send_duration) = self.send_duration.as_ref()
&& let Some(send_reference) = send_reference {
send_duration.emit(send_reference.elapsed());
}

Ok(())
}
Expand Down
Loading