Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
270d4e9
add debug logs, start adding tests
the-other-tim-brown Jul 28, 2025
3c3f031
add more test cases, add helpers, update comparator
the-other-tim-brown Jul 28, 2025
4148a09
get baseline test working, start removing special handling
the-other-tim-brown Jul 28, 2025
7072680
only completed commits
the-other-tim-brown Jul 28, 2025
14ef59d
make sure incomplete commits are included in plan
the-other-tim-brown Jul 28, 2025
12798ab
fix test setup, reduce num commits
the-other-tim-brown Jul 29, 2025
a81452e
start reading data as part of test
the-other-tim-brown Jul 29, 2025
a7a17a2
fix minor test issues
the-other-tim-brown Jul 29, 2025
f6d4d81
all table v8+ pass
the-other-tim-brown Jul 30, 2025
2ab328d
get all v6 flows working, update stats index test base to use restore…
the-other-tim-brown Jul 30, 2025
65ef8d8
fix RLI test setup to restore to before clean commits, expand cases i…
the-other-tim-brown Jul 30, 2025
26b8db8
avoid file listing if using commit metadata
the-other-tim-brown Jul 30, 2025
7695785
handle clustering with log files added to clustered FG
the-other-tim-brown Jul 30, 2025
69f8319
fix style
the-other-tim-brown Jul 30, 2025
785169e
handle solo commit timestamp in restore
the-other-tim-brown Jul 30, 2025
54843ed
add unit test for comparator changes, address other pr feedback
the-other-tim-brown Aug 1, 2025
416bec7
add custom comparator with documentation, reuse that in both planning…
the-other-tim-brown Aug 4, 2025
a739ec1
fix style
the-other-tim-brown Aug 4, 2025
ab9a9da
add license to new file
the-other-tim-brown Aug 4, 2025
06b2878
fix the rollback instant comparator to be a factory
danny0405 Aug 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -122,11 +121,10 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
return context.flatMap(partitionPaths, partitionPath -> {
List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>(partitionPaths.size());

Supplier<List<StoragePathInfo>> filesToDelete = () -> {
Supplier<List<StoragePath>> filesToDelete = () -> {
try {
return fetchFilesFromInstant(instantToRollback, partitionPath, metaClient.getBasePath().toString(), baseFileExtension,
metaClient.getStorage(),
commitMetadataOptional, isCommitMetadataCompleted, tableType);
metaClient.getStorage(), commitMetadataOptional, isCommitMetadataCompleted, tableType, metaClient.getTableConfig().getTableVersion());
} catch (IOException e) {
throw new HoodieIOException("Fetching files to delete error", e);
}
Expand Down Expand Up @@ -164,11 +162,10 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
} else {
// if this is part of a restore operation, we should rollback/delete entire file slice.
// For table version 6, the files can be directly fetched from the instant to rollback
// For table version 8, the files are computed based on completion time. All files completed after
// the requested time of instant to rollback are included
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, isTableVersionLessThanEight ? filesToDelete.get() :
listAllFilesSinceCommit(instantToRollback.requestedTime(), baseFileExtension, partitionPath,
metaClient)));
// For table version 8, the log files are not directly associated with the base file.
// The rollback will iterate in reverse order based on completion time so the log files completed
// after the compaction will already be queued for removal and therefore, only the files from the compaction commit must be deleted.
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, filesToDelete.get()));
}
break;
case HoodieTimeline.DELTA_COMMIT_ACTION:
Expand Down Expand Up @@ -280,32 +277,11 @@ public static List<HoodieRollbackRequest> getRollbackRequestToAppendForVersionSi
return hoodieRollbackRequests;
}

private List<StoragePathInfo> listAllFilesSinceCommit(String commit,
String baseFileExtension,
String partitionPath,
HoodieTableMetaClient metaClient) throws IOException {
LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
CompletionTimeQueryView completionTimeQueryView = metaClient.getTableFormat().getTimelineFactory().createCompletionTimeQueryView(metaClient);
StoragePathFilter filter = (path) -> {
if (path.toString().contains(baseFileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return compareTimestamps(commit, LESSER_THAN_OR_EQUALS,
fileCommitTime);
} else if (FSUtils.isLogFile(path)) {
String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(path);
return completionTimeQueryView.isSlicedAfterOrOn(commit, fileCommitTime);
}
return false;
};
return metaClient.getStorage()
.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath), filter);
}

