Skip to content

Commit 4ab4877

Browse files
committed
Fix issues with acknowledgment timeouts and data loss with checkpointing acknowledgments
Signed-off-by: Taylor Gray <[email protected]>
1 parent 415c4c3 commit 4ab4877

File tree

5 files changed

+25
-7
lines changed

5 files changed

+25
-7
lines changed

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/StreamPartition.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
1010
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
1111

12+
import java.util.Objects;
1213
import java.util.Optional;
1314

1415
public class StreamPartition extends EnhancedSourcePartition<StreamProgressState> {
@@ -61,4 +62,16 @@ public String getStreamArn() {
6162
public String getShardId() {
6263
return shardId;
6364
}
65+
66+
@Override
67+
public boolean equals(Object o) {
68+
if (o == null || getClass() != o.getClass()) return false;
69+
StreamPartition that = (StreamPartition) o;
70+
return Objects.equals(shardId, that.shardId);
71+
}
72+
73+
@Override
74+
public int hashCode() {
75+
return Objects.hashCode(shardId);
76+
}
6477
}

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ void monitorAcknowledgments(final Consumer<StreamPartition> stopWorkerConsumer)
8585
if (exit) {
8686
break;
8787
}
88+
89+
try {
90+
Thread.sleep(2000);
91+
} catch (InterruptedException e) {
92+
throw new RuntimeException(e);
93+
}
8894
}
8995

9096
LOG.info("Exiting acknowledgment manager");
@@ -111,6 +117,9 @@ boolean runMonitorAcknowledgmentLoop(final Consumer<StreamPartition> stopWorkerC
111117

112118
if (checkpointStatuses.peek().isPositiveAcknowledgement()) {
113119
latestCheckpointForShard = checkpointStatuses.poll();
120+
if (latestCheckpointForShard != null) {
121+
ackStatuses.get(streamPartition).remove(latestCheckpointForShard.getSequenceNumber());
122+
}
114123
} else if (checkpointStatuses.peek().isNegativeAcknowledgement()
115124
|| checkpointStatuses.peek().isExpired(dynamoDBSourceConfig.getShardAcknowledgmentTimeout())) {
116125
handleFailure(streamPartition, streamProgressState, latestCheckpointForShard);

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ public void run() {
257257
if (shardIterator == null) {
258258
// End of Shard
259259
if (shardAcknowledgementManager != null && !createdFinalAcknowledgmentSetForShard) {
260-
final AcknowledgementSet finalAcknowledgmentSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, true);
260+
final AcknowledgementSet finalAcknowledgmentSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, "END_OF_SHARD", true);
261261
finalAcknowledgmentSet.complete();
262262
}
263263
LOG.debug("Reached end of shard");

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,10 @@ public String getShardIterator(String streamArn, String shardId, String sequence
142142

143143
if (sequenceNumber != null && !sequenceNumber.isEmpty()) {
144144
LOG.debug("Get Shard Iterator at {}", sequenceNumber);
145-
// There may be an overlap for 1 record
146145
getShardIteratorRequest = GetShardIteratorRequest.builder()
147146
.shardId(shardId)
148147
.streamArn(streamArn)
149-
.shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
148+
.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
150149
.sequenceNumber(sequenceNumber)
151150
.build();
152151
} else {

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.mockito.Mock;
1414
import org.mockito.junit.jupiter.MockitoExtension;
1515
import org.opensearch.dataprepper.metrics.PluginMetrics;
16-
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
1716
import org.opensearch.dataprepper.model.buffer.Buffer;
1817
import org.opensearch.dataprepper.model.event.Event;
1918
import org.opensearch.dataprepper.model.record.Record;
@@ -131,8 +130,6 @@ public void test_create_shardConsumer_correctly() {
131130

132131
@Test
133132
public void test_create_shardConsumer_correctly_with_is_disable_checkpointing_enabled_starts_from_trim_horizon() {
134-
135-
final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class);
136133
when(streamConfig.isDisableCheckpointing()).thenReturn(true);
137134
StreamProgressState state = new StreamProgressState();
138135
state.setWaitForExport(false);
@@ -141,7 +138,7 @@ public void test_create_shardConsumer_correctly_with_is_disable_checkpointing_en
141138
streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state));
142139

143140
ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig);
144-
Runnable consumer = consumerFactory.createConsumer(streamPartition, acknowledgementSet, null);
141+
Runnable consumer = consumerFactory.createConsumer(streamPartition, Duration.ofMinutes(1), mock(ShardAcknowledgementManager.class));
145142
assertThat(consumer, notNullValue());
146143

147144
final ArgumentCaptor<GetShardIteratorRequest> captor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);

0 commit comments

Comments
 (0)