@@ -92,6 +92,11 @@ public void testUpgradeOrDowngrade(HoodieTableVersion fromVersion, HoodieTableVe
92
92
93
93
Dataset <Row > originalData = readTableData (originalMetaClient , "before " + operation );
94
94
95
+ // Confirm that there are log files before rollback and compaction operations
96
+ if (isRollbackAndCompactTransition (fromVersion , toVersion )) {
97
+ validateLogFilesCount (originalMetaClient , operation , true );
98
+ }
99
+
95
100
new UpgradeDowngrade (originalMetaClient , config , context (), SparkUpgradeDowngradeHelper .getInstance ())
96
101
.run (toVersion , null );
97
102
@@ -103,13 +108,15 @@ public void testUpgradeOrDowngrade(HoodieTableVersion fromVersion, HoodieTableVe
103
108
assertTableVersionOnDataAndMetadataTable (resultMetaClient , toVersion );
104
109
validateVersionSpecificProperties (resultMetaClient , fromVersion , toVersion );
105
110
validateDataConsistency (originalData , resultMetaClient , "after " + operation );
106
-
107
- // Validate pending commits based on whether this transition performs rollback operations
111
+
112
+ // Validate pending commits based on whether this transition performs rollback and compaction operations
108
113
int finalPendingCommits = resultMetaClient .getCommitsTimeline ().filterPendingExcludingCompaction ().countInstants ();
109
- if (isRollbackTransition (fromVersion , toVersion )) {
114
+ if (isRollbackAndCompactTransition (fromVersion , toVersion )) {
110
115
// Handlers that call rollbackFailedWritesAndCompact() clear all pending commits
111
116
assertEquals (0 , finalPendingCommits ,
112
117
"Pending commits should be cleared to 0 after " + operation );
118
+ // Validate no log files remain after rollback and compaction
119
+ validateLogFilesCount (resultMetaClient , operation , false );
113
120
} else {
114
121
// Other handlers may clean up some pending commits but don't necessarily clear all
115
122
assertTrue (finalPendingCommits <= initialPendingCommits ,
@@ -187,9 +194,13 @@ public void testMetadataTableUpgradeDowngradeFailure(HoodieTableVersion fromVers
187
194
() -> new UpgradeDowngrade (originalMetaClient , cfg , context (), SparkUpgradeDowngradeHelper .getInstance ())
188
195
.run (toVersion , null )
189
196
);
190
-
191
- LOG .info ("Successfully caught expected exception during {} from {} to {}: {}" ,
192
- operation , fromVersion , toVersion , exception .getMessage ());
197
+
198
+ // Verify the specific exception message for metadata table failures
199
+ String expectedMessage = "Upgrade/downgrade for the Hudi metadata table failed. "
200
+ + "Please try again. If the failure repeats for metadata table, it is recommended to disable "
201
+ + "the metadata table so that the upgrade and downgrade can continue for the data table." ;
202
+ assertTrue (exception .getMessage ().contains (expectedMessage ),
203
+ "Exception message should contain metadata table failure message" );
193
204
}
194
205
195
206
/**
@@ -294,16 +305,17 @@ private static Stream<Arguments> upgradeDowngradeVersionPairs() {
294
305
}
295
306
296
307
/**
297
- * Version pairs that should be tested for metadata upgrade/downgrade failures.
298
- * Excludes pairs that trigger rollback operations which delete metadata tables.
308
+ * Version pairs for testing metadata table failure during upgrade/downgrade. Note: these version pairs
309
+ * are ones that do NOT invoke rollbackFailedWritesAndCompact(), which would disable the metadata table.
299
310
*/
300
311
private static Stream <Arguments > metadataTableCorruptionTestVersionPairs () {
301
312
return Stream .of (
302
- Arguments .of (HoodieTableVersion .FOUR , HoodieTableVersion .FIVE ), // V4 -> V5
303
- Arguments .of (HoodieTableVersion .FIVE , HoodieTableVersion .FOUR ) // V5 -> V4
304
- // Note: Other pairs like V5->V6, V6->V8 are excluded because they may trigger
305
- // rollback operations in rollbackFailedWritesAndCompact() that disable metadata
306
- // tables, causing automatic deletion before testing can occur
313
+ // Non-rollback upgrade pairs
314
+ Arguments .of (HoodieTableVersion .FOUR , HoodieTableVersion .FIVE ), // V4 -> V5 (works)
315
+ Arguments .of (HoodieTableVersion .FIVE , HoodieTableVersion .SIX ), // V5 -> V6 (works)
316
+
317
+ // Non-rollback downgrade pairs
318
+ Arguments .of (HoodieTableVersion .FIVE , HoodieTableVersion .FOUR ) // V5 -> V4 (works)
307
319
);
308
320
}
309
321
@@ -427,12 +439,64 @@ private boolean isMetadataTablePresent(HoodieTableMetaClient metaClient) throws
427
439
StoragePath metadataTablePath = HoodieTableMetadata .getMetadataTableBasePath (metaClient .getBasePath ());
428
440
return metaClient .getStorage ().exists (metadataTablePath );
429
441
}
430
-
442
+
443
+ /**
444
+ * Validates the total log file count across all partitions: asserts that log files exist
445
+ * before rollback and compaction, and that none remain afterwards.
446
+ */
447
+ private void validateLogFilesCount (HoodieTableMetaClient metaClient , String operation , boolean expectLogFiles ) {
448
+ String validationPhase = expectLogFiles ? "before" : "after" ;
449
+ LOG .info ("Validating log files {} rollback and compaction during {}" , validationPhase , operation );
450
+
451
+ // Get the latest completed commit to ensure we're looking at a consistent state
452
+ org .apache .hudi .common .table .timeline .HoodieTimeline completedTimeline =
453
+ metaClient .getCommitsTimeline ().filterCompletedInstants ();
454
+ String latestCommit = completedTimeline .lastInstant ()
455
+ .map (instant -> instant .requestedTime ())
456
+ .orElse (null );
457
+
458
+ // Get file system view to check for log files using the latest commit state
459
+ try (org .apache .hudi .common .table .view .HoodieTableFileSystemView fsView =
460
+ org .apache .hudi .common .table .view .HoodieTableFileSystemView .fileListingBasedFileSystemView (
461
+ context (), metaClient , completedTimeline )) {
462
+
463
+ // Get all partition paths using FSUtils
464
+ List <String > partitionPaths = org .apache .hudi .common .fs .FSUtils .getAllPartitionPaths (
465
+ context (), metaClient , false );
466
+
467
+ int totalLogFiles = 0 ;
468
+
469
+ for (String partitionPath : partitionPaths ) {
470
+ // Get latest file slices for this partition
471
+ Stream <org .apache .hudi .common .model .FileSlice > fileSlicesStream = latestCommit != null
472
+ ? fsView .getLatestFileSlicesBeforeOrOn (partitionPath , latestCommit , false )
473
+ : fsView .getLatestFileSlices (partitionPath );
474
+
475
+ for (org .apache .hudi .common .model .FileSlice fileSlice : fileSlicesStream .collect (Collectors .toList ())) {
476
+ int logFileCount = (int ) fileSlice .getLogFiles ().count ();
477
+ totalLogFiles += logFileCount ;
478
+ }
479
+ }
480
+
481
+ if (expectLogFiles ) {
482
+ assertTrue (totalLogFiles > 0 ,
483
+ "Expected log files but found none during " + operation );
484
+ } else {
485
+ assertEquals (0 , totalLogFiles ,
486
+ "No log files should remain after rollback and compaction during " + operation );
487
+ }
488
+ LOG .info ("Log file validation passed: {} log files found (expected: {})" ,
489
+ totalLogFiles , expectLogFiles ? ">0" : "0" );
490
+ } catch (Exception e ) {
491
+ throw new RuntimeException ("Failed to validate log files during " + operation , e );
492
+ }
493
+ }
494
+
431
495
/**
432
496
* Determine if a version transition performs rollback operations that clear all pending commits.
433
497
* These handlers call rollbackFailedWritesAndCompact() which clears pending commits to 0.
434
498
*/
435
- private boolean isRollbackTransition (HoodieTableVersion fromVersion , HoodieTableVersion toVersion ) {
499
+ private boolean isRollbackAndCompactTransition (HoodieTableVersion fromVersion , HoodieTableVersion toVersion ) {
436
500
// Upgrade handlers that perform rollbacks
437
501
if (fromVersion == HoodieTableVersion .SEVEN && toVersion == HoodieTableVersion .EIGHT ) {
438
502
return true ; // SevenToEightUpgradeHandler
0 commit comments