@@ -63,10 +63,10 @@ public class UpgradeDowngrade {
63
63
Pair .of (8 , 9 ) // EightToNineUpgradeHandler
64
64
));
65
65
66
- private static final Set <Pair <Integer , Integer >> DOWNGRADE_HANDLERS_REQUIRING_ROLLBACK_AND_COMPACT = new HashSet <>(Arrays .asList (
66
+ private static final Set <Pair <Integer , Integer >> DOWNGRADE_HANDLERS_REQUIRING_ROLLBACK_ANDCOMPACT = new HashSet <>(Arrays .asList (
67
67
Pair .of (8 , 7 ), // EightToSevenDowngradeHandler
68
68
Pair .of (9 , 8 ), // NineToEightDowngradeHandler
69
- Pair .of (6 , 5 ) // SixToFiveDowngradeHadler
69
+ Pair .of (6 , 5 ) // SixToFiveDowngradeHandler
70
70
));
71
71
72
72
private final SupportsUpgradeDowngrade upgradeDowngradeHelper ;
@@ -147,12 +147,22 @@ public boolean needsUpgrade(HoodieTableVersion toWriteVersion) {
147
147
public void run (HoodieTableVersion toVersion , String instantTime ) {
148
148
// Fetch version from property file and current version
149
149
HoodieTableVersion fromVersion = metaClient .getTableConfig ().getTableVersion ();
150
+ // Determine if we are upgrading or downgrading
151
+ boolean isUpgrade = fromVersion .versionCode () < toVersion .versionCode ();
152
+ if (isUpgrade && !config .autoUpgrade ()) {
153
+ // if we are attempting to upgrade and auto-upgrade is disabled
154
+ // we set the write config table version to bounded by the current hudi table version
155
+ // and then exit out the upgrade process
156
+ LOG .warn ("AUTO_UPGRADE_VERSION was explicitly disabled, skipping table version upgrade process" );
157
+ return ;
158
+ }
159
+
150
160
if (!needsUpgradeOrDowngrade (toVersion )) {
151
161
return ;
152
162
}
153
163
154
- // Perform rollback and compaction if any handlers need it - this must happen at the very beginning
155
- rollbackAndCompactIfNeeded (fromVersion , toVersion );
164
+ // Perform rollback and compaction only if a specific handler requires it, before upgrade/downgrade process
165
+ performRollbackAndCompactionIfRequired (fromVersion , toVersion , isUpgrade );
156
166
157
167
// Change metadata table version automatically
158
168
if (toVersion .versionCode () >= HoodieTableVersion .FOUR .versionCode ()) {
@@ -179,8 +189,7 @@ public void run(HoodieTableVersion toVersion, String instantTime) {
179
189
LOG .info ("Attempting to move table from version " + fromVersion + " to " + toVersion );
180
190
Map <ConfigProperty , String > tablePropsToAdd = new Hashtable <>();
181
191
List <ConfigProperty > tablePropsToRemove = new ArrayList <>();
182
- boolean isDowngrade = false ;
183
- if (fromVersion .versionCode () < toVersion .versionCode ()) {
192
+ if (isUpgrade ) {
184
193
// upgrade
185
194
while (fromVersion .versionCode () < toVersion .versionCode ()) {
186
195
HoodieTableVersion nextVersion = HoodieTableVersion .fromVersionCode (fromVersion .versionCode () + 1 );
@@ -189,7 +198,6 @@ public void run(HoodieTableVersion toVersion, String instantTime) {
189
198
}
190
199
} else {
191
200
// downgrade
192
- isDowngrade = true ;
193
201
while (fromVersion .versionCode () > toVersion .versionCode ()) {
194
202
HoodieTableVersion prevVersion = HoodieTableVersion .fromVersionCode (fromVersion .versionCode () - 1 );
195
203
Pair <Map <ConfigProperty , String >, List <ConfigProperty >> tablePropsToAddAndRemove = downgrade (fromVersion , prevVersion , instantTime );
@@ -222,7 +230,7 @@ public void run(HoodieTableVersion toVersion, String instantTime) {
222
230
HoodieTableConfig .update (metaClient .getStorage (),
223
231
metaClient .getMetaPath (), metaClient .getTableConfig ().getProps ());
224
232
225
- if (metaClient .getTableConfig ().isMetadataTableAvailable () && toVersion .equals (HoodieTableVersion .SIX ) && isDowngrade ) {
233
+ if (metaClient .getTableConfig ().isMetadataTableAvailable () && toVersion .equals (HoodieTableVersion .SIX ) && ! isUpgrade ) {
226
234
// NOTE: Add empty deltacommit to metadata table. The compaction instant format has changed in version 8.
227
235
// It no longer has a suffix of "001" for the compaction instant. Due to that, the timeline instant
228
236
// comparison logic in metadata table will fail after LSM timeline downgrade.
@@ -297,22 +305,20 @@ protected Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(Hood
297
305
}
298
306
299
307
/**
300
- * Checks if any handlers in the upgrade/downgrade path need rollback and compaction and performs it once before starting.
301
- * This ensures rollback and compaction happens only when needed and only once at the very beginning of the process.
308
+ * Checks if any handlers in the upgrade/downgrade path require running rollback and compaction before starting process.
302
309
*
303
310
* @param fromVersion the current table version
304
311
* @param toVersion the target table version
305
312
*/
306
- private void rollbackAndCompactIfNeeded (HoodieTableVersion fromVersion , HoodieTableVersion toVersion ) {
307
- // Check if any handlers in the upgrade/downgrade path need rollback and compaction
308
- boolean needsRollbackAndCompact = false ;
309
- if (fromVersion .versionCode () < toVersion .versionCode ()) {
313
+ private void performRollbackAndCompactionIfRequired (HoodieTableVersion fromVersion , HoodieTableVersion toVersion , boolean isUpgrade ) {
314
+ boolean requireRollbackAndCompaction = false ;
315
+ if (isUpgrade ) {
310
316
// Check upgrade handlers
311
317
HoodieTableVersion checkVersion = fromVersion ;
312
318
while (checkVersion .versionCode () < toVersion .versionCode ()) {
313
319
HoodieTableVersion nextVersion = HoodieTableVersion .fromVersionCode (checkVersion .versionCode () + 1 );
314
320
if (UPGRADE_HANDLERS_REQUIRING_ROLLBACK_AND_COMPACT .contains (Pair .of (checkVersion .versionCode (), nextVersion .versionCode ()))) {
315
- needsRollbackAndCompact = true ;
321
+ requireRollbackAndCompaction = true ;
316
322
break ;
317
323
}
318
324
checkVersion = nextVersion ;
@@ -322,21 +328,17 @@ private void rollbackAndCompactIfNeeded(HoodieTableVersion fromVersion, HoodieTa
322
328
HoodieTableVersion checkVersion = fromVersion ;
323
329
while (checkVersion .versionCode () > toVersion .versionCode ()) {
324
330
HoodieTableVersion prevVersion = HoodieTableVersion .fromVersionCode (checkVersion .versionCode () - 1 );
325
- if (DOWNGRADE_HANDLERS_REQUIRING_ROLLBACK_AND_COMPACT .contains (Pair .of (checkVersion .versionCode (), prevVersion .versionCode ()))) {
326
- needsRollbackAndCompact = true ;
331
+ if (DOWNGRADE_HANDLERS_REQUIRING_ROLLBACK_ANDCOMPACT .contains (Pair .of (checkVersion .versionCode (), prevVersion .versionCode ()))) {
332
+ requireRollbackAndCompaction = true ;
327
333
break ;
328
334
}
329
335
checkVersion = prevVersion ;
330
336
}
331
337
}
332
-
333
- // Perform rollback and compaction once if any handler needs it
334
- if (needsRollbackAndCompact ) {
335
- LOG .info ("Performing rollback and compaction before upgrade/downgrade operations" );
336
- HoodieTable table = upgradeDowngradeHelper .getTable (config , context );
337
- UpgradeDowngradeUtils .rollbackFailedWritesAndCompact (table , context , config , upgradeDowngradeHelper ,
338
- HoodieTableType .MERGE_ON_READ .equals (metaClient .getTableType ()),
339
- metaClient .getTableConfig ().getTableVersion ());
338
+ if (requireRollbackAndCompaction ) {
339
+ LOG .info ("Rolling back failed writes and compacting table before upgrade/downgrade" );
340
+ UpgradeDowngradeUtils .rollbackFailedWritesAndCompact (upgradeDowngradeHelper .getTable (config , context ),
341
+ context , config , upgradeDowngradeHelper , HoodieTableType .MERGE_ON_READ .equals (metaClient .getTableType ()), metaClient .getTableConfig ().getTableVersion ());
340
342
}
341
343
}
342
344
}
0 commit comments