@@ -156,6 +156,43 @@ public void run(HoodieTableVersion toVersion, String instantTime) {
156
156
return ;
157
157
}
158
158
159
+ // Check if any handlers in the upgrade/downgrade path need rollback and perform it once before starting
160
+ boolean needsRollback = false ;
161
+ if (fromVersion .versionCode () < toVersion .versionCode ()) {
162
+ // Check upgrade handlers
163
+ HoodieTableVersion checkVersion = fromVersion ;
164
+ while (checkVersion .versionCode () < toVersion .versionCode ()) {
165
+ HoodieTableVersion nextVersion = HoodieTableVersion .fromVersionCode (checkVersion .versionCode () + 1 );
166
+ UpgradeHandler handler = getUpgradeHandlerInstance (checkVersion , nextVersion );
167
+ if (handler .needsRollbackPendingCommitAndCompact ()) {
168
+ needsRollback = true ;
169
+ break ;
170
+ }
171
+ checkVersion = nextVersion ;
172
+ }
173
+ } else {
174
+ // Check downgrade handlers
175
+ HoodieTableVersion checkVersion = fromVersion ;
176
+ while (checkVersion .versionCode () > toVersion .versionCode ()) {
177
+ HoodieTableVersion prevVersion = HoodieTableVersion .fromVersionCode (checkVersion .versionCode () - 1 );
178
+ DowngradeHandler handler = getDowngradeHandlerInstance (checkVersion , prevVersion );
179
+ if (handler .needsRollbackPendingCommitAndCompact ()) {
180
+ needsRollback = true ;
181
+ break ;
182
+ }
183
+ checkVersion = prevVersion ;
184
+ }
185
+ }
186
+
187
+ // Perform rollback once if any handler needs it
188
+ if (needsRollback ) {
189
+ LOG .info ("Performing rollback and compaction before upgrade/downgrade operations" );
190
+ HoodieTable table = upgradeDowngradeHelper .getTable (config , context );
191
+ UpgradeDowngradeUtils .rollbackFailedWritesAndCompact (table , context , config , upgradeDowngradeHelper ,
192
+ HoodieTableType .MERGE_ON_READ .equals (metaClient .getTableType ()),
193
+ metaClient .getTableConfig ().getTableVersion ());
194
+ }
195
+
159
196
// Perform the actual upgrade/downgrade; this has to be idempotent, for now.
160
197
LOG .info ("Attempting to move table from version " + fromVersion + " to " + toVersion );
161
198
Map <ConfigProperty , String > tablePropsToAdd = new Hashtable <>();
@@ -276,4 +313,52 @@ protected Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(Hood
276
313
throw new HoodieUpgradeDowngradeException (fromVersion .versionCode (), toVersion .versionCode (), false );
277
314
}
278
315
}
316
+
317
+ protected UpgradeHandler getUpgradeHandlerInstance (HoodieTableVersion fromVersion , HoodieTableVersion toVersion ) {
318
+ if (fromVersion == HoodieTableVersion .ZERO && toVersion == HoodieTableVersion .ONE ) {
319
+ return new ZeroToOneUpgradeHandler ();
320
+ } else if (fromVersion == HoodieTableVersion .ONE && toVersion == HoodieTableVersion .TWO ) {
321
+ return new OneToTwoUpgradeHandler ();
322
+ } else if (fromVersion == HoodieTableVersion .TWO && toVersion == HoodieTableVersion .THREE ) {
323
+ return new TwoToThreeUpgradeHandler ();
324
+ } else if (fromVersion == HoodieTableVersion .THREE && toVersion == HoodieTableVersion .FOUR ) {
325
+ return new ThreeToFourUpgradeHandler ();
326
+ } else if (fromVersion == HoodieTableVersion .FOUR && toVersion == HoodieTableVersion .FIVE ) {
327
+ return new FourToFiveUpgradeHandler ();
328
+ } else if (fromVersion == HoodieTableVersion .FIVE && toVersion == HoodieTableVersion .SIX ) {
329
+ return new FiveToSixUpgradeHandler ();
330
+ } else if (fromVersion == HoodieTableVersion .SIX && toVersion == HoodieTableVersion .SEVEN ) {
331
+ return new SixToSevenUpgradeHandler ();
332
+ } else if (fromVersion == HoodieTableVersion .SEVEN && toVersion == HoodieTableVersion .EIGHT ) {
333
+ return new SevenToEightUpgradeHandler ();
334
+ } else if (fromVersion == HoodieTableVersion .EIGHT && toVersion == HoodieTableVersion .NINE ) {
335
+ return new EightToNineUpgradeHandler ();
336
+ } else {
337
+ throw new HoodieUpgradeDowngradeException (fromVersion .versionCode (), toVersion .versionCode (), true );
338
+ }
339
+ }
340
+
341
+ protected DowngradeHandler getDowngradeHandlerInstance (HoodieTableVersion fromVersion , HoodieTableVersion toVersion ) {
342
+ if (fromVersion == HoodieTableVersion .ONE && toVersion == HoodieTableVersion .ZERO ) {
343
+ return new OneToZeroDowngradeHandler ();
344
+ } else if (fromVersion == HoodieTableVersion .TWO && toVersion == HoodieTableVersion .ONE ) {
345
+ return new TwoToOneDowngradeHandler ();
346
+ } else if (fromVersion == HoodieTableVersion .THREE && toVersion == HoodieTableVersion .TWO ) {
347
+ return new ThreeToTwoDowngradeHandler ();
348
+ } else if (fromVersion == HoodieTableVersion .FOUR && toVersion == HoodieTableVersion .THREE ) {
349
+ return new FourToThreeDowngradeHandler ();
350
+ } else if (fromVersion == HoodieTableVersion .FIVE && toVersion == HoodieTableVersion .FOUR ) {
351
+ return new FiveToFourDowngradeHandler ();
352
+ } else if (fromVersion == HoodieTableVersion .SIX && toVersion == HoodieTableVersion .FIVE ) {
353
+ return new SixToFiveDowngradeHandler ();
354
+ } else if (fromVersion == HoodieTableVersion .SEVEN && toVersion == HoodieTableVersion .SIX ) {
355
+ return new SevenToSixDowngradeHandler ();
356
+ } else if (fromVersion == HoodieTableVersion .EIGHT && toVersion == HoodieTableVersion .SEVEN ) {
357
+ return new EightToSevenDowngradeHandler ();
358
+ } else if (fromVersion == HoodieTableVersion .NINE && toVersion == HoodieTableVersion .EIGHT ) {
359
+ return new NineToEightDowngradeHandler ();
360
+ } else {
361
+ throw new HoodieUpgradeDowngradeException (fromVersion .versionCode (), toVersion .versionCode (), false );
362
+ }
363
+ }
279
364
}
0 commit comments