@@ -241,6 +241,12 @@ enum SharePartitionState {
241
241
*/
242
242
private final AcquisitionLockTimeoutHandler timeoutHandler ;
243
243
244
+ /**
245
+ * The replica manager is used to check whether any delayed share fetch request can be completed because of data
246
+ * availability due to acquisition lock timeout.
247
+ */
248
+ private final ReplicaManager replicaManager ;
249
+
244
250
/**
245
251
* The share partition start offset specifies the partition start offset from which the records
246
252
* are cached in the cachedState of the sharePartition.
@@ -295,12 +301,6 @@ enum SharePartitionState {
295
301
*/
296
302
private long fetchLockIdleDurationMs ;
297
303
298
- /**
299
- * The replica manager is used to check to see if any delayed share fetch request can be completed because of data
300
- * availability due to acquisition lock timeout.
301
- */
302
- private final ReplicaManager replicaManager ;
303
-
304
304
SharePartition (
305
305
String groupId ,
306
306
TopicIdPartition topicIdPartition ,
@@ -1245,10 +1245,7 @@ private boolean archivePerOffsetBatchRecords(InFlightBatch inFlightBatch,
1245
1245
continue ;
1246
1246
}
1247
1247
1248
- offsetState .getValue ().archive (EMPTY_MEMBER_ID );
1249
- if (initialState == RecordState .ACQUIRED ) {
1250
- offsetState .getValue ().cancelAndClearAcquisitionLockTimeoutTask ();
1251
- }
1248
+ offsetState .getValue ().archive ();
1252
1249
isAnyOffsetArchived = true ;
1253
1250
}
1254
1251
return isAnyOffsetArchived ;
@@ -1263,10 +1260,7 @@ private boolean archiveCompleteBatch(InFlightBatch inFlightBatch, RecordState in
1263
1260
log .trace ("Archiving complete batch: {} for the share partition: {}-{}" , inFlightBatch , groupId , topicIdPartition );
1264
1261
if (inFlightBatch .batchState () == initialState ) {
1265
1262
// Change the state of complete batch since the same state exists for the entire inFlight batch.
1266
- inFlightBatch .archiveBatch (EMPTY_MEMBER_ID );
1267
- if (initialState == RecordState .ACQUIRED ) {
1268
- inFlightBatch .cancelAndClearAcquisitionLockTimeoutTask ();
1269
- }
1263
+ inFlightBatch .archiveBatch ();
1270
1264
return true ;
1271
1265
}
1272
1266
} finally {
@@ -1799,6 +1793,12 @@ private Optional<Throwable> acknowledgeBatchRecords(
1799
1793
if (throwable .isPresent ()) {
1800
1794
return throwable ;
1801
1795
}
1796
+
1797
+ if (inFlightBatch .batchHasOngoingStateTransition ()) {
1798
+ log .debug ("The batch has on-going transition, batch: {} for the share "
1799
+ + "partition: {}-{}" , inFlightBatch , groupId , topicIdPartition );
1800
+ return Optional .of (new InvalidRecordStateException ("The record state is invalid. The acknowledgement of delivery could not be completed." ));
1801
+ }
1802
1802
}
1803
1803
1804
1804
// Determine if the in-flight batch is a full match from the request batch.
@@ -1899,7 +1899,15 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
1899
1899
+ " partition: {}-{}" , offsetState .getKey (), inFlightBatch , groupId ,
1900
1900
topicIdPartition );
1901
1901
return Optional .of (new InvalidRecordStateException (
1902
- "The batch cannot be acknowledged. The offset is not acquired." ));
1902
+ "The offset cannot be acknowledged. The offset is not acquired." ));
1903
+ }
1904
+
1905
+ if (offsetState .getValue ().hasOngoingStateTransition ()) {
1906
+ log .debug ("The offset has on-going transition, offset: {} batch: {} for the share"
1907
+ + " partition: {}-{}" , offsetState .getKey (), inFlightBatch , groupId ,
1908
+ topicIdPartition );
1909
+ return Optional .of (new InvalidRecordStateException (
1910
+ "The record state is invalid. The acknowledgement of delivery could not be completed." ));
1903
1911
}
1904
1912
1905
1913
// Check if member id is the owner of the offset.
@@ -2044,7 +2052,12 @@ void rollbackOrProcessStateUpdates(
2044
2052
// Log in DEBUG to avoid flooding of logs for a faulty client.
2045
2053
log .debug ("Request failed for updating state, rollback any changed state"
2046
2054
+ " for the share partition: {}-{}" , groupId , topicIdPartition );
2047
- updatedStates .forEach (state -> state .completeStateTransition (false ));
2055
+ updatedStates .forEach (state -> {
2056
+ state .completeStateTransition (false );
2057
+ if (state .state () == RecordState .AVAILABLE ) {
2058
+ updateFindNextFetchOffset (true );
2059
+ }
2060
+ });
2048
2061
future .completeExceptionally (throwable );
2049
2062
return ;
2050
2063
}
@@ -2067,7 +2080,14 @@ void rollbackOrProcessStateUpdates(
2067
2080
if (exception != null ) {
2068
2081
log .debug ("Failed to write state to persister for the share partition: {}-{}" ,
2069
2082
groupId , topicIdPartition , exception );
2070
- updatedStates .forEach (state -> state .completeStateTransition (false ));
2083
+ // On failure the state transition is rolled back. The state should then return to
2084
+ // ACQUIRED, unless the acquisition lock for the state has already expired.
2085
+ updatedStates .forEach (state -> {
2086
+ state .completeStateTransition (false );
2087
+ if (state .state () == RecordState .AVAILABLE ) {
2088
+ updateFindNextFetchOffset (true );
2089
+ }
2090
+ });
2071
2091
future .completeExceptionally (exception );
2072
2092
return ;
2073
2093
}
@@ -2076,8 +2096,6 @@ void rollbackOrProcessStateUpdates(
2076
2096
groupId , topicIdPartition );
2077
2097
updatedStates .forEach (state -> {
2078
2098
state .completeStateTransition (true );
2079
- // Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully.
2080
- state .cancelAndClearAcquisitionLockTimeoutTask ();
2081
2099
if (state .state () == RecordState .AVAILABLE ) {
2082
2100
updateFindNextFetchOffset (true );
2083
2101
}
@@ -2389,10 +2407,18 @@ private AcquisitionLockTimerTask acquisitionLockTimerTask(
2389
2407
}
2390
2408
2391
2409
private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout () {
2392
- return (memberId , firstOffset , lastOffset ) -> {
2410
+ return (memberId , firstOffset , lastOffset , timerTask ) -> {
2393
2411
List <PersisterStateBatch > stateBatches ;
2394
2412
lock .writeLock ().lock ();
2395
2413
try {
2414
+ // Check if the timer task is already cancelled. This can happen when a concurrent request
2415
+ // acknowledges the in-flight state while the timeout handler is waiting for the lock, so
2416
+ // the task is already cancelled by the time the handler acquires the lock.
2417
+ if (timerTask .isCancelled ()) {
2418
+ log .debug ("Timer task is already cancelled, not executing further." );
2419
+ return ;
2420
+ }
2421
+
2396
2422
Map .Entry <Long , InFlightBatch > floorOffset = cachedState .floorEntry (firstOffset );
2397
2423
if (floorOffset == null ) {
2398
2424
log .error ("Base offset {} not found for share partition: {}-{}" , firstOffset , groupId , topicIdPartition );
0 commit comments