Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private static Functions.Function3<String, String, TypedProperties, StorageLockC
LOCKS_FOLDER_NAME,
StoragePath.SEPARATOR,
DEFAULT_TABLE_LOCK_FILE_NAME);
this.heartbeatManager = heartbeatManagerLoader.apply(ownerId, heartbeatPollSeconds * 1000, this::renewLock);
this.heartbeatManager = heartbeatManagerLoader.apply(ownerId, TimeUnit.SECONDS.toMillis(heartbeatPollSeconds), this::renewLock);
this.storageLockClient = storageLockClientLoader.apply(ownerId, lockFilePath, properties);
this.ownerId = ownerId;
this.logger = logger;
Expand Down Expand Up @@ -282,7 +282,7 @@ public synchronized boolean tryLock() {
}

// Try to acquire the lock
StorageLockData newLockData = new StorageLockData(false, System.currentTimeMillis() + lockValiditySecs, ownerId);
StorageLockData newLockData = new StorageLockData(false, getCurrentEpochMs() + TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId);
Pair<LockUpsertResult, Option<StorageLockFile>> lockUpdateStatus = this.storageLockClient.tryUpsertLockFile(
newLockData,
latestLock.getRight());
Expand Down Expand Up @@ -452,7 +452,7 @@ protected synchronized boolean renewLock() {
// prevents further data corruption by
// letting someone else acquire the lock.
Pair<LockUpsertResult, Option<StorageLockFile>> currentLock = this.storageLockClient.tryUpsertLockFile(
new StorageLockData(false, System.currentTimeMillis() + lockValiditySecs, ownerId),
new StorageLockData(false, getCurrentEpochMs() + TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId),
Option.of(getLock()));
switch (currentLock.getLeft()) {
case ACQUIRED_BY_OTHERS:
Expand All @@ -470,7 +470,7 @@ protected synchronized boolean renewLock() {
// Only positive outcome
this.setLock(currentLock.getRight().get());
logger.info("Owner {}: Lock renewal successful. The renewal completes {} ms before expiration for lock {}.",
ownerId, oldExpirationMs - System.currentTimeMillis(), lockFilePath);
ownerId, oldExpirationMs - getCurrentEpochMs(), lockFilePath);
// Let heartbeat continue to renew lock lease again later.
return true;
default:
Expand All @@ -490,8 +490,8 @@ protected synchronized boolean renewLock() {
* Method to calculate whether a timestamp from a distributed source has
* definitively occurred yet.
*/
protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long epoch) {
return System.currentTimeMillis() > epoch + CLOCK_DRIFT_BUFFER_MS;
protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long epochMs) {
return getCurrentEpochMs() > epochMs + CLOCK_DRIFT_BUFFER_MS;
}

private String generateLockStateMessage(LockState state) {
Expand Down Expand Up @@ -526,4 +526,9 @@ private void logWarnLockState(LockState state, String msg) {
private void logErrorLockState(LockState state, String msg) {
logger.error(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, Thread.currentThread(), state, msg);
}

@VisibleForTesting
long getCurrentEpochMs() {
return System.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand All @@ -75,7 +75,7 @@ class TestStorageBasedLockProvider {
private HeartbeatManager mockHeartbeatManager;
private Logger mockLogger;
private final String ownerId = UUID.randomUUID().toString();
private static final int DEFAULT_LOCK_VALIDITY_MS = 5000;
private static final int DEFAULT_LOCK_VALIDITY_MS = 10000;

@BeforeEach
void setupLockProvider() {
Expand Down Expand Up @@ -174,10 +174,13 @@ void testTryLockForTimeUnitFailsToAcquireLockEventually() throws Exception {

@Test
void testTryLockSuccess() {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
long t0 = 1_000L;
when(lockProvider.getCurrentEpochMs())
.thenReturn(t0);
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
StorageLockData data = new StorageLockData(false, t0 + DEFAULT_LOCK_VALIDITY_MS, ownerId);
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
when(mockLockService.tryUpsertLockFile(any(), isNull()))
when(mockLockService.tryUpsertLockFile(refEq(data), eq(Option.empty())))
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);

Expand All @@ -189,10 +192,10 @@ void testTryLockSuccess() {

@Test
void testTryLockSuccessButFailureToStartHeartbeat() {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
when(mockLockService.tryUpsertLockFile(any(), isNull()))
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(false);
when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
Expand All @@ -204,10 +207,10 @@ void testTryLockSuccessButFailureToStartHeartbeat() {

@Test
void testTryLockFailsFromOwnerMismatch() {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
StorageLockFile returnedLockFile = new StorageLockFile(
new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, "different-owner"), "v1");
when(mockLockService.tryUpsertLockFile(any(), isNull()))
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(returnedLockFile)));

HoodieLockException ex = assertThrows(HoodieLockException.class, () -> lockProvider.tryLock());
Expand All @@ -227,15 +230,15 @@ void testTryLockFailsDueToExistingLock() {

@Test
void testTryLockFailsToUpdateFile() {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
when(mockLockService.tryUpsertLockFile(any(), isNull()))
.thenReturn(Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
.thenReturn(Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty()));
assertFalse(lockProvider.tryLock());
}

@Test
void testTryLockFailsDueToUnknownState() {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.UNKNOWN_ERROR, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.UNKNOWN_ERROR, Option.empty()));
assertFalse(lockProvider.tryLock());
}

Expand All @@ -257,10 +260,10 @@ void testTryLockSucceedsWhenExistingLockExpiredByTime() {

@Test
void testTryLockReentrancySucceeds() {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
when(mockLockService.tryUpsertLockFile(any(), isNull()))
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);

Expand All @@ -284,11 +287,11 @@ void testTryLockReentrancyAfterLockExpiredByTime() {
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId);
StorageLockFile expiredLock = new StorageLockFile(data, "v1");
doReturn(expiredLock).when(lockProvider).getLock();
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
StorageLockData validData = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
ownerId);
StorageLockFile validLock = new StorageLockFile(validData, "v2");
when(mockLockService.tryUpsertLockFile(any(), isNull()))
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(validLock)));
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);

Expand All @@ -309,11 +312,11 @@ void testTryLockReentrancyAfterLockSetExpired() {
StorageLockData data = new StorageLockData(true, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
StorageLockFile expiredLock = new StorageLockFile(data, "v1");
doReturn(expiredLock).when(lockProvider).getLock();
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
StorageLockData validData = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
ownerId);
StorageLockFile validLock = new StorageLockFile(validData, "v2");
when(mockLockService.tryUpsertLockFile(any(), isNull()))
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(validLock)));
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);

Expand All @@ -334,17 +337,17 @@ void testTryLockHeartbeatStillActive() {
StorageLockData data = new StorageLockData(true, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
StorageLockFile expiredLock = new StorageLockFile(data, "v1");
doReturn(expiredLock).when(lockProvider).getLock();
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
assertThrows(HoodieLockException.class, () -> lockProvider.tryLock());
}

@Test
void testUnlockSucceedsAndReentrancy() {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
when(mockLockService.tryUpsertLockFile(any(), isNull()))
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
Expand All @@ -363,10 +366,10 @@ void testUnlockSucceedsAndReentrancy() {

@Test
void testUnlockFailsToStopHeartbeat() {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
when(mockLockService.tryUpsertLockFile(any(), isNull()))
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
assertTrue(lockProvider.tryLock());
Expand All @@ -378,10 +381,10 @@ void testUnlockFailsToStopHeartbeat() {

@Test
void testCloseFailsToStopHeartbeat() {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
when(mockLockService.tryUpsertLockFile(any(), isNull()))
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
assertTrue(lockProvider.tryLock());
Expand Down Expand Up @@ -427,7 +430,7 @@ void testRenewLockUnableToUpsertLockFileButNotFatal() {
// Signal the upsert attempt failed, but may be transient. See interface for
// more details.
when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
.thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, null));
.thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()));
assertTrue(lockProvider.renewLock());
}

Expand All @@ -439,7 +442,7 @@ void testRenewLockUnableToUpsertLockFileFatal() {
// Signal the upsert attempt failed, but may be transient. See interface for
// more details.
when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
.thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, null));
.thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()));
// renewLock return true so it will be retried.
assertTrue(lockProvider.renewLock());

Expand Down Expand Up @@ -522,10 +525,10 @@ void testCloseWithErrorForHeartbeatManager() throws Exception {

@Test
public void testShutdownHookViaReflection() throws Exception {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
when(mockLockService.tryUpsertLockFile(any(), isNull()))
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);

Expand Down
Loading