Skip to content

Commit 28332ae

Browse files
the-other-tim-browndanny0405
authored andcommitted
[HUDI-9667] Incorporate completion time into restore logic (#13653)
* fix restore sequence to be in completion reverse order, still requested time comparison for compaction * add a custom comparator for the restore instant sort --------- Co-authored-by: danny0405 <[email protected]>
1 parent 0b28d82 commit 28332ae

File tree

16 files changed

+723
-193
lines changed

16 files changed

+723
-193
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java

Lines changed: 37 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.hudi.common.model.WriteOperationType;
2929
import org.apache.hudi.common.table.HoodieTableMetaClient;
3030
import org.apache.hudi.common.table.HoodieTableVersion;
31-
import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
3231
import org.apache.hudi.common.table.timeline.HoodieInstant;
3332
import org.apache.hudi.common.table.timeline.HoodieTimeline;
3433
import org.apache.hudi.common.util.Option;
@@ -122,11 +121,10 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
122121
return context.flatMap(partitionPaths, partitionPath -> {
123122
List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>(partitionPaths.size());
124123

125-
Supplier<List<StoragePathInfo>> filesToDelete = () -> {
124+
Supplier<List<StoragePath>> filesToDelete = () -> {
126125
try {
127126
return fetchFilesFromInstant(instantToRollback, partitionPath, metaClient.getBasePath().toString(), baseFileExtension,
128-
metaClient.getStorage(),
129-
commitMetadataOptional, isCommitMetadataCompleted, tableType);
127+
metaClient.getStorage(), commitMetadataOptional, isCommitMetadataCompleted, tableType, metaClient.getTableConfig().getTableVersion());
130128
} catch (IOException e) {
131129
throw new HoodieIOException("Fetching files to delete error", e);
132130
}
@@ -164,11 +162,10 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
164162
} else {
165163
// if this is part of a restore operation, we should rollback/delete entire file slice.
166164
// For table version 6, the files can be directly fetched from the instant to rollback
167-
// For table version 8, the files are computed based on completion time. All files completed after
168-
// the requested time of instant to rollback are included
169-
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, isTableVersionLessThanEight ? filesToDelete.get() :
170-
listAllFilesSinceCommit(instantToRollback.requestedTime(), baseFileExtension, partitionPath,
171-
metaClient)));
165+
// For table version 8, the log files are not directly associated with the base file.
166+
// The rollback will iterate in reverse order based on completion time so the log files completed
167+
// after the compaction will already be queued for removal and therefore, only the files from the compaction commit must be deleted.
168+
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, filesToDelete.get()));
172169
}
173170
break;
174171
case HoodieTimeline.DELTA_COMMIT_ACTION:
@@ -280,32 +277,11 @@ public static List<HoodieRollbackRequest> getRollbackRequestToAppendForVersionSi
280277
return hoodieRollbackRequests;
281278
}
282279

283-
private List<StoragePathInfo> listAllFilesSinceCommit(String commit,
284-
String baseFileExtension,
285-
String partitionPath,
286-
HoodieTableMetaClient metaClient) throws IOException {
287-
LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
288-
CompletionTimeQueryView completionTimeQueryView = metaClient.getTimelineLayout().getTimelineFactory().createCompletionTimeQueryView(metaClient);
289-
StoragePathFilter filter = (path) -> {
290-
if (path.toString().contains(baseFileExtension)) {
291-
String fileCommitTime = FSUtils.getCommitTime(path.getName());
292-
return compareTimestamps(commit, LESSER_THAN_OR_EQUALS,
293-
fileCommitTime);
294-
} else if (FSUtils.isLogFile(path)) {
295-
String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(path);
296-
return completionTimeQueryView.isSlicedAfterOrOn(commit, fileCommitTime);
297-
}
298-
return false;
299-
};
300-
return metaClient.getStorage()
301-
.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath), filter);
302-
}
303-
304280
@NotNull
305-
private List<HoodieRollbackRequest> getHoodieRollbackRequests(String partitionPath, List<StoragePathInfo> filesToDeletedStatus) {
281+
private List<HoodieRollbackRequest> getHoodieRollbackRequests(String partitionPath, List<StoragePath> filesToDeletedStatus) {
306282
return filesToDeletedStatus.stream()
307283
.map(pathInfo -> {
308-
String dataFileToBeDeleted = pathInfo.getPath().toString();
284+
String dataFileToBeDeleted = pathInfo.toString();
309285
return formatDeletePath(dataFileToBeDeleted);
310286
})
311287
.map(s -> new HoodieRollbackRequest(partitionPath, EMPTY_STRING, EMPTY_STRING, Collections.singletonList(s), Collections.emptyMap()))
@@ -317,56 +293,45 @@ private static String formatDeletePath(String path) {
317293
return path.substring(path.indexOf(":") + 1);
318294
}
319295

320-
private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
321-
String basefileExtension,
322-
String partitionPath,
323-
HoodieStorage storage) throws IOException {
324-
LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
296+
private List<StoragePath> listBaseFilesToBeDeleted(String commit,
297+
String basefileExtension,
298+
String partitionPath,
299+
HoodieStorage storage) throws IOException {
300+
LOG.info("Collecting files to be cleaned/rolledback up for path {} and commit {}", partitionPath, commit);
325301
StoragePathFilter filter = (path) -> {
326302
if (path.toString().contains(basefileExtension)) {
327303
String fileCommitTime = FSUtils.getCommitTime(path.getName());
328304
return commit.equals(fileCommitTime);
329305
}
330306
return false;
331307
};
332-
return storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath), filter);
308+
return storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath), filter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
333309
}
334310