@NotNull
private List<HoodieRollbackRequest> getHoodieRollbackRequests(String partitionPath, List<StoragePathInfo> filesToDeletedStatus) {
private List<HoodieRollbackRequest> getHoodieRollbackRequests(String partitionPath, List<StoragePath> filesToDeletedStatus) {
return filesToDeletedStatus.stream()
.map(pathInfo -> {
String dataFileToBeDeleted = pathInfo.getPath().toString();
String dataFileToBeDeleted = pathInfo.toString();
return formatDeletePath(dataFileToBeDeleted);
})
.map(s -> new HoodieRollbackRequest(partitionPath, EMPTY_STRING, EMPTY_STRING, Collections.singletonList(s), Collections.emptyMap()))
Expand All @@ -317,56 +293,45 @@ private static String formatDeletePath(String path) {
return path.substring(path.indexOf(":") + 1);
}

private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
String basefileExtension,
String partitionPath,
HoodieStorage storage) throws IOException {
LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
private List<StoragePath> listBaseFilesToBeDeleted(String commit,
String basefileExtension,
String partitionPath,
HoodieStorage storage) throws IOException {
LOG.info("Collecting files to be cleaned/rolledback up for path {} and commit {}", partitionPath, commit);
StoragePathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
return storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath), filter);
return storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath), filter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
}

private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant instantToRollback,
String partitionPath, String basePath,
String baseFileExtension, HoodieStorage storage,
Option<HoodieCommitMetadata> commitMetadataOptional,
Boolean isCommitMetadataCompleted,
HoodieTableType tableType) throws IOException {
// go w/ commit metadata only for COW table. for MOR, we need to get associated log files when commit corresponding to base file is rolledback.
if (isCommitMetadataCompleted && tableType == HoodieTableType.COPY_ON_WRITE) {
return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, basePath, commitMetadataOptional.get(),
baseFileExtension, storage);
private List<StoragePath> fetchFilesFromInstant(HoodieInstant instantToRollback,
String partitionPath, String basePath,
String baseFileExtension, HoodieStorage storage,
Option<HoodieCommitMetadata> commitMetadataOptional,
boolean isCommitMetadataCompleted,
HoodieTableType tableType,
HoodieTableVersion tableVersion) throws IOException {
// for MOR tables with version < 8, listing is required to fetch the log files associated with base files added by this commit.
if (isCommitMetadataCompleted && (tableType == HoodieTableType.COPY_ON_WRITE || tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch~

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. this can only work in table version 8 and above, if we order the commits based on completion time.
guess that fix simplified this.

return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, basePath, commitMetadataOptional.get(), baseFileExtension);
} else {
return fetchFilesFromListFiles(instantToRollback, partitionPath, basePath, baseFileExtension, storage);
}
}

private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant instantToRollback,
String partitionPath,
String basePath,
HoodieCommitMetadata commitMetadata,
String baseFileExtension,
HoodieStorage storage) throws IOException {
private List<StoragePath> fetchFilesFromCommitMetadata(HoodieInstant instantToRollback,
String partitionPath,
String basePath,
HoodieCommitMetadata commitMetadata,
String baseFileExtension) {
StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
instantToRollback.requestedTime());
List<StoragePath> filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath)
.filter(entry -> {
try {
return storage.exists(entry);
} catch (Exception e) {
LOG.error("Exists check failed for " + entry.toString(), e);
}
// if any Exception is thrown, do not ignore. let's try to add the file of interest to be deleted. we can't miss any files to be rolled back.
return true;
}).collect(Collectors.toList());

return storage.listDirectEntries(filePaths, pathFilter);
return getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file existence check seems unnecessary because we can delete a file that does not exist, cc @nsivabalan for the background of this check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test for this as well and confirmed that it handles the missing file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. our rollback execution should be fine even if file does not exist. we are good to remove this.

.filter(pathFilter::accept).collect(Collectors.toList());
}

