@@ -107,8 +107,8 @@ impl HeartbeatTransState {
107
107
}
108
108
109
109
fn combine_intervals ( & mut self , new_intervals : Vec < ( i64 , i64 ) > ) {
110
- // Optimized path for ordered inputs
111
- if self . liveness . last ( ) . unwrap ( ) . 0 < new_intervals. first ( ) . unwrap ( ) . 0 {
110
+ // Optimized path for nonoverlapping, ordered inputs
111
+ if self . last < new_intervals. first ( ) . unwrap ( ) . 0 {
112
112
let mut new_intervals = new_intervals. into_iter ( ) ;
113
113
114
114
// Grab the first new interval to check for overlap with the existing data
@@ -1578,4 +1578,52 @@ mod tests {
1578
1578
] ;
1579
1579
assert_eq ! ( serial, expected) ;
1580
1580
}
1581
+
1582
+ #[ pg_test]
1583
+ fn test_rollup_overlap ( ) {
1584
+ Spi :: connect ( |mut client| {
1585
+ client. update ( "SET TIMEZONE to UTC" , None , None ) . unwrap ( ) ;
1586
+
1587
+ client
1588
+ . update (
1589
+ "CREATE TABLE poc(ts TIMESTAMPTZ, batch TIMESTAMPTZ)" ,
1590
+ None ,
1591
+ None ,
1592
+ )
1593
+ . unwrap ( ) ;
1594
+
1595
+ client
1596
+ . update (
1597
+ "INSERT INTO poc VALUES
1598
+ ('1-1-2020 0:50 UTC', '1-1-2020 0:00 UTC'),
1599
+ ('1-1-2020 1:10 UTC', '1-1-2020 0:00 UTC'),
1600
+ ('1-1-2020 1:00 UTC', '1-1-2020 1:00 UTC')" ,
1601
+ None ,
1602
+ None ,
1603
+ )
1604
+ . unwrap ( ) ;
1605
+
1606
+ let output = client
1607
+ . update (
1608
+ "WITH rollups AS (
1609
+ SELECT heartbeat_agg(ts, batch, '2h', '20m')
1610
+ FROM poc
1611
+ GROUP BY batch
1612
+ ORDER BY batch
1613
+ )
1614
+ SELECT live_ranges(rollup(heartbeat_agg))::TEXT
1615
+ FROM rollups" ,
1616
+ None ,
1617
+ None ,
1618
+ )
1619
+ . unwrap ( )
1620
+ . first ( )
1621
+ . get_one :: < String > ( )
1622
+ . unwrap ( ) ;
1623
+
1624
+ let expected = "(\" 2020-01-01 00:50:00+00\" ,\" 2020-01-01 01:30:00+00\" )" ;
1625
+
1626
+ assert_eq ! ( output, Some ( expected. into( ) ) ) ;
1627
+ } ) ;
1628
+ }
1581
1629
}
0 commit comments