335-
private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant instantToRollback,
336-
String partitionPath, String basePath,
337-
String baseFileExtension, HoodieStorage storage,
338-
Option<HoodieCommitMetadata> commitMetadataOptional,
339-
Boolean isCommitMetadataCompleted,
340-
HoodieTableType tableType) throws IOException {
341-
// go w/ commit metadata only for COW table. for MOR, we need to get associated log files when commit corresponding to base file is rolledback.
342-
if (isCommitMetadataCompleted && tableType == HoodieTableType.COPY_ON_WRITE) {
343-
return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, basePath, commitMetadataOptional.get(),
344-
baseFileExtension, storage);
311+
private List<StoragePath> fetchFilesFromInstant(HoodieInstant instantToRollback,
312+
String partitionPath, String basePath,
313+
String baseFileExtension, HoodieStorage storage,
314+
Option<HoodieCommitMetadata> commitMetadataOptional,
315+
boolean isCommitMetadataCompleted,
316+
HoodieTableType tableType,
317+
HoodieTableVersion tableVersion) throws IOException {
318+
// for MOR tables with version < 8, listing is required to fetch the log files associated with base files added by this commit.
319+
if (isCommitMetadataCompleted && (tableType == HoodieTableType.COPY_ON_WRITE || tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))) {
320+
return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, basePath, commitMetadataOptional.get(), baseFileExtension);
345321
} else {
346322
return fetchFilesFromListFiles(instantToRollback, partitionPath, basePath, baseFileExtension, storage);
347323
}
348324
}
349325

350-
private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant instantToRollback,
351-
String partitionPath,
352-
String basePath,
353-
HoodieCommitMetadata commitMetadata,
354-
String baseFileExtension,
355-
HoodieStorage storage) throws IOException {
326+
private List<StoragePath> fetchFilesFromCommitMetadata(HoodieInstant instantToRollback,
327+
String partitionPath,
328+
String basePath,
329+
HoodieCommitMetadata commitMetadata,
330+
String baseFileExtension) {
356331
StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
357332
instantToRollback.requestedTime());
358-
List<StoragePath> filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath)
359-
.filter(entry -> {
360-
try {
361-
return storage.exists(entry);
362-
} catch (Exception e) {
363-
LOG.error("Exists check failed for " + entry.toString(), e);
364-
}
365-
// if any Exception is thrown, do not ignore. let's try to add the file of interest to be deleted. we can't miss any files to be rolled back.
366-
return true;
367-
}).collect(Collectors.toList());
368-
369-
return storage.listDirectEntries(filePaths, pathFilter);
333+
return getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath)
334+
.filter(pathFilter::accept).collect(Collectors.toList());
370335
}
371336

