Skip to content

Commit 08dceb4

Browse files
authored
Refine cold-start, window delay, and task updates (#1496)
- Skip checkpoint writes during historical/run once - Add retryOnConflict to task updates to dodge version clashes - Switch window-delay calculation from 20 % padding (gap × 1.2) to a bucket-based approach. Motivation: the multiplicative cushion scaled with absolute lag, so a multi-hour ingest gap could inflate the delay into days, causing cold start failures. Tying the delay to config intervals (plus one safety bucket) keeps it proportional and restores prompt results. Testing done: * added IT: https://tinyurl.com/5n98z3ue Signed-off-by: Kaituo Li <[email protected]>
1 parent f99a347 commit 08dceb4

File tree

6 files changed

+52
-9
lines changed

6 files changed

+52
-9
lines changed

release-notes/opensearch-anomaly-detection.release-notes-3.1.0.0.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ Compatible with OpenSearch 3.1.0
99
### Bug Fixes
1010
- Fix incorrect task state handling in ForecastRunOnceTransportAction ([#1489](https://github.com/opensearch-project/anomaly-detection/pull/1489))
1111
- Fix incorrect task state handling in ForecastRunOnceTransportAction ([#1493](https://github.com/opensearch-project/anomaly-detection/pull/1493))
12-
12+
- Refine cold-start, window delay, and task updates ([#1496](https://github.com/opensearch-project/anomaly-detection/pull/1496))
1313

src/main/java/org/opensearch/ad/ml/ADColdStart.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,10 @@ protected List<AnomalyResult> trainModelFromDataSegments(
304304

305305
entityState.setLastUsedTime(clock.instant());
306306

307-
// save to checkpoint
308-
checkpointWriteWorker.write(entityState, true, RequestPriority.MEDIUM);
307+
// save to checkpoint for real time only
308+
if (null == taskId) {
309+
checkpointWriteWorker.write(entityState, true, RequestPriority.MEDIUM);
310+
}
309311

310312
return results;
311313
}

src/main/java/org/opensearch/forecast/ml/ForecastCheckpointDao.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,8 @@ protected ModelState<RCFCaster> fromSingleStreamModelCheckpoint(Map<String, Obje
391391
private RCFCaster loadRCFCaster(Map<String, Object> checkpoint, String modelId) {
392392
String model = (String) checkpoint.get(CommonName.FIELD_MODEL);
393393
if (model == null || model.length() > maxCheckpointBytes) {
394-
logger.warn(new ParameterizedMessage("[{}]'s model too large: [{}] bytes", modelId, model.length()));
394+
logger
395+
.warn(new ParameterizedMessage("[{}]'s model empty or too large: [{}] bytes", modelId, model == null ? 0 : model.length()));
395396
return null;
396397
}
397398
return toRCFCaster(model);

src/main/java/org/opensearch/timeseries/ml/ModelColdStart.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ private void coldStart(
233233
}
234234

235235
if (lastThrottledColdStartTime.plus(Duration.ofMinutes(coolDownMinutes)).isAfter(clock.instant())) {
236+
logger.info("Still in cool down.");
236237
listener.onResponse(null);
237238
return;
238239
}
@@ -290,8 +291,11 @@ private void coldStart(
290291
logger.info("Not enough data to train model: {}, currently we have {}", modelId, dataSize);
291292

292293
trainingData.forEach(modelState::addSample);
293-
// save to checkpoint
294-
checkpointWriteWorker.write(modelState, true, RequestPriority.MEDIUM);
294+
// save to checkpoint for real time only
295+
if (null == coldStartRequest.getTaskId()) {
296+
checkpointWriteWorker.write(modelState, true, RequestPriority.MEDIUM);
297+
}
298+
295299
listener.onResponse(null);
296300
}
297301
} else {
@@ -376,7 +380,6 @@ private void getColdStartData(String configId, FeatureRequest coldStartRequest,
376380
// [current start, current end] for training. So we fetch training data ending at current start
377381
long endTimeMs = coldStartRequest.getDataStartTimeMillis();
378382
int numberOfSamples = selectNumberOfSamples(config);
379-
380383
// we start with round 0
381384
getFeatures(
382385
listener,

src/main/java/org/opensearch/timeseries/task/TaskManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,8 @@ public void updateTask(String taskId, Map<String, Object> updatedFields, ActionL
747747
updatedContent.put(TimeSeriesTask.LAST_UPDATE_TIME_FIELD, Instant.now().toEpochMilli());
748748
updateRequest.doc(updatedContent);
749749
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
750+
// OpenSearch will transparently re‑read the doc and retry up to 2 times.
751+
updateRequest.retryOnConflict(2);
750752
client.update(updateRequest, listener);
751753
}
752754

src/main/java/org/opensearch/timeseries/transport/BaseSuggestConfigParamTransportAction.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.opensearch.timeseries.rest.handler.HistorySuggest;
4242
import org.opensearch.timeseries.rest.handler.IntervalCalculation;
4343
import org.opensearch.timeseries.rest.handler.LatestTimeRetriever;
44-
import org.opensearch.timeseries.settings.TimeSeriesSettings;
4544
import org.opensearch.timeseries.util.ParseUtils;
4645
import org.opensearch.timeseries.util.SecurityClientUtil;
4746
import org.opensearch.transport.TransportService;
@@ -249,8 +248,44 @@ protected void suggestWindowDelay(Config config, User user, TimeValue timeout, A
249248
// we may get future date (e.g., in testing)
250249
long currentMillis = clock.millis();
251250

251+
// ---------------------------------------------------------------------------
252+
// Adaptive window-delay calculation
253+
// ---------------------------------------------------------------------------
254+
// Goal: pick a delay long enough that all data for the current query
255+
// window has been ingested, so the config never sees “future gaps”.
256+
//
257+
// Algorithm
258+
// 1. Compute the raw lag (`gapMs`) between now and the newest document.
259+
// 2. Convert that lag into whole config-interval “buckets” (ceil).
260+
// We use the integer-math identity
261+
// ceil(a / b) = (a + b − 1) / b for positive a, b
262+
// so we never under-estimate the number of missing buckets.
263+
// 3. Add one extra “safety” bucket to cover clock skew / network jitter.
264+
// 4. Transform the final bucket count back to milliseconds.
265+
//
266+
// ---------------------------------------------------------------------------
267+
252268
if (currentMillis > latestTime.get()) {
253-
windowDelayMillis = (long) Math.ceil((currentMillis - latestTime.get()) * TimeSeriesSettings.WINDOW_DELAY_RATIO);
269+
270+
// Milliseconds we are behind real time
271+
long gapMs = currentMillis - latestTime.get();
272+
273+
// Length of one bucket (config interval)
274+
long bucketMs = config.getIntervalInMilliseconds();
275+
276+
/* Missing buckets (ceiling division)
277+
Example: gap = 15 000 ms, bucket = 10 000 ms
278+
bucketsBehind = (15 000 + 10 000 − 1) / 10 000
279+
= 24 999 / 10 000
280+
= 2 ← correct (15 000 ms spans 2 full buckets)
281+
*/
282+
long bucketsBehind = (gapMs + bucketMs - 1) / bucketMs;
283+
284+
// Always keep one extra bucket as a cushion
285+
long safetyBuckets = 1;
286+
287+
// Convert back to milliseconds
288+
windowDelayMillis = (bucketsBehind + safetyBuckets) * bucketMs;
254289
}
255290

256291
// in case windowDelayMillis is small, we want at least 1 minute

0 commit comments

Comments
 (0)