/**
Expand All @@ -379,15 +344,15 @@ private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant instant
* @return
* @throws IOException
*/
private List<StoragePathInfo> fetchFilesFromListFiles(HoodieInstant instantToRollback,
String partitionPath,
String basePath,
String baseFileExtension,
HoodieStorage storage) throws IOException {
private List<StoragePath> fetchFilesFromListFiles(HoodieInstant instantToRollback,
String partitionPath,
String basePath,
String baseFileExtension,
HoodieStorage storage) throws IOException {
StoragePathFilter pathFilter = getPathFilter(baseFileExtension, instantToRollback.requestedTime());
List<StoragePath> filePaths = listFilesToBeDeleted(basePath, partitionPath);

return storage.listDirectEntries(filePaths, pathFilter);
return storage.listDirectEntries(filePaths, pathFilter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
}

private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,36 @@
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;

/**
* Plans the restore action and add a restore.requested meta file to timeline.
*/
public class RestorePlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieRestorePlan>> {


private static final Logger LOG = LoggerFactory.getLogger(RestorePlanActionExecutor.class);

public static final Integer RESTORE_PLAN_VERSION_1 = 1;
Expand All @@ -65,7 +71,8 @@ public RestorePlanActionExecutor(HoodieEngineContext context,
@Override
public Option<HoodieRestorePlan> execute() {
final HoodieInstant restoreInstant = instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime);
try {
HoodieTableMetaClient metaClient = table.getMetaClient();
try (CompletionTimeQueryView completionTimeQueryView = metaClient.getTableFormat().getTimelineFactory().createCompletionTimeQueryView(metaClient)) {
// Get all the commits on the timeline after the provided commit time
// rollback pending clustering instants first before other instants (See HUDI-3362)
List<HoodieInstant> pendingClusteringInstantsToRollback = table.getActiveTimeline().filterPendingReplaceOrClusteringTimeline()
Expand All @@ -75,10 +82,13 @@ public Option<HoodieRestorePlan> execute() {
.filter(instant -> GREATER_THAN.test(instant.requestedTime(), savepointToRestoreTimestamp))
.collect(Collectors.toList());

// Get all the commits on the timeline after the provided commit time
// Get all the commits on the timeline after the provided commit's completion time unless it is the SOLO_COMMIT_TIMESTAMP which indicates there are no commits for the table
String completionTime = savepointToRestoreTimestamp.equals(SOLO_COMMIT_TIMESTAMP) ? savepointToRestoreTimestamp : completionTimeQueryView.getCompletionTime(savepointToRestoreTimestamp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in which case we restore to SOLO_COMMIT_TIMESTAMP? is it valid in production?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was only occurring in the upgrade/downgrade testing on tables with no commits

.orElseThrow(() -> new HoodieException("Unable to find completion time for instant: " + savepointToRestoreTimestamp));

List<HoodieInstant> commitInstantsToRollback = table.getActiveTimeline().getWriteTimeline()
.getReverseOrderedInstants()
.filter(instant -> GREATER_THAN.test(instant.requestedTime(), savepointToRestoreTimestamp))
.getReverseOrderedInstantsByCompletionTime()
.filter(constructInstantFilter(metaClient.getTableConfig(), completionTime))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the filter creation can be moved to line 88.

.filter(instant -> !pendingClusteringInstantsToRollback.contains(instant))
.collect(Collectors.toList());

Expand All @@ -90,11 +100,23 @@ public Option<HoodieRestorePlan> execute() {
HoodieRestorePlan restorePlan = new HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION, savepointToRestoreTimestamp);
table.getActiveTimeline().saveToRestoreRequested(restoreInstant, restorePlan);
table.getMetaClient().reloadActiveTimeline();
LOG.info("Requesting Restore with instant time " + restoreInstant);
LOG.info("Requesting Restore with instant time {}", restoreInstant);
return Option.of(restorePlan);
} catch (HoodieIOException e) {
LOG.error("Got exception when saving restore requested file", e);
throw e;
} catch (Exception e) {
throw new HoodieException("Unable to restore to instant: " + savepointToRestoreTimestamp, e);
}
}

private Predicate<HoodieInstant> constructInstantFilter(HoodieTableConfig tableConfig, String completionTime) {
Copy link
Contributor

@danny0405 danny0405 Aug 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can reuse this comparator part with SavepointHelpers.validateSavepointRestore so that we can maintain this intelligible logic in one place and doc it well with some illustrations.

if (tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT) || tableConfig.getTableType() == HoodieTableType.COPY_ON_WRITE) {
return instant -> GREATER_THAN.test(instant.isCompleted() ? instant.getCompletionTime() : instant.requestedTime(), completionTime);
}
// For compaction on tables with version less than 8, if the compaction started before the target of the restore, it must not be removed since the log files will reference this commit
return instant -> {
if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) || instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) {
return GREATER_THAN.test(instant.requestedTime(), savepointToRestoreTimestamp);
}
return GREATER_THAN.test(instant.isCompleted() ? instant.getCompletionTime() : instant.requestedTime(), completionTime);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -131,9 +130,7 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
List<String> filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
if (!filesToBeDeleted.isEmpty()) {
List<HoodieRollbackStat> rollbackStats = deleteFiles(metaClient, filesToBeDeleted, doDelete);
List<Pair<String, HoodieRollbackStat>> partitionToRollbackStats = new ArrayList<>();
rollbackStats.forEach(entry -> partitionToRollbackStats.add(Pair.of(entry.getPartitionPath(), entry)));
return partitionToRollbackStats.stream();
return rollbackStats.stream().map(entry -> Pair.of(entry.getPartitionPath(), entry));
} else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
HoodieLogFormat.Writer writer = null;
final StoragePath filePath;
Expand Down
Loading
Loading