372337
/**
@@ -379,15 +344,15 @@ private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant instant
379344
* @return
380345
* @throws IOException
381346
*/
382-
private List<StoragePathInfo> fetchFilesFromListFiles(HoodieInstant instantToRollback,
383-
String partitionPath,
384-
String basePath,
385-
String baseFileExtension,
386-
HoodieStorage storage) throws IOException {
347+
private List<StoragePath> fetchFilesFromListFiles(HoodieInstant instantToRollback,
348+
String partitionPath,
349+
String basePath,
350+
String baseFileExtension,
351+
HoodieStorage storage) throws IOException {
387352
StoragePathFilter pathFilter = getPathFilter(baseFileExtension, instantToRollback.requestedTime());
388353
List<StoragePath> filePaths = listFilesToBeDeleted(basePath, partitionPath);
389354

390-
return storage.listDirectEntries(filePaths, pathFilter);
355+
return storage.listDirectEntries(filePaths, pathFilter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
391356
}
392357

393358
private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hudi.table.action.rollback;
21+
22+
import org.apache.hudi.common.model.HoodieTableType;
23+
import org.apache.hudi.common.table.HoodieTableMetaClient;
24+
import org.apache.hudi.common.table.timeline.HoodieInstant;
25+
import org.apache.hudi.common.table.timeline.HoodieTimeline;
26+
import org.apache.hudi.common.table.timeline.InstantComparator;
27+
28+
import java.util.Comparator;
29+
import java.util.Set;
30+
import java.util.stream.Collectors;
31+
import java.util.stream.Stream;
32+
33+
/**
34+
* Comparator specifically for computing the instant order when computing the instants to rollback as part of a restore operation.
35+
* The order relies on the completion time of the instants, except for compaction instants on Merge-on-Read tables. These instants may be completed after a
36+
* delta-commit but should still be considered earlier since the log files from the next delta-commit become associated with the base files from this compaction.
37+
* For example if we have the following sequence of commits (DC=delta-commit, C=compaction):
38+
* ... DC-10 starts -> DC-10 ends -> compaction-1 starts -> delta-commit-11 starts -> delta-commit-11 ends -> compaction-1 ends ...
39+
* If we restore to delta-commit-11, we do not roll back the compaction-1 instant, even though it finished after delta-commit-11.
40+
*/
41+
public class RestoreInstantComparatorFactory {
42+
private static final Set<String> COMPACTION_ACTIONS = Stream.of(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.COMMIT_ACTION).collect(Collectors.toSet());
43+
44+
public static Comparator<HoodieInstant> createComparator(HoodieTableMetaClient metaClient) {
45+
InstantComparator instantComparator = metaClient.getTimelineLayout().getInstantComparator();
46+
HoodieTableType tableType = metaClient.getTableType();
47+
if (tableType == HoodieTableType.COPY_ON_WRITE) {
48+
return (o1, o2) -> instantComparator.completionTimeOrderedComparator().compare(o1, o2);
49+
} else {
50+
return (o1, o2) -> {
51+
// Do to special handling of compaction instants, we need to use requested time based comparator for compaction instants
52+
// but completion time based comparator for others
53+
if (COMPACTION_ACTIONS.contains(o1.getAction()) || COMPACTION_ACTIONS.contains(o2.getAction())) {
54+
return instantComparator.requestedTimeOrderedComparator().compare(o1, o2);
55+
} else {
56+
return instantComparator.completionTimeOrderedComparator().compare(o1, o2);
57+
}
58+
};
59+
}
60+
}
61+
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,34 @@
2222
import org.apache.hudi.avro.model.HoodieInstantInfo;
2323
import org.apache.hudi.avro.model.HoodieRestorePlan;
2424
import org.apache.hudi.common.engine.HoodieEngineContext;
25+
import org.apache.hudi.common.table.HoodieTableMetaClient;
26+
import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
2527
import org.apache.hudi.common.table.timeline.HoodieInstant;
2628
import org.apache.hudi.common.table.timeline.HoodieTimeline;
2729
import org.apache.hudi.common.util.ClusteringUtils;
2830
import org.apache.hudi.common.util.Option;
2931
import org.apache.hudi.config.HoodieWriteConfig;
30-
import org.apache.hudi.exception.HoodieIOException;
32+
import org.apache.hudi.exception.HoodieException;
3133
import org.apache.hudi.table.HoodieTable;
3234
import org.apache.hudi.table.action.BaseActionExecutor;
3335

3436
import org.slf4j.Logger;
3537
import org.slf4j.LoggerFactory;
3638

39+
import java.util.Comparator;
3740
import java.util.List;
41+
import java.util.function.Predicate;
3842
import java.util.stream.Collectors;
3943
import java.util.stream.Stream;
4044

4145
import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
46+
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
4247

4348
/**
4449
* Plans the restore action and add a restore.requested meta file to timeline.
4550
*/
4651
public class RestorePlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieRestorePlan>> {
4752

48-
4953
private static final Logger LOG = LoggerFactory.getLogger(RestorePlanActionExecutor.class);
5054

5155
public static final Integer RESTORE_PLAN_VERSION_1 = 1;
@@ -65,7 +69,8 @@ public RestorePlanActionExecutor(HoodieEngineContext context,
6569
@Override
6670
public Option<HoodieRestorePlan> execute() {
6771
final HoodieInstant restoreInstant = instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime);
68-
try {
72+
HoodieTableMetaClient metaClient = table.getMetaClient();
73+
try (CompletionTimeQueryView completionTimeQueryView = metaClient.getTimelineLayout().getTimelineFactory().createCompletionTimeQueryView(metaClient)) {
6974
// Get all the commits on the timeline after the provided commit time
7075
// rollback pending clustering instants first before other instants (See HUDI-3362)
7176
List<HoodieInstant> pendingClusteringInstantsToRollback = table.getActiveTimeline().filterPendingReplaceOrClusteringTimeline()
@@ -75,10 +80,14 @@ public Option<HoodieRestorePlan> execute() {
7580
.filter(instant -> GREATER_THAN.test(instant.requestedTime(), savepointToRestoreTimestamp))
7681
.collect(Collectors.toList());
7782

78-
// Get all the commits on the timeline after the provided commit time
83+
// Get all the commits on the timeline after the provided commit's completion time unless it is the SOLO_COMMIT_TIMESTAMP which indicates there are no commits for the table
84+
String completionTime = savepointToRestoreTimestamp.equals(SOLO_COMMIT_TIMESTAMP) ? savepointToRestoreTimestamp : completionTimeQueryView.getCompletionTime(savepointToRestoreTimestamp)
85+
.orElseThrow(() -> new HoodieException("Unable to find completion time for instant: " + savepointToRestoreTimestamp));
86+
87+
Predicate<HoodieInstant> instantFilter = constructInstantFilter(completionTime);
7988
List<HoodieInstant> commitInstantsToRollback = table.getActiveTimeline().getWriteTimeline()
80-
.getReverseOrderedInstants()
81-
.filter(instant -> GREATER_THAN.test(instant.requestedTime(), savepointToRestoreTimestamp))
89+
.getReverseOrderedInstantsByCompletionTime()
90+
.filter(instantFilter)
8291
.filter(instant -> !pendingClusteringInstantsToRollback.contains(instant))
8392
.collect(Collectors.toList());
8493

@@ -90,11 +99,17 @@ public Option<HoodieRestorePlan> execute() {
9099
HoodieRestorePlan restorePlan = new HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION, savepointToRestoreTimestamp);
91100
table.getActiveTimeline().saveToRestoreRequested(restoreInstant, restorePlan);
92101
table.getMetaClient().reloadActiveTimeline();
93-
LOG.info("Requesting Restore with instant time " + restoreInstant);
102+
LOG.info("Requesting Restore with instant time {}", restoreInstant);
94103
return Option.of(restorePlan);
95-
} catch (HoodieIOException e) {
96-
LOG.error("Got exception when saving restore requested file", e);
97-
throw e;
104+
} catch (Exception e) {
105+
throw new HoodieException("Unable to restore to instant: " + savepointToRestoreTimestamp, e);
98106
}
99107
}
108+
109+
private Predicate<HoodieInstant> constructInstantFilter(String completionTime) {
110+
HoodieInstant instantToRestoreTo = table.getMetaClient().getInstantGenerator()
111+
.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.RESTORE_ACTION, savepointToRestoreTimestamp, completionTime);
112+
Comparator<HoodieInstant> comparator = RestoreInstantComparatorFactory.createComparator(table.getMetaClient());
113+
return instant -> comparator.compare(instant, instantToRestoreTo) > 0;
114+
}
100115
}

0 commit comments

Comments
 (0)