@@ -56,7 +56,7 @@ pub struct RollupTransState {
56
56
compact : bool ,
57
57
}
58
58
59
- #[ derive( Debug , Clone , Serialize , Deserialize ) ]
59
+ #[ derive( Debug , Clone , PartialEq , Eq , Serialize , Deserialize ) ]
60
60
struct OwnedCompactStateAgg {
61
61
durations : Vec < DurationInState > ,
62
62
combined_durations : Vec < TimeInState > ,
@@ -80,14 +80,22 @@ impl OwnedCompactStateAgg {
80
80
"can't merge aggs with different state types"
81
81
) ;
82
82
83
- let ( earlier, later) = match self . first_time . cmp ( & other. first_time ) {
83
+ let ( earlier, later) = match self . cmp ( & other) {
84
84
Ordering :: Less => ( self , other) ,
85
85
Ordering :: Greater => ( other, self ) ,
86
- Ordering :: Equal => panic ! ( "can't merge overlapping aggregates (same start time)" ) ,
86
+ Ordering :: Equal => panic ! (
87
+ "can't merge overlapping aggregates (same start time: {})" ,
88
+ self . first_time
89
+ ) ,
87
90
} ;
91
+
88
92
assert ! (
89
93
earlier. last_time <= later. first_time,
90
- "can't merge overlapping aggregates"
94
+ "can't merge overlapping aggregates (earlier={}-{}, later={}-{})" ,
95
+ earlier. first_time,
96
+ earlier. last_time,
97
+ later. first_time,
98
+ later. last_time,
91
99
) ;
92
100
assert_ne ! (
93
101
later. durations. len( ) ,
@@ -97,7 +105,7 @@ impl OwnedCompactStateAgg {
97
105
assert_ne ! (
98
106
earlier. durations. len( ) ,
99
107
0 ,
100
- "later aggregate must be non-empty"
108
+ "earlier aggregate must be non-empty"
101
109
) ;
102
110
103
111
let later_states =
@@ -108,25 +116,37 @@ impl OwnedCompactStateAgg {
108
116
109
117
let earlier_len = earlier. combined_durations . len ( ) ;
110
118
111
- let mut added_entries = 0 ;
112
- for dis in later. durations . iter ( ) {
113
- let merged_duration_to_update = merged_durations. iter_mut ( ) . find ( |merged_dis| {
114
- merged_dis. state . materialize ( & merged_states) == dis. state . materialize ( & later_states)
115
- } ) ;
116
- if let Some ( merged_duration_to_update) = merged_duration_to_update {
117
- merged_duration_to_update. duration += dis. duration ;
118
- } else {
119
- let state = dis
120
- . state
121
- . materialize ( & later_states)
122
- . entry ( & mut merged_states) ;
123
- merged_durations. push ( DurationInState {
124
- state,
125
- duration : dis. duration ,
126
- } ) ;
127
- added_entries += 1 ;
119
+ let mut merged_last_state = None ;
120
+ for ( later_idx, dis) in later. durations . iter ( ) . enumerate ( ) {
121
+ let materialized_dis = dis. state . materialize ( & later_states) ;
122
+ let merged_duration_info =
123
+ merged_durations
124
+ . iter_mut ( )
125
+ . enumerate ( )
126
+ . find ( |( _, merged_dis) | {
127
+ merged_dis. state . materialize ( & merged_states) == materialized_dis
128
+ } ) ;
129
+
130
+ let merged_idx =
131
+ if let Some ( ( merged_idx, merged_duration_to_update) ) = merged_duration_info {
132
+ merged_duration_to_update. duration += dis. duration ;
133
+ merged_idx
134
+ } else {
135
+ let state = materialized_dis. entry ( & mut merged_states) ;
136
+ merged_durations. push ( DurationInState {
137
+ state,
138
+ duration : dis. duration ,
139
+ } ) ;
140
+ merged_durations. len ( ) - 1
141
+ } ;
142
+
143
+ if later_idx == later. last_state as usize {
144
+ // this is the last state
145
+ merged_last_state = Some ( merged_idx) ;
128
146
} ;
129
147
}
148
+ let merged_last_state =
149
+ merged_last_state. expect ( "later last_state not in later.durations" ) as u32 ;
130
150
131
151
let mut combined_durations = earlier
132
152
. combined_durations
@@ -142,22 +162,36 @@ impl OwnedCompactStateAgg {
142
162
143
163
let gap = later. first_time - earlier. last_time ;
144
164
assert ! ( gap >= 0 ) ;
145
- merged_durations[ earlier. last_state as usize ] . duration += gap;
165
+ merged_durations
166
+ . get_mut ( earlier. last_state as usize )
167
+ . expect ( "earlier.last_state doesn't point to a state" )
168
+ . duration += gap;
146
169
147
170
// ensure combined_durations covers the whole range of time
148
171
if !earlier. compact {
149
- if combined_durations[ earlier_len - 1 ]
172
+ if combined_durations
173
+ . get_mut ( earlier_len - 1 )
174
+ . expect ( "invalid combined_durations: nothing at end of earlier" )
150
175
. state
151
176
. materialize ( & merged_states)
152
- == combined_durations[ earlier_len]
177
+ == combined_durations
178
+ . get ( earlier_len)
179
+ . expect ( "invalid combined_durations: nothing at start of earlier" )
153
180
. state
154
181
. materialize ( & merged_states)
155
182
{
156
- combined_durations[ earlier_len - 1 ] . end_time =
157
- combined_durations. remove ( earlier_len) . end_time ;
183
+ combined_durations
184
+ . get_mut ( earlier_len - 1 )
185
+ . expect ( "invalid combined_durations (nothing at earlier_len - 1, equal)" )
186
+ . end_time = combined_durations. remove ( earlier_len) . end_time ;
158
187
} else {
159
- combined_durations[ earlier_len - 1 ] . end_time =
160
- combined_durations[ earlier_len] . start_time ;
188
+ combined_durations
189
+ . get_mut ( earlier_len - 1 )
190
+ . expect ( "invalid combined_durations (nothing at earlier_len - 1, not equal)" )
191
+ . end_time = combined_durations
192
+ . get ( earlier_len)
193
+ . expect ( "invalid combined_durations (nothing at earlier_len, not equal)" )
194
+ . start_time ;
161
195
}
162
196
}
163
197
@@ -169,8 +203,8 @@ impl OwnedCompactStateAgg {
169
203
170
204
first_time : earlier. first_time ,
171
205
last_time : later. last_time ,
172
- first_state : earlier. first_state ,
173
- last_state : added_entries + later . last_state ,
206
+ first_state : earlier. first_state , // indexes into earlier durations are same for merged_durations
207
+ last_state : merged_last_state ,
174
208
175
209
// these values are always the same for both
176
210
compact : earlier. compact ,
@@ -216,8 +250,23 @@ impl<'a> From<CompactStateAgg<'a>> for OwnedCompactStateAgg {
216
250
}
217
251
}
218
252
253
+ impl PartialOrd for OwnedCompactStateAgg {
254
+ fn partial_cmp ( & self , other : & Self ) -> Option < Ordering > {
255
+ Some ( self . cmp ( other) )
256
+ }
257
+ }
258
+
259
+ impl Ord for OwnedCompactStateAgg {
260
+ fn cmp ( & self , other : & Self ) -> Ordering {
261
+ // compare using first time (OwnedCompactStateAgg::merge will handle any overlap)
262
+ self . first_time . cmp ( & other. first_time )
263
+ }
264
+ }
265
+
219
266
impl RollupTransState {
220
267
fn merge ( & mut self ) {
268
+ // OwnedCompactStateAgg::merge can't merge overlapping aggregates
269
+ self . values . sort ( ) ;
221
270
self . values = self
222
271
. values
223
272
. drain ( ..)
@@ -417,4 +466,108 @@ mod tests {
417
466
418
467
r2. merge ( r1) ;
419
468
}
469
+
470
+ #[ test]
471
+ fn merges_compact_aggs_correctly ( ) {
472
+ let s1 = OwnedCompactStateAgg {
473
+ durations : vec ! [
474
+ DurationInState {
475
+ duration: 500 ,
476
+ state: StateEntry :: from_integer( 555_2 ) ,
477
+ } ,
478
+ DurationInState {
479
+ duration: 400 ,
480
+ state: StateEntry :: from_integer( 555_1 ) ,
481
+ } ,
482
+ ] ,
483
+ combined_durations : vec ! [ ] ,
484
+ first_time : 100 ,
485
+ last_time : 1000 ,
486
+ first_state : 1 ,
487
+ last_state : 0 ,
488
+ states : vec ! [ ] ,
489
+ compact : true ,
490
+ integer_states : true ,
491
+ } ;
492
+ let s2 = OwnedCompactStateAgg {
493
+ durations : vec ! [
494
+ DurationInState {
495
+ duration: 500 ,
496
+ state: StateEntry :: from_integer( 555_2 ) ,
497
+ } ,
498
+ DurationInState {
499
+ duration: 400 ,
500
+ state: StateEntry :: from_integer( 555_1 ) ,
501
+ } ,
502
+ ] ,
503
+ combined_durations : vec ! [ ] ,
504
+ first_time : 1000 + 12345 ,
505
+ last_time : 1900 + 12345 ,
506
+ first_state : 1 ,
507
+ last_state : 0 ,
508
+ states : vec ! [ ] ,
509
+ compact : true ,
510
+ integer_states : true ,
511
+ } ;
512
+ let s3 = OwnedCompactStateAgg {
513
+ durations : vec ! [
514
+ DurationInState {
515
+ duration: 500 ,
516
+ state: StateEntry :: from_integer( 555_2 ) ,
517
+ } ,
518
+ DurationInState {
519
+ duration: 400 ,
520
+ state: StateEntry :: from_integer( 555_1 ) ,
521
+ } ,
522
+ ] ,
523
+ combined_durations : vec ! [ ] ,
524
+ first_time : 1900 + 12345 ,
525
+ last_time : 1900 + 12345 + 900 ,
526
+ first_state : 1 ,
527
+ last_state : 0 ,
528
+ states : vec ! [ ] ,
529
+ compact : true ,
530
+ integer_states : true ,
531
+ } ;
532
+ let expected = OwnedCompactStateAgg {
533
+ durations : vec ! [
534
+ DurationInState {
535
+ duration: 500 * 3 + 12345 ,
536
+ state: StateEntry :: from_integer( 555_2 ) ,
537
+ } ,
538
+ DurationInState {
539
+ duration: 400 * 3 ,
540
+ state: StateEntry :: from_integer( 555_1 ) ,
541
+ } ,
542
+ ] ,
543
+ combined_durations : vec ! [ ] ,
544
+ first_time : 100 ,
545
+ last_time : 1900 + 12345 + 900 ,
546
+ first_state : 1 ,
547
+ last_state : 0 ,
548
+ states : vec ! [ ] ,
549
+ compact : true ,
550
+ integer_states : true ,
551
+ } ;
552
+ let merged = s1. clone ( ) . merge ( s2. clone ( ) . merge ( s3. clone ( ) ) ) ;
553
+ assert_eq ! ( merged, expected) ;
554
+ let merged = s3. clone ( ) . merge ( s2. clone ( ) . merge ( s1. clone ( ) ) ) ;
555
+ assert_eq ! ( merged, expected) ;
556
+
557
+ let mut trans_state = RollupTransState {
558
+ values : vec ! [ s1. clone( ) , s2. clone( ) , s3. clone( ) ] ,
559
+ compact : true ,
560
+ } ;
561
+ trans_state. merge ( ) ;
562
+ assert_eq ! ( trans_state. values. len( ) , 1 ) ;
563
+ assert_eq ! ( trans_state. values[ 0 ] , expected. clone( ) ) ;
564
+
565
+ let mut trans_state = RollupTransState {
566
+ values : vec ! [ s3. clone( ) , s1. clone( ) , s2. clone( ) ] ,
567
+ compact : true ,
568
+ } ;
569
+ trans_state. merge ( ) ;
570
+ assert_eq ! ( trans_state. values. len( ) , 1 ) ;
571
+ assert_eq ! ( trans_state. values[ 0 ] , expected. clone( ) ) ;
572
+ }
420
573
}
0 commit comments