Skip to content

Commit f1ec33f

Browse files
alexr17rahil-c
authored andcommitted
[HUDI-9159] Fix LP validity bug (apache#13277)
1 parent 93feb4b commit f1ec33f

File tree

2 files changed

+44
-36
lines changed

2 files changed

+44
-36
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ private static Functions.Function3<String, String, TypedProperties, StorageLockC
165165
LOCKS_FOLDER_NAME,
166166
StoragePath.SEPARATOR,
167167
DEFAULT_TABLE_LOCK_FILE_NAME);
168-
this.heartbeatManager = heartbeatManagerLoader.apply(ownerId, heartbeatPollSeconds * 1000, this::renewLock);
168+
this.heartbeatManager = heartbeatManagerLoader.apply(ownerId, TimeUnit.SECONDS.toMillis(heartbeatPollSeconds), this::renewLock);
169169
this.storageLockClient = storageLockClientLoader.apply(ownerId, lockFilePath, properties);
170170
this.ownerId = ownerId;
171171
this.logger = logger;
@@ -282,7 +282,7 @@ public synchronized boolean tryLock() {
282282
}
283283

284284
// Try to acquire the lock
285-
StorageLockData newLockData = new StorageLockData(false, System.currentTimeMillis() + lockValiditySecs, ownerId);
285+
StorageLockData newLockData = new StorageLockData(false, getCurrentEpochMs() + TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId);
286286
Pair<LockUpsertResult, Option<StorageLockFile>> lockUpdateStatus = this.storageLockClient.tryUpsertLockFile(
287287
newLockData,
288288
latestLock.getRight());
@@ -452,7 +452,7 @@ protected synchronized boolean renewLock() {
452452
// prevents further data corruption by
453453
// letting someone else acquire the lock.
454454
Pair<LockUpsertResult, Option<StorageLockFile>> currentLock = this.storageLockClient.tryUpsertLockFile(
455-
new StorageLockData(false, System.currentTimeMillis() + lockValiditySecs, ownerId),
455+
new StorageLockData(false, getCurrentEpochMs() + TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId),
456456
Option.of(getLock()));
457457
switch (currentLock.getLeft()) {
458458
case ACQUIRED_BY_OTHERS:
@@ -470,7 +470,7 @@ protected synchronized boolean renewLock() {
470470
// Only positive outcome
471471
this.setLock(currentLock.getRight().get());
472472
logger.info("Owner {}: Lock renewal successful. The renewal completes {} ms before expiration for lock {}.",
473-
ownerId, oldExpirationMs - System.currentTimeMillis(), lockFilePath);
473+
ownerId, oldExpirationMs - getCurrentEpochMs(), lockFilePath);
474474
// Let heartbeat continue to renew lock lease again later.
475475
return true;
476476
default:
@@ -490,8 +490,8 @@ protected synchronized boolean renewLock() {
490490
* Method to calculate whether a timestamp from a distributed source has
491491
* definitively occurred yet.
492492
*/
493-
protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long epoch) {
494-
return System.currentTimeMillis() > epoch + CLOCK_DRIFT_BUFFER_MS;
493+
protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long epochMs) {
494+
return getCurrentEpochMs() > epochMs + CLOCK_DRIFT_BUFFER_MS;
495495
}
496496

497497
private String generateLockStateMessage(LockState state) {
@@ -526,4 +526,9 @@ private void logWarnLockState(LockState state, String msg) {
526526
private void logErrorLockState(LockState state, String msg) {
527527
logger.error(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, Thread.currentThread(), state, msg);
528528
}
529+
530+
@VisibleForTesting
531+
long getCurrentEpochMs() {
532+
return System.currentTimeMillis();
533+
}
529534
}

hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
import static org.mockito.ArgumentMatchers.any;
5656
import static org.mockito.ArgumentMatchers.anyLong;
5757
import static org.mockito.ArgumentMatchers.eq;
58-
import static org.mockito.ArgumentMatchers.isNull;
58+
import static org.mockito.ArgumentMatchers.refEq;
5959
import static org.mockito.Mockito.atLeastOnce;
6060
import static org.mockito.Mockito.doAnswer;
6161
import static org.mockito.Mockito.doReturn;
@@ -75,7 +75,7 @@ class TestStorageBasedLockProvider {
7575
private HeartbeatManager mockHeartbeatManager;
7676
private Logger mockLogger;
7777
private final String ownerId = UUID.randomUUID().toString();
78-
private static final int DEFAULT_LOCK_VALIDITY_MS = 5000;
78+
private static final int DEFAULT_LOCK_VALIDITY_MS = 10000;
7979

8080
@BeforeEach
8181
void setupLockProvider() {
@@ -174,10 +174,13 @@ void testTryLockForTimeUnitFailsToAcquireLockEventually() throws Exception {
174174

175175
@Test
176176
void testTryLockSuccess() {
177-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
178-
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
177+
long t0 = 1_000L;
178+
when(lockProvider.getCurrentEpochMs())
179+
.thenReturn(t0);
180+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
181+
StorageLockData data = new StorageLockData(false, t0 + DEFAULT_LOCK_VALIDITY_MS, ownerId);
179182
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
180-
when(mockLockService.tryUpsertLockFile(any(), isNull()))
183+
when(mockLockService.tryUpsertLockFile(refEq(data), eq(Option.empty())))
181184
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
182185
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
183186

@@ -189,10 +192,10 @@ void testTryLockSuccess() {
189192

190193
@Test
191194
void testTryLockSuccessButFailureToStartHeartbeat() {
192-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
195+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
193196
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
194197
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
195-
when(mockLockService.tryUpsertLockFile(any(), isNull()))
198+
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
196199
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
197200
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(false);
198201
when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
@@ -204,10 +207,10 @@ void testTryLockSuccessButFailureToStartHeartbeat() {
204207

205208
@Test
206209
void testTryLockFailsFromOwnerMismatch() {
207-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
210+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
208211
StorageLockFile returnedLockFile = new StorageLockFile(
209212
new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, "different-owner"), "v1");
210-
when(mockLockService.tryUpsertLockFile(any(), isNull()))
213+
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
211214
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(returnedLockFile)));
212215

213216
HoodieLockException ex = assertThrows(HoodieLockException.class, () -> lockProvider.tryLock());
@@ -227,15 +230,15 @@ void testTryLockFailsDueToExistingLock() {
227230

228231
@Test
229232
void testTryLockFailsToUpdateFile() {
230-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
231-
when(mockLockService.tryUpsertLockFile(any(), isNull()))
232-
.thenReturn(Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, null));
233+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
234+
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
235+
.thenReturn(Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty()));
233236
assertFalse(lockProvider.tryLock());
234237
}
235238

236239
@Test
237240
void testTryLockFailsDueToUnknownState() {
238-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.UNKNOWN_ERROR, null));
241+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.UNKNOWN_ERROR, Option.empty()));
239242
assertFalse(lockProvider.tryLock());
240243
}
241244

@@ -257,10 +260,10 @@ void testTryLockSucceedsWhenExistingLockExpiredByTime() {
257260

258261
@Test
259262
void testTryLockReentrancySucceeds() {
260-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
263+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
261264
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
262265
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
263-
when(mockLockService.tryUpsertLockFile(any(), isNull()))
266+
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
264267
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
265268
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
266269

@@ -284,11 +287,11 @@ void testTryLockReentrancyAfterLockExpiredByTime() {
284287
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId);
285288
StorageLockFile expiredLock = new StorageLockFile(data, "v1");
286289
doReturn(expiredLock).when(lockProvider).getLock();
287-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
290+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
288291
StorageLockData validData = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
289292
ownerId);
290293
StorageLockFile validLock = new StorageLockFile(validData, "v2");
291-
when(mockLockService.tryUpsertLockFile(any(), isNull()))
294+
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
292295
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(validLock)));
293296
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
294297

@@ -309,11 +312,11 @@ void testTryLockReentrancyAfterLockSetExpired() {
309312
StorageLockData data = new StorageLockData(true, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
310313
StorageLockFile expiredLock = new StorageLockFile(data, "v1");
311314
doReturn(expiredLock).when(lockProvider).getLock();
312-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
315+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
313316
StorageLockData validData = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
314317
ownerId);
315318
StorageLockFile validLock = new StorageLockFile(validData, "v2");
316-
when(mockLockService.tryUpsertLockFile(any(), isNull()))
319+
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
317320
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(validLock)));
318321
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
319322

@@ -334,17 +337,17 @@ void testTryLockHeartbeatStillActive() {
334337
StorageLockData data = new StorageLockData(true, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
335338
StorageLockFile expiredLock = new StorageLockFile(data, "v1");
336339
doReturn(expiredLock).when(lockProvider).getLock();
337-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
340+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
338341
when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
339342
assertThrows(HoodieLockException.class, () -> lockProvider.tryLock());
340343
}
341344

342345
@Test
343346
void testUnlockSucceedsAndReentrancy() {
344-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
347+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
345348
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
346349
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
347-
when(mockLockService.tryUpsertLockFile(any(), isNull()))
350+
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
348351
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
349352
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
350353
when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
@@ -363,10 +366,10 @@ void testUnlockSucceedsAndReentrancy() {
363366

364367
@Test
365368
void testUnlockFailsToStopHeartbeat() {
366-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
369+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
367370
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
368371
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
369-
when(mockLockService.tryUpsertLockFile(any(), isNull()))
372+
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
370373
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
371374
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
372375
assertTrue(lockProvider.tryLock());
@@ -378,10 +381,10 @@ void testUnlockFailsToStopHeartbeat() {
378381

379382
@Test
380383
void testCloseFailsToStopHeartbeat() {
381-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
384+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
382385
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
383386
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
384-
when(mockLockService.tryUpsertLockFile(any(), isNull()))
387+
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
385388
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
386389
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
387390
assertTrue(lockProvider.tryLock());
@@ -427,7 +430,7 @@ void testRenewLockUnableToUpsertLockFileButNotFatal() {
427430
// Signal the upsert attempt failed, but may be transient. See interface for
428431
// more details.
429432
when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
430-
.thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, null));
433+
.thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()));
431434
assertTrue(lockProvider.renewLock());
432435
}
433436

@@ -439,7 +442,7 @@ void testRenewLockUnableToUpsertLockFileFatal() {
439442
// Signal the upsert attempt failed, but may be transient. See interface for
440443
// more details.
441444
when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
442-
.thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, null));
445+
.thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()));
443446
// renewLock return true so it will be retried.
444447
assertTrue(lockProvider.renewLock());
445448

@@ -522,10 +525,10 @@ void testCloseWithErrorForHeartbeatManager() throws Exception {
522525

523526
@Test
524527
public void testShutdownHookViaReflection() throws Exception {
525-
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null));
528+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
526529
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
527530
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
528-
when(mockLockService.tryUpsertLockFile(any(), isNull()))
531+
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
529532
.thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
530533
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
531534

0 commit comments

Comments
 (0)