Skip to content

Commit 94557c2

Browse files
committed
Properly compute last_state in merged agg
1 parent 4845c74 commit 94557c2

File tree

1 file changed

+31
-19
lines changed

1 file changed

+31
-19
lines changed

extension/src/state_aggregate/rollup.rs

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -112,25 +112,37 @@ impl OwnedCompactStateAgg {
112112

113113
let earlier_len = earlier.combined_durations.len();
114114

115-
let mut added_entries = 0;
116-
for dis in later.durations.iter() {
117-
let merged_duration_to_update = merged_durations.iter_mut().find(|merged_dis| {
118-
merged_dis.state.materialize(&merged_states) == dis.state.materialize(&later_states)
119-
});
120-
if let Some(merged_duration_to_update) = merged_duration_to_update {
121-
merged_duration_to_update.duration += dis.duration;
122-
} else {
123-
let state = dis
124-
.state
125-
.materialize(&later_states)
126-
.entry(&mut merged_states);
127-
merged_durations.push(DurationInState {
128-
state,
129-
duration: dis.duration,
130-
});
131-
added_entries += 1;
115+
let mut merged_last_state = None;
116+
for (later_idx, dis) in later.durations.iter().enumerate() {
117+
let materialized_dis = dis.state.materialize(&later_states);
118+
let merged_duration_info =
119+
merged_durations
120+
.iter_mut()
121+
.enumerate()
122+
.find(|(_, merged_dis)| {
123+
merged_dis.state.materialize(&merged_states) == materialized_dis
124+
});
125+
126+
let merged_idx =
127+
if let Some((merged_idx, merged_duration_to_update)) = merged_duration_info {
128+
merged_duration_to_update.duration += dis.duration;
129+
merged_idx
130+
} else {
131+
let state = materialized_dis.entry(&mut merged_states);
132+
merged_durations.push(DurationInState {
133+
state,
134+
duration: dis.duration,
135+
});
136+
merged_durations.len() - 1
137+
};
138+
139+
if later_idx == later.last_state as usize {
140+
// this is the last state
141+
merged_last_state = Some(merged_idx);
132142
};
133143
}
144+
let merged_last_state =
145+
merged_last_state.expect("later last_state not in later.durations") as u32;
134146

135147
let mut combined_durations = earlier
136148
.combined_durations
@@ -187,8 +199,8 @@ impl OwnedCompactStateAgg {
187199

188200
first_time: earlier.first_time,
189201
last_time: later.last_time,
190-
first_state: earlier.first_state,
191-
last_state: added_entries + later.last_state,
202+
first_state: earlier.first_state, // indexes into earlier durations are same for merged_durations
203+
last_state: merged_last_state,
192204

193205
// these values are always the same for both
194206
compact: earlier.compact,

0 commit comments

Comments
 (0)