Skip to content

Commit 03b261a

Browse files
authored
fix: advance past current interval & anchor on now (#1528)
Problem -------- * `nextNiceInterval()` used a “≥” check, so when the next “nice” value equalled `currentMin` it returned the **same** interval. The interval‑explorer treats an unchanged return as a terminal condition, so exploration stopped and `suggestForecast` failed on sample‑log data. * Interval calculation anchored on the first **future** timestamp if one existed, whereas run‑once / real‑time forecasting anchors on the current time—causing the two paths to disagree on data sufficiency. Fix --- * Change comparison in `nextNiceInterval()` from `>=` to `>` so it always returns the next larger interval, letting the explorer continue. * Anchor interval calculation on the current time (`now`) instead of any future date, making all forecast modes consistent. Tests ----- * Added IT Signed-off-by: Kaituo Li <[email protected]>
1 parent e30a14e commit 03b261a

File tree

15 files changed

+635
-37
lines changed

15 files changed

+635
-37
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
1111

1212
### Bug Fixes
1313
- Fixing concurrency bug on writer ([#1508](https://github.com/opensearch-project/anomaly-detection/pull/1508))
14+
- fix(forecast): advance past current interval & anchor on now ([#1528](https://github.com/opensearch-project/anomaly-detection/pull/1528))
1415

1516
### Infrastructure
1617
### Documentation

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,6 @@ List<String> jacocoExclusions = [
10031003

10041004
// TODO: add test coverage (kaituo)
10051005
'org.opensearch.forecast.*',
1006-
'org.opensearch.timeseries.transport.ValidateConfigRequest',
10071006
'org.opensearch.timeseries.transport.ResultProcessor.PageListener.1',
10081007
'org.opensearch.ad.transport.ADHCImputeRequest',
10091008
'org.opensearch.timeseries.transport.BaseDeleteConfigTransportAction.1',

src/main/java/org/opensearch/timeseries/AnalysisType.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77

88
public enum AnalysisType {
99
AD,
10-
FORECAST;
10+
FORECAST,
11+
// for test
12+
UNKNOWN;
1113

1214
public boolean isForecast() {
1315
return this == FORECAST;

src/main/java/org/opensearch/timeseries/rest/handler/AggregationPrep.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,41 @@ public AggregationPrep(SearchFeatureDao searchFeatureDao, TimeValue requestTimeo
5151
this.config = config;
5252
}
5353

54+
/**
55+
* Returns the time‑range bounds using this detector’s **default history length**
56+
* (i.e., the value provided by {@link #getNumberOfSamples()}).
57+
*
58+
* <p>The method delegates to
59+
* {@link #getTimeRangeBounds(IntervalTimeConfiguration, long, int)} and is a
60+
* convenience overload for callers that do not need to specify a custom
61+
* sample count.</p>
62+
*
63+
* @param interval sampling interval configuration (e.g., “5m”, “1h”)
64+
* @param endMillis exclusive upper bound of the time range, expressed in epoch ms
65+
* @return {@code LongBounds} where {@code getMin()} is the computed
66+
* start time and {@code getMax()} equals {@code endMillis}
67+
*/
5468
public LongBounds getTimeRangeBounds(IntervalTimeConfiguration interval, long endMillis) {
69+
return getTimeRangeBounds(interval, endMillis, getNumberOfSamples());
70+
}
71+
72+
/**
73+
* Returns the time‑range bounds using an **explicitly supplied history length**.
74+
*
75+
* <p>The start time is computed as {@code endMillis − (numberOfSamples × interval)}.
76+
* Use this overload when the caller wants full control over how many historical
77+
* samples are considered in the query window.</p>
78+
*
79+
* @param interval sampling interval configuration (e.g., “5m”, “1h”)
80+
* @param endMillis exclusive upper bound of the time range, expressed in epoch ms
81+
* @param numberOfSamples number of historical samples to include; must be &gt; 0
82+
* @return {@code LongBounds} with {@code getMin()} equal to the
83+
* calculated start time and {@code getMax()} equal to {@code endMillis}
84+
* @throws IllegalArgumentException if {@code numberOfSamples} is non‑positive
85+
*/
86+
public LongBounds getTimeRangeBounds(IntervalTimeConfiguration interval, long endMillis, int numberOfSamples) {
5587
long intervalInMillis = IntervalTimeConfiguration.getIntervalInMinute(interval) * 60000;
56-
Long startMillis = endMillis - (getNumberOfSamples() * intervalInMillis);
88+
Long startMillis = endMillis - (numberOfSamples * intervalInMillis);
5789
return new LongBounds(startMillis, endMillis);
5890
}
5991

src/main/java/org/opensearch/timeseries/rest/handler/IntervalCalculation.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public class IntervalCalculation {
6464
private final Map<String, Object> topEntity;
6565
private final long endMillis;
6666
private final Config config;
67+
// how many intervals to look back when exploring intervals
68+
private final int lookBackWindows;
6769

6870
public IntervalCalculation(
6971
Config config,
@@ -75,7 +77,8 @@ public IntervalCalculation(
7577
Clock clock,
7678
SearchFeatureDao searchFeatureDao,
7779
long latestTime,
78-
Map<String, Object> topEntity
80+
Map<String, Object> topEntity,
81+
boolean validate
7982
) {
8083
this.aggregationPrep = new AggregationPrep(searchFeatureDao, requestTimeout, config);
8184
this.client = client;
@@ -86,12 +89,19 @@ public IntervalCalculation(
8689
this.topEntity = topEntity;
8790
this.endMillis = latestTime;
8891
this.config = config;
92+
if (validate) {
93+
lookBackWindows = config.getHistoryIntervals();
94+
} else {
95+
// look back more when history is not finalized yet yet
96+
lookBackWindows = config.getDefaultHistory() * 2;
97+
}
8998
}
9099

91100
public void findInterval(ActionListener<IntervalTimeConfiguration> listener) {
92101
ActionListener<IntervalTimeConfiguration> minimumIntervalListener = ActionListener.wrap(minInterval -> {
102+
logger.debug("minimum interval found: {}", minInterval);
93103
if (minInterval == null) {
94-
logger.info("Fail to find minimum interval");
104+
logger.debug("Fail to find minimum interval");
95105
listener.onResponse(null);
96106
} else {
97107
// starting exploring whether minimum or larger interval satisfy density requirement
@@ -105,7 +115,7 @@ private void getBucketAggregates(IntervalTimeConfiguration minimumInterval, Acti
105115
throws IOException {
106116

107117
try {
108-
LongBounds timeStampBounds = aggregationPrep.getTimeRangeBounds(minimumInterval, endMillis);
118+
LongBounds timeStampBounds = aggregationPrep.getTimeRangeBounds(minimumInterval, endMillis, lookBackWindows);
109119
SearchRequest searchRequest = aggregationPrep.createSearchRequest(minimumInterval, timeStampBounds, topEntity, 0);
110120
ActionListener<IntervalTimeConfiguration> intervalListener = ActionListener
111121
.wrap(interval -> listener.onResponse(interval), exception -> {
@@ -120,6 +130,7 @@ private void getBucketAggregates(IntervalTimeConfiguration minimumInterval, Acti
120130
);
121131
// using the original context in listener as user roles have no permissions for internal operations like fetching a
122132
// checkpoint
133+
logger.debug("Interval explore search request: {}", searchRequest);
123134
clientUtil
124135
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
125136
searchRequest,
@@ -160,20 +171,25 @@ public IntervalRecommendationListener(
160171

161172
@Override
162173
public void onResponse(SearchResponse resp) {
174+
logger.debug("interval explorer response: {}", resp);
163175
try {
164176
long shingles = aggregationPrep.getShingleCount(resp);
177+
logger.debug("number of shingles: {}", shingles);
165178

166179
if (shingles >= TimeSeriesSettings.NUM_MIN_SAMPLES) { // dense enough
167180
intervalListener.onResponse(currentIntervalToTry);
168181
return;
169182
}
170183

171184
if (++attempts > 10) { // retry budget exhausted
185+
logger.debug("number of attempts: {}", attempts);
172186
intervalListener.onResponse(null);
173187
return;
174188
}
175189

176-
if (clock.millis() > expirationEpochMs) { // timeout
190+
long nowMillis = clock.millis();
191+
if (nowMillis > expirationEpochMs) { // timeout
192+
logger.debug("Timed out: now={}, expires={}", nowMillis, expirationEpochMs);
177193
intervalListener
178194
.onFailure(
179195
new ValidationException(
@@ -186,7 +202,8 @@ public void onResponse(SearchResponse resp) {
186202
}
187203

188204
int nextMin = nextNiceInterval((int) currentIntervalToTry.getInterval());
189-
if (nextMin == currentIntervalToTry.getInterval()) { // cannot grow further
205+
if (nextMin <= currentIntervalToTry.getInterval()) { // cannot grow further
206+
logger.debug("Cannot grow interval further: next={}, current={}", nextMin, currentIntervalToTry.getInterval());
190207
intervalListener.onResponse(null);
191208
return;
192209
}
@@ -199,12 +216,14 @@ public void onResponse(SearchResponse resp) {
199216

200217
private void searchWithDifferentInterval(int newIntervalMinuteValue) {
201218
this.currentIntervalToTry = new IntervalTimeConfiguration(newIntervalMinuteValue, ChronoUnit.MINUTES);
202-
this.currentTimeStampBounds = aggregationPrep.getTimeRangeBounds(currentIntervalToTry, endMillis);
219+
this.currentTimeStampBounds = aggregationPrep.getTimeRangeBounds(currentIntervalToTry, endMillis, lookBackWindows);
203220
// using the original context in listener as user roles have no permissions for internal operations like fetching a
204221
// checkpoint
222+
SearchRequest searchRequest = aggregationPrep.createSearchRequest(currentIntervalToTry, currentTimeStampBounds, topEntity, 0);
223+
logger.debug("next search request: {}", searchRequest);
205224
clientUtil
206225
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
207-
aggregationPrep.createSearchRequest(currentIntervalToTry, currentTimeStampBounds, topEntity, 0),
226+
searchRequest,
208227
client::search,
209228
user,
210229
client,
@@ -339,7 +358,7 @@ public void findMedianIntervalAdaptive(ActionListener<IntervalTimeConfiguration>
339358
long totalDocs = r.getHits().getTotalHits() == null ? 0L : r.getHits().getTotalHits().value();
340359

341360
if (totalDocs < 2) {
342-
logger.info("Exit early due to few docs");
361+
logger.debug("Exit early due to few docs");
343362
listener.onResponse(null);
344363
return;
345364
}
@@ -501,8 +520,9 @@ public void refineGap(
501520
src.aggregation(hist);
502521

503522
SearchRequest searchRequest = new SearchRequest(config.getIndices().toArray(new String[0])).source(src);
523+
logger.debug("Minimum interval search request: {}", searchRequest);
504524
client.search(searchRequest, ActionListener.wrap(resp -> {
505-
525+
logger.debug("Minimum interval search response: {}", resp);
506526
double gap = Double.NaN;
507527
boolean hasEmptyBuckets = false;
508528
Histogram histogram = resp.getAggregations().get("dyn");
@@ -753,7 +773,7 @@ private static long nextPowerOfTwo(long n) {
753773
/* ------------------------------------------------------------------ */
754774
private static int nextNiceInterval(int currentMin) {
755775
for (int step : INTERVAL_LADDER) {
756-
if (step >= currentMin) {
776+
if (step > currentMin) {
757777
return step;
758778
}
759779
}

src/main/java/org/opensearch/timeseries/rest/handler/LatestTimeRetriever.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public class LatestTimeRetriever {
5959
private final User user;
6060
private final AnalysisType context;
6161
private final SearchFeatureDao searchFeatureDao;
62+
// whether we should convert future date to now if future data exists
63+
private final boolean convertFutureDatetoNow;
6264

6365
public LatestTimeRetriever(
6466
Config config,
@@ -67,7 +69,8 @@ public LatestTimeRetriever(
6769
Client client,
6870
User user,
6971
AnalysisType context,
70-
SearchFeatureDao searchFeatureDao
72+
SearchFeatureDao searchFeatureDao,
73+
boolean convertFutureDatetoNow
7174
) {
7275
this.config = config;
7376
this.aggregationPrep = new AggregationPrep(searchFeatureDao, requestTimeout, config);
@@ -76,6 +79,7 @@ public LatestTimeRetriever(
7679
this.user = user;
7780
this.context = context;
7881
this.searchFeatureDao = searchFeatureDao;
82+
this.convertFutureDatetoNow = convertFutureDatetoNow;
7983
}
8084

8185
/**
@@ -93,6 +97,10 @@ public void checkIfHC(ActionListener<Pair<Optional<Long>, Map<String, Object>>>
9397
long timeRangeEnd = latestTime.get();
9498
if (currentEpochMillis < timeRangeEnd) {
9599
logger.info(new ParameterizedMessage("Future date is detected: [{}]", latestTime.get()));
100+
if (convertFutureDatetoNow) {
101+
logger.info("Convert future date to now");
102+
timeRangeEnd = currentEpochMillis;
103+
}
96104
}
97105

98106
if (config.isHighCardinality()) {

src/main/java/org/opensearch/timeseries/rest/handler/ModelValidationActionHandler.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,17 @@ public ModelValidationActionHandler(
130130
this.context = context;
131131
// calculate the bounds in a lazy manner
132132
this.timeRangeToSearchForConfiguredInterval = null;
133-
this.latestTimeRetriever = new LatestTimeRetriever(config, requestTimeout, clientUtil, client, user, context, searchFeatureDao);
133+
// validate window delay depends on detection of future date (which will set window delay to 0)
134+
this.latestTimeRetriever = new LatestTimeRetriever(
135+
config,
136+
requestTimeout,
137+
clientUtil,
138+
client,
139+
user,
140+
context,
141+
searchFeatureDao,
142+
false
143+
);
134144
this.intervalIssueType = intervalIssueType;
135145
this.aggregationPrep = new AggregationPrep(searchFeatureDao, requestTimeout, config);
136146
}
@@ -180,7 +190,8 @@ private void getSampleRangesForValidationChecks(
180190
clock,
181191
searchFeatureDao,
182192
latestTime.get(),
183-
topEntity
193+
topEntity,
194+
true
184195
)
185196
.findInterval(
186197
ActionListener.wrap(interval -> processIntervalRecommendation(interval, latestTime.get(), topEntity), listener::onFailure)

src/main/java/org/opensearch/timeseries/transport/BaseSuggestConfigParamTransportAction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,9 @@ protected void suggestInterval(
126126
client,
127127
user,
128128
context,
129-
searchFeatureDao
129+
searchFeatureDao,
130+
// simulate run once and real time scenario where we operate relative to now
131+
true
130132
);
131133

132134
ActionListener<Pair<Optional<Long>, Map<String, Object>>> latestTimeListener = ActionListener.wrap(latestEntityAttributes -> {
@@ -142,7 +144,8 @@ protected void suggestInterval(
142144
clock,
143145
searchFeatureDao,
144146
latestTime.get(),
145-
latestEntityAttributes.getRight()
147+
latestEntityAttributes.getRight(),
148+
false
146149
);
147150
intervalCalculation
148151
.findInterval(
@@ -237,7 +240,9 @@ protected void suggestWindowDelay(Config config, User user, TimeValue timeout, A
237240
client,
238241
user,
239242
context,
240-
searchFeatureDao
243+
searchFeatureDao,
244+
// if future date is found, just set window delay to 0
245+
false
241246
);
242247
ActionListener<Pair<Optional<Long>, Map<String, Object>>> latestTimeListener = ActionListener.wrap(latestEntityAttributes -> {
243248
Optional<Long> latestTime = latestEntityAttributes.getLeft();

src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,6 @@ private void bulkIndexTestData(List<JsonObject> data, String datasetName, int tr
219219
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
220220
);
221221
Thread.sleep(1_000);
222-
waitAllSyncheticDataIngested(data.size(), datasetName, client);
222+
waitAllSyncheticDataIngestedOrdered(data.size(), datasetName, client);
223223
}
224224
}

0 commit comments

Comments
 (0)