Skip to content

Commit 7ad3cdd

Browse files
authored
[7.13] Always handle non-complete AsyncActionStep execution (#73796) (#73853)
This commit ensures that `IndexLifecycleRunner` will always handle the execution of an `AsyncActionStep`, regardless of its returned status. In the event that it does not complete, the index will move into the ERROR step with an exception, rather than being stuck in the step without a way to execute the AsyncActionStep. This also updates `CreateSnapshotStep` and `RolloverStep` to call `onFailure` with more helpful exceptions rather than potentially returning `false` to the runner. Resolves #73794
1 parent 11064f9 commit 7ad3cdd

File tree

4 files changed

+96
-7
lines changed

4 files changed

+96
-7
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.ElasticsearchException;
1112
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
1314
import org.elasticsearch.client.Client;
@@ -16,6 +17,8 @@
1617
import org.elasticsearch.common.Strings;
1718
import org.elasticsearch.snapshots.SnapshotInfo;
1819

20+
import java.util.Locale;
21+
1922
import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.fromIndexMetadata;
2023

2124
/**
@@ -76,8 +79,12 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
7679
} else {
7780
int failures = snapInfo.failedShards();
7881
int total = snapInfo.totalShards();
79-
logger.warn("failed to create snapshot successfully, {} failures out of {} total shards failed", failures, total);
80-
listener.onResponse(false);
82+
String message = String.format(Locale.ROOT,
83+
"failed to create snapshot successfully, %s failures out of %s total shards failed", failures, total);
84+
logger.warn(message);
85+
ElasticsearchException failure = new ElasticsearchException(message,
86+
snapInfo.shardFailures().stream().findFirst().orElse(null));
87+
listener.onFailure(failure);
8188
}
8289
}, listener::onFailure));
8390
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,11 @@ public void performAction(IndexMetadata indexMetadata, ClusterState currentClust
9797
getClient().admin().indices().rolloverIndex(rolloverRequest,
9898
ActionListener.wrap(response -> {
9999
assert response.isRolledOver() : "the only way this rollover call should fail is with an exception";
100-
listener.onResponse(response.isRolledOver());
100+
if (response.isRolledOver()) {
101+
listener.onResponse(true);
102+
} else {
103+
listener.onFailure(new IllegalStateException("unexepected exception on unconditional rollover"));
104+
}
101105
}, listener::onFailure));
102106
}
103107

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,12 @@ public void onResponse(Boolean complete) {
303303
// index since it will be... deleted.
304304
registerDeleteOperation(indexMetadata);
305305
}
306+
} else {
307+
// All steps *should* return true for complete, or invoke listener.onFailure
308+
// with a useful exception. In the case that they don't, we move to error
309+
// step here with a generic exception
310+
moveToErrorStep(indexMetadata.getIndex(), policy, currentStep.getKey(),
311+
new IllegalStateException("unknown exception for step " + currentStep.getKey() + " in policy " + policy));
306312
}
307313
}
308314

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
111111
public void prepare() {
112112
threadPool = new TestThreadPool("test");
113113
noopClient = new NoOpClient(threadPool);
114-
historyStore = new NoOpHistoryStore();
114+
historyStore = new NoOpHistoryStore(noopClient);
115115
}
116116

117117
@After
@@ -552,6 +552,77 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception {
552552
"\"state\":{\"phase\":\"phase\",\"action\":\"action\",\"step\":\"async_action_step\",\"step_time\":\"0\"}}"));
553553
}
554554

555+
public void testRunAsyncActionReturningFalseEntersError() throws Exception {
556+
String policyName = "foo";
557+
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
558+
StepKey nextStepKey = new StepKey("phase", "action", "cluster_state_action_step");
559+
MockAsyncActionStep step = new MockAsyncActionStep(stepKey, nextStepKey);
560+
MockClusterStateActionStep nextStep = new MockClusterStateActionStep(nextStepKey, null);
561+
MockPolicyStepsRegistry stepRegistry = createMultiStepPolicyStepRegistry(policyName, Arrays.asList(step, nextStep));
562+
stepRegistry.setResolver((i, k) -> {
563+
if (stepKey.equals(k)) {
564+
return step;
565+
} else if (nextStepKey.equals(k)) {
566+
return nextStep;
567+
} else {
568+
fail("should not try to retrieve different step");
569+
return null;
570+
}
571+
});
572+
LifecycleExecutionState les = LifecycleExecutionState.builder()
573+
.setPhase("phase")
574+
.setAction("action")
575+
.setStep("async_action_step")
576+
.build();
577+
IndexMetadata indexMetadata = IndexMetadata.builder("test")
578+
.settings(Settings.builder()
579+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
580+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
581+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
582+
.put(LifecycleSettings.LIFECYCLE_NAME, policyName))
583+
.putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap())
584+
.build();
585+
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
586+
DiscoveryNode node = clusterService.localNode();
587+
IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING);
588+
ClusterState state = ClusterState.builder(new ClusterName("cluster"))
589+
.metadata(Metadata.builder()
590+
.put(indexMetadata, true)
591+
.putCustom(IndexLifecycleMetadata.TYPE, ilm))
592+
.nodes(DiscoveryNodes.builder()
593+
.add(node)
594+
.masterNodeId(node.getId())
595+
.localNodeId(node.getId()))
596+
.build();
597+
ClusterServiceUtils.setState(clusterService, state);
598+
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
599+
600+
CountDownLatch latch = new CountDownLatch(1);
601+
step.setLatch(latch);
602+
step.setWillComplete(false);
603+
ClusterState before = clusterService.state();
604+
runner.maybeRunAsyncAction(before, indexMetadata, policyName, stepKey);
605+
606+
// Wait for the action step to finish
607+
awaitLatch(latch, 5, TimeUnit.SECONDS);
608+
609+
assertThat(step.getExecuteCount(), equalTo(1L));
610+
611+
try {
612+
assertBusy(() -> {
613+
ILMHistoryItem historyItem = historyStore.getItems().stream()
614+
.findFirst()
615+
.orElseThrow(() -> new AssertionError("failed to register ILM history"));
616+
assertThat(historyItem.toString(),
617+
containsString("\"{\\\"type\\\":\\\"illegal_state_exception\\\",\\\"reason\\\":" +
618+
"\\\"unknown exception for step {\\\\\\\"phase\\\\\\\":\\\\\\\"phase\\\\\\\",\\\\\\\"action" +
619+
"\\\\\\\":\\\\\\\"action\\\\\\\",\\\\\\\"name\\\\\\\":\\\\\\\"async_action_step\\\\\\\"} in policy foo\\\""));
620+
});
621+
} finally {
622+
clusterService.close();
623+
}
624+
}
625+
555626
public void testRunPeriodicStep() throws Exception {
556627
String policyName = "foo";
557628
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
@@ -880,7 +951,7 @@ public static void assertClusterStateOnNextStep(ClusterState oldClusterState, In
880951
static class MockAsyncActionStep extends AsyncActionStep {
881952

882953
private Exception exception;
883-
private boolean willComplete;
954+
private boolean willComplete = true;
884955
private boolean indexSurvives = true;
885956
private long executeCount = 0;
886957
private CountDownLatch latch;
@@ -1178,11 +1249,12 @@ public static MockPolicyStepsRegistry createMultiStepPolicyStepRegistry(String p
11781249
return new MockPolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, REGISTRY, client);
11791250
}
11801251

1181-
private class NoOpHistoryStore extends ILMHistoryStore {
1252+
private static class NoOpHistoryStore extends ILMHistoryStore {
1253+
private static final Logger logger = LogManager.getLogger(NoOpHistoryStore.class);
11821254

11831255
private final List<ILMHistoryItem> items = new ArrayList<>();
11841256

1185-
NoOpHistoryStore() {
1257+
NoOpHistoryStore(Client noopClient) {
11861258
super(Settings.EMPTY, noopClient, null, null);
11871259
}
11881260

0 commit comments

Comments
 (0)