Skip to content

Commit bd9b877

Browse files
sialaisbrucegpront
authored
fix(instrument): Buffer counter underflowed (#23872) (#23973)
* fix(instrument): Buffer counter underflowed (#23872) * Update code to be more clearly with to let Co-authored-by: Bruce Guenter <[email protected]> * format fix to meet format&clippy --------- Co-authored-by: Bruce Guenter <[email protected]> Co-authored-by: Pavlos Rontidis <[email protected]>
1 parent 8c5af62 commit bd9b877

File tree

2 files changed

+22
-23
lines changed

2 files changed

+22
-23
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fix buffer counter underflowed, caused by the counter has not been updated(increase) timely when new event is coming.
2+
3+
authors: sialais

lib/vector-buffers/src/topology/channel/sender.rs

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,14 @@ impl<T: Bufferable> BufferSender<T> {
202202
.as_ref()
203203
.map(|_| (item.event_count(), item.size_of()));
204204

205-
let mut sent_to_base = true;
206205
let mut was_dropped = false;
206+
207+
if let Some(instrumentation) = self.instrumentation.as_ref()
208+
&& let Some((item_count, item_size)) = item_sizing
209+
{
210+
instrumentation
211+
.increment_received_event_count_and_byte_size(item_count as u64, item_size as u64);
212+
}
207213
match self.when_full {
208214
WhenFull::Block => self.base.send(item).await?,
209215
WhenFull::DropNewest => {
@@ -213,7 +219,7 @@ impl<T: Bufferable> BufferSender<T> {
213219
}
214220
WhenFull::Overflow => {
215221
if let Some(item) = self.base.try_send(item).await? {
216-
sent_to_base = false;
222+
was_dropped = true;
217223
self.overflow
218224
.as_mut()
219225
.unwrap_or_else(|| unreachable!("overflow must exist"))
@@ -223,30 +229,20 @@ impl<T: Bufferable> BufferSender<T> {
223229
}
224230
}
225231

226-
if (sent_to_base || was_dropped)
227-
&& let (Some(send_duration), Some(send_reference)) =
228-
(self.send_duration.as_ref(), send_reference)
229-
{
230-
send_duration.emit(send_reference.elapsed());
231-
}
232-
233232
if let Some(instrumentation) = self.instrumentation.as_ref()
234233
&& let Some((item_count, item_size)) = item_sizing
234+
&& was_dropped
235235
{
236-
if sent_to_base {
237-
instrumentation.increment_received_event_count_and_byte_size(
238-
item_count as u64,
239-
item_size as u64,
240-
);
241-
}
242-
243-
if was_dropped {
244-
instrumentation.increment_dropped_event_count_and_byte_size(
245-
item_count as u64,
246-
item_size as u64,
247-
true,
248-
);
249-
}
236+
instrumentation.increment_dropped_event_count_and_byte_size(
237+
item_count as u64,
238+
item_size as u64,
239+
true,
240+
);
241+
}
242+
if let Some(send_duration) = self.send_duration.as_ref()
243+
&& let Some(send_reference) = send_reference
244+
{
245+
send_duration.emit(send_reference.elapsed());
250246
}
251247

252248
Ok(())

0 commit comments

Comments
 (0)