@@ -166,6 +166,7 @@ enum State {
166
166
NoLedger , // There is no metadata ledger open for writing
167
167
Open , // Metadata ledger is ready
168
168
SwitchingLedger , // The metadata ledger is being switched
169
+ Closing , // The managed cursor is closing
169
170
Closed // The managed cursor has been closed
170
171
}
171
172
@@ -282,6 +283,15 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
282
283
283
284
// Read the last entry in the ledger
284
285
long lastEntryInLedger = lh .getLastAddConfirmed ();
286
+
287
+ if (lastEntryInLedger < 0 ) {
288
+ log .warn ("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger" ,
289
+ ledger .getName (), ledgerId , name );
290
+ // Rewind to last cursor snapshot available
291
+ initialize (getRollbackPosition (info ), callback );
292
+ return ;
293
+ }
294
+
285
295
lh .asyncReadEntries (lastEntryInLedger , lastEntryInLedger , (rc1 , lh1 , seq , ctx1 ) -> {
286
296
if (log .isDebugEnabled ()) {
287
297
log .debug ("[{}} readComplete rc={} entryId={}" , ledger .getName (), rc1 , lh1 .getLastAddConfirmed ());
@@ -430,7 +440,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
430
440
public void asyncReadEntries (final int numberOfEntriesToRead , final ReadEntriesCallback callback ,
431
441
final Object ctx ) {
432
442
checkArgument (numberOfEntriesToRead > 0 );
433
- if (STATE_UPDATER . get ( this ) == State . Closed ) {
443
+ if (isClosed () ) {
434
444
callback .readEntriesFailed (new ManagedLedgerException ("Cursor was already closed" ), ctx );
435
445
return ;
436
446
}
@@ -480,7 +490,7 @@ public void readEntryComplete(Entry entry, Object ctx) {
480
490
public void asyncGetNthEntry (int n , IndividualDeletedEntries deletedEntries , ReadEntryCallback callback ,
481
491
Object ctx ) {
482
492
checkArgument (n > 0 );
483
- if (STATE_UPDATER . get ( this ) == State . Closed ) {
493
+ if (isClosed () ) {
484
494
callback .readEntryFailed (new ManagedLedgerException ("Cursor was already closed" ), ctx );
485
495
return ;
486
496
}
@@ -545,7 +555,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
545
555
@ Override
546
556
public void asyncReadEntriesOrWait (int numberOfEntriesToRead , ReadEntriesCallback callback , Object ctx ) {
547
557
checkArgument (numberOfEntriesToRead > 0 );
548
- if (STATE_UPDATER . get ( this ) == State . Closed ) {
558
+ if (isClosed () ) {
549
559
callback .readEntriesFailed (new CursorAlreadyClosedException ("Cursor was already closed" ), ctx );
550
560
return ;
551
561
}
@@ -619,6 +629,10 @@ public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallbac
619
629
}
620
630
}
621
631
632
+ private boolean isClosed () {
633
+ return state == State .Closed || state == State .Closing ;
634
+ }
635
+
622
636
@ Override
623
637
public boolean cancelPendingReadRequest () {
624
638
if (log .isDebugEnabled ()) {
@@ -1342,7 +1356,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
1342
1356
checkNotNull (position );
1343
1357
checkArgument (position instanceof PositionImpl );
1344
1358
1345
- if (STATE_UPDATER . get ( this ) == State . Closed ) {
1359
+ if (isClosed () ) {
1346
1360
callback .markDeleteFailed (new ManagedLedgerException ("Cursor was already closed" ), ctx );
1347
1361
return ;
1348
1362
}
@@ -1558,7 +1572,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
1558
1572
1559
1573
@ Override
1560
1574
public void asyncDelete (Iterable <Position > positions , AsyncCallbacks .DeleteCallback callback , Object ctx ) {
1561
- if (state == State . Closed ) {
1575
+ if (isClosed () ) {
1562
1576
callback .deleteFailed (new ManagedLedgerException ("Cursor was already closed" ), ctx );
1563
1577
return ;
1564
1578
}
@@ -1876,6 +1890,14 @@ private boolean shouldPersistUnackRangesToLedger() {
1876
1890
1877
1891
private void persistPositionMetaStore (long cursorsLedgerId , PositionImpl position , Map <String , Long > properties ,
1878
1892
MetaStoreCallback <Void > callback , boolean persistIndividualDeletedMessageRanges ) {
1893
+ if (state == State .Closed ) {
1894
+ ledger .getExecutor ().execute (safeRun (() -> {
1895
+ callback .operationFailed (new MetaStoreException (
1896
+ new ManagedLedgerException .CursorAlreadyClosedException (name + " cursor already closed" )));
1897
+ }));
1898
+ return ;
1899
+ }
1900
+
1879
1901
// When closing we store the last mark-delete position in the z-node itself, so we won't need the cursor ledger,
1880
1902
// hence we write it as -1. The cursor ledger is deleted once the z-node write is confirmed.
1881
1903
ManagedCursorInfo .Builder info = ManagedCursorInfo .newBuilder () //
@@ -1910,13 +1932,14 @@ public void operationFailed(MetaStoreException e) {
1910
1932
1911
1933
@ Override
1912
1934
public void asyncClose (final AsyncCallbacks .CloseCallback callback , final Object ctx ) {
1913
- State oldState = STATE_UPDATER .getAndSet (this , State .Closed );
1914
- if (oldState == State .Closed ) {
1935
+ State oldState = STATE_UPDATER .getAndSet (this , State .Closing );
1936
+ if (oldState == State .Closed || oldState == State . Closing ) {
1915
1937
log .info ("[{}] [{}] State is already closed" , ledger .getName (), name );
1916
1938
callback .closeComplete (ctx );
1917
1939
return ;
1918
1940
}
1919
1941
persistPosition (-1 , lastMarkDeleteEntry .newPosition , lastMarkDeleteEntry .properties , callback , ctx );
1942
+ STATE_UPDATER .set (this , State .Closed );
1920
1943
}
1921
1944
1922
1945
/**
@@ -2059,7 +2082,7 @@ public void deleteComplete(int rc, Object ctx) {
2059
2082
});
2060
2083
}));
2061
2084
}, Collections .emptyMap ());
2062
-
2085
+
2063
2086
}
2064
2087
2065
2088
private List <LongProperty > buildPropertiesMap (Map <String , Long > properties ) {
@@ -2166,7 +2189,7 @@ boolean shouldCloseLedger(LedgerHandle lh) {
2166
2189
long now = clock .millis ();
2167
2190
if ((lh .getLastAddConfirmed () >= config .getMetadataMaxEntriesPerLedger ()
2168
2191
|| lastLedgerSwitchTimestamp < (now - config .getLedgerRolloverTimeout () * 1000 ))
2169
- && STATE_UPDATER .get (this ) != State .Closed ) {
2192
+ && ( STATE_UPDATER .get (this ) != State .Closed && STATE_UPDATER . get ( this ) != State . Closing ) ) {
2170
2193
// It's safe to modify the timestamp since this method will be only called from a callback, implying that
2171
2194
// calls will be serialized on one single thread
2172
2195
lastLedgerSwitchTimestamp = now ;
0 commit comments