-
Notifications
You must be signed in to change notification settings - Fork 2.4k
[HUDI-9667] Incorporate completion time into restore logic #13653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-9667] Incorporate completion time into restore logic #13653
Conversation
…n TestSavepointRestoreMergeOnRead
return instantComparator.completionTimeOrderedComparator().compare(o1, o2); | ||
} else { | ||
// Do to special handling of compaction instants, we need to use requested time based comparator for compaction instants but completion time based comparator for others | ||
if (o1.getAction().equals(HoodieTimeline.COMMIT_ACTION) || o2.getAction().equals(HoodieTimeline.COMMIT_ACTION)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is true only for MOR table. I didn't quite get this part, the timeline filterCompletedAndCompactionInstants
can include pending compactions actually so the action could be COMPACTION
for mor table, for completed compations, the action is COMMIT_ACTION
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue here is around the special handling for compactions on v6 tables in the planner. The planner will retain a compaction that completed after the instant we are restoring to if the compaction started before the instant we are restoring to. The "last" instant on the timeline would be compaction in this case if we use completion time ordering and then the assertion will fail since the restore was targeting a delta commit that started after the compaction but finished before the compaction completed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. this is not intuitive to understand. can we add some simple illustration.
t1.dc,.... t2.dc, t11.compaction.req, t12.dc, t11.commit, t13.dc ... dc15.
If we are looking to restore to t12, and as we ordering the commits to rollback based on completion time, we would rollback t11 compaction as well (since t11 completed after t12 completed). but we can't do that. and hence the special handling.
but trying to understand why this special handling is not required for table 8 and above.
how are we handling this case for v8 table w/o special handling.
From https://github.com/apache/hudi/pull/13653/files#r2246930654, I only see we account for completed instant time or requested instant time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In v8, the delta commit is not directly tied to the base file commit time so that is why we don't require this. In v8 if we remove the compaction in the timeline described above, we can still safely query the table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In v8, the delta commit is not directly tied to the base file commit time so that is why we don't require this. In v8 if we remove the compaction in the timeline described above, we can still safely query the table.
This is not true, our assumption for file slice is the newer file slice will cover all the dataset in history, if we restore the compaction base files, the log files in this file silce will just be kept in the file slice and there is no base file to merge for read, then we would got a data loss(unless you keep the requested compaction metadata file on the timeline but it seems not the case).
For example we have
t1.dc.req, t1.dc, t2.dc.req, t2.dc, t3.compaction.req, t4.dc.req, t4.dc, t5.dc.req, t5.dc, t3.commit.
Now we want to restore to t5, if we also restore t3.commit for V8 table, the file slice that includes t4 logs will only have logs from t4, the history dataset in the compaction would be lost.
So we should always use requested time comparison for compactions regardless of the table versions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't the file slice computed at runtime for v8? The slice would just use t1 as the base and the log files would all be present preventing data loss.
Even if the above is accurate, if we want to just keep it consistent between versions for ease of operation, I am fine with that as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The slice would just use t1 as the base and the log files would all be present preventing data loss.
This is true, but the data loss comes from the deleted base file which contains all the history dataset, not very related with file slicing though, the table before V8 also has correct file slicing because the file name contains the base instant time.
HoodieTableType tableType, | ||
HoodieTableVersion tableVersion) throws IOException { | ||
// for MOR tables with version < 8, listing is required to fetch the log files associated with base files added by this commit. | ||
if (isCommitMetadataCompleted && (tableType == HoodieTableType.COPY_ON_WRITE || tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice catch~
}).collect(Collectors.toList()); | ||
|
||
return storage.listDirectEntries(filePaths, pathFilter); | ||
return getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The file existence check seems unnecessary because we can delete a file that does not exist, cc @nsivabalan for the background of this check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a test for this as well and confirmed that it handles the missing file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah. our rollback execution should be fine even if file does not exist. we are good to remove this.
@@ -75,10 +80,19 @@ public Option<HoodieRestorePlan> execute() { | |||
.filter(instant -> GREATER_THAN.test(instant.requestedTime(), savepointToRestoreTimestamp)) | |||
.collect(Collectors.toList()); | |||
|
|||
// Get all the commits on the timeline after the provided commit time | |||
// Get all the commits on the timeline after the provided commit's completion time unless it is the SOLO_COMMIT_TIMESTAMP which indicates there are no commits for the table | |||
String completionTime = savepointToRestoreTimestamp.equals(SOLO_COMMIT_TIMESTAMP) ? savepointToRestoreTimestamp : completionTimeQueryView.getCompletionTime(savepointToRestoreTimestamp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in which case we restore to SOLO_COMMIT_TIMESTAMP
? is it valid in production?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was only occurring in the upgrade/downgrade testing on tables with no commits
.getReverseOrderedInstantsByCompletionTime() | ||
.filter(instant -> { | ||
// For compaction on tables with version less than 8, if the compaction started before the target of the restore, it must not be removed since the log files will reference this commit | ||
if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) && !metaClient.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we define a filter func out of the stream loop because the table version and table type is kind of deterministic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
!greaterThanOrEquals -> lessThan
@@ -60,7 +60,7 @@ public interface CompletionTimeQueryView extends AutoCloseable { | |||
* | |||
* @return The completion time if the instant finished or empty if it is still pending. | |||
*/ | |||
Option<String> getCompletionTime(String beginTime); | |||
Option<String> getCompletionTime(String instantTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or requestedTime if you like
}).collect(Collectors.toList()); | ||
|
||
return storage.listDirectEntries(filePaths, pathFilter); | ||
return getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah. our rollback execution should be fine even if file does not exist. we are good to remove this.
HoodieTableType tableType, | ||
HoodieTableVersion tableVersion) throws IOException { | ||
// for MOR tables with version < 8, listing is required to fetch the log files associated with base files added by this commit. | ||
if (isCommitMetadataCompleted && (tableType == HoodieTableType.COPY_ON_WRITE || tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. this can only work in table version 8 and above, if we ordering the commits based on completion time.
guess that fix simplified this.
@@ -49,13 +53,15 @@ public static void deleteSavepoint(HoodieTable table, String savepointTime) { | |||
public static void validateSavepointRestore(HoodieTable table, String savepointTime) { | |||
// Make sure the restore was successful | |||
table.getMetaClient().reloadActiveTimeline(); | |||
Option<HoodieInstant> lastInstant = table.getActiveTimeline() | |||
Option<HoodieInstant> lastInstant = Option.fromJavaOptional(table.getActiveTimeline() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor. can we add java docs to L52 to call out what are we looking to validate
return instantComparator.completionTimeOrderedComparator().compare(o1, o2); | ||
} else { | ||
// Do to special handling of compaction instants, we need to use requested time based comparator for compaction instants but completion time based comparator for others | ||
if (o1.getAction().equals(HoodieTimeline.COMMIT_ACTION) || o2.getAction().equals(HoodieTimeline.COMMIT_ACTION)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. this is not intuitive to understand. can we add some simple illustration.
t1.dc,.... t2.dc, t11.compaction.req, t12.dc, t11.commit, t13.dc ... dc15.
If we are looking to restore to t12, and as we ordering the commits to rollback based on completion time, we would rollback t11 compaction as well (since t11 completed after t12 completed). but we can't do that. and hence the special handling.
but trying to understand why this special handling is not required for table 8 and above.
how are we handling this case for v8 table w/o special handling.
From https://github.com/apache/hudi/pull/13653/files#r2246930654, I only see we account for completed instant time or requested instant time.
@@ -79,6 +79,15 @@ public CompletionTimeBasedComparator(Map<String, String> comparableActions) { | |||
|
|||
@Override | |||
public int compare(HoodieInstant instant1, HoodieInstant instant2) { | |||
if (instant1.getCompletionTime() == null && instant2.getCompletionTime() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did we add UTs for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just added one
} | ||
} | ||
|
||
private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord> baseRecordsToUpdate) throws IOException { | ||
@Test | ||
void rollbackWithAsyncServices_compactionCompletesDuringCommit() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we parametrize this for version 6 and 8 ?
w/ all special handling we are doing in restore, its worth adding tests for both table versions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In v6 you cannot schedule compaction while there is an in-flight delta commit so it is not a valid case
} | ||
|
||
@Test | ||
void rollbackWithAsyncServices_commitCompletesDuringCompaction() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here. parametrize w/ both 6 and 8
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly here, the compaction cannot be scheduled for v6
} | ||
} | ||
|
||
private void validateFilesMetadata(HoodieWriteConfig writeConfig) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this validation def gives us good confidence now.
.getReverseOrderedInstants() | ||
.filter(instant -> GREATER_THAN.test(instant.requestedTime(), savepointToRestoreTimestamp)) | ||
.getReverseOrderedInstantsByCompletionTime() | ||
.filter(constructInstantFilter(metaClient.getTableConfig(), completionTime)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the filter creation can be moved to line 88.
} | ||
} | ||
|
||
private Predicate<HoodieInstant> constructInstantFilter(HoodieTableConfig tableConfig, String completionTime) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can reuse this comparactor part with SavepointHelpers.validateSavepointRestore
so that we can maintain this intelligenable logic in one place and doc it well with some illustrations.
) * fix restore sequence to be in completion reverse order, still requested time comparison for compaction * add a custom comparator for the restore instant sort --------- Co-authored-by: danny0405 <[email protected]>
) * fix restore sequence to be in completion reverse order, still requested time comparison for compaction * add a custom comparator for the restore instant sort --------- Co-authored-by: danny0405 <[email protected]>
) * fix restore sequence to be in completion reverse order, still requested time comparison for compaction * add a custom comparator for the restore instant sort --------- Co-authored-by: danny0405 <[email protected]>
) * fix restore sequence to be in completion reverse order, still requested time comparison for compaction * add a custom comparator for the restore instant sort --------- Co-authored-by: danny0405 <[email protected]>
Change Logs
Restore planning:
Test Updates:
Impact
Risk level (write none, low medium or high below)
Low
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instruction to make
changes to the website.
Contributor's checklist