[DRAFT][DNM] CoW write handle with file group reader #13699


Draft · the-other-tim-brown wants to merge 28 commits into master from cow-merge-handle-to-fgr-3

Conversation

the-other-tim-brown (Contributor):

Change Logs

Describe context and summary for this change. Highlight if any code was copied.

Impact

Describe any public API or user-facing feature change or any performance impact.

Risk level (write none, low, medium, or high below)

If medium or high, explain what verification was done to mitigate the risks.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here, and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions bot added the size:XL label (PR with lines of changes > 1000) on Aug 9, 2025
@@ -119,6 +119,9 @@ public static Iterator<List<WriteStatus>> runMerge(HoodieMergeHandle<?, ?, ?, ?>
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
mergeHandle.doMerge();
if (mergeHandle instanceof FileGroupReaderBasedMergeHandle) {
mergeHandle.close();

Contributor Author:
Open question: Is there any reason to avoid calling close on the other merge handles?
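
For reference, a minimal sketch of the alternative the question raises: close every handle unconditionally after the merge. The interface below is a hypothetical stand-in, and whether close() is actually safe for the other handle types is exactly the open question.

```java
// Hypothetical stand-in for the merge handle API; not the actual Hudi classes.
final class MergeHandleCloseSketch {
  interface MergeHandle extends AutoCloseable {
    void doMerge();
  }

  static void runMerge(MergeHandle mergeHandle) throws Exception {
    mergeHandle.doMerge();
    // Close unconditionally instead of guarding with
    // `if (mergeHandle instanceof FileGroupReaderBasedMergeHandle)`.
    mergeHandle.close();
  }
}
```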

indexStats.addHoodieRecordDelegate(HoodieRecordDelegate.fromHoodieRecord(record));
}
updateStatsForSuccess(optionalRecordMetadata);
}

public void manuallyTrackSuccess() {
this.manuallyTrackIndexUpdates = true;

Contributor:
We can just set trackSuccessRecords to false here.

@@ -430,16 +430,11 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithEx
//the record was deleted
return Option.empty();
}
if (mergeResult.getRecord() == null) {
// SENTINEL case: the record did not match and merge case and should not be modified
if (mergeResult.getRecord() == null || mergeResult == existingBufferedRecord) {

Contributor:
If this is only for MERGE INTO, and MERGE INTO is only ever used with the EVENT_TIME merge mode, then the reference-equality check against the buffered record is valid; otherwise, it would be better to use record equality here.
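
A minimal sketch of the two checks under discussion, using a hypothetical record type; the reviewer's point is that value equality also covers a merger that returns an equal copy rather than the same instance, assuming the record type implements equals() meaningfully.

```java
// Hypothetical stand-in; BufferedRecord's actual equals semantics are not assumed here.
final class SentinelCheckSketch {
  // Current draft: reference equality, relies on the merger returning the same instance
  // for the "no match, leave unmodified" case.
  static <T> boolean isSentinelByReference(T mergeResult, T existingBufferedRecord) {
    return mergeResult == existingBufferedRecord;
  }

  // Reviewer's suggestion: value equality, which also detects an equal copy returned by the merger.
  static <T> boolean isSentinelByValue(T mergeResult, T existingBufferedRecord) {
    return mergeResult.equals(existingBufferedRecord);
  }
}
```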

*/
public FileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt, HoodieReaderContext<T> readerContext) {

Contributor:
Do we need to pass around the readerContext explicitly here? Can we use hoodieTable.getContext().getReaderContextFactoryForWrite() instead?

Contributor Author:
The issue is that the merge handles are created on the executors in Spark, so hoodieTable.getContext() will always return a local engine context instead of a Spark engine context when one is required.

Contributor:
Quoting the above ("always return a local engine context instead of a spark engine context when required"): can we fix that, e.g. with something like hoodieTable.getContextForWrite?

Contributor Author:
Is getContextForWrite returning an EngineContext here?

init(operation, this.partitionPath);
this.props = TypedProperties.copy(config.getProps());
this.isCompaction = true;
initRecordIndexCallback();

Contributor:
Do we even need to track RLI for compactions?

initRecordTypeAndCdcLogger(enginRecordType);
init(operation, this.partitionPath);
this.props = TypedProperties.copy(config.getProps());
this.isCompaction = true;

danny0405 (Contributor), Aug 11, 2025:
We already have the preserveMetadata flag to distinguish table-service writes from regular writes; can we continue to use that? Some functions, like SI tracing, already rely on the preserveMetadata flag. And it seems clustering also uses this constructor.
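
A small sketch of the suggestion, with hypothetical field names: reuse the existing preserveMetadata flag (which SI tracing already relies on) to identify table-service writes such as compaction and clustering, rather than adding a separate isCompaction flag.

```java
// Hypothetical stand-in for the handle's state; names are illustrative only.
final class TableServiceFlagSketch {
  private final boolean preserveMetadata;

  TableServiceFlagSketch(boolean preserveMetadata) {
    this.preserveMetadata = preserveMetadata;
  }

  // Table services (compaction, clustering) construct the handle with preserveMetadata = true,
  // so the same flag can distinguish them from regular writes.
  boolean isTableServiceWrite() {
    return preserveMetadata;
  }
}
```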

}

private void initRecordIndexCallback() {
if (this.writeStatus.isTrackingSuccessfulWrites()) {

Contributor:
The isTrackingSuccessfulWrites flag in the write status comes from hoodieTable.shouldTrackSuccessRecords(), which is true when RLI or partitioned RLI is enabled. We should skip the location tracing for compaction, where it is redundant.
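
A sketch of the suggested guard, with hypothetical names: skip wiring the record-index callback when the handle is performing compaction, even if the write status is tracking successful writes.

```java
// Hypothetical stand-ins; the real handle holds these as instance state.
final class RecordIndexCallbackSketch {
  private final boolean isCompaction;
  private final boolean trackingSuccessfulWrites;

  RecordIndexCallbackSketch(boolean isCompaction, boolean trackingSuccessfulWrites) {
    this.isCompaction = isCompaction;
    this.trackingSuccessfulWrites = trackingSuccessfulWrites;
  }

  // Compaction keeps records in the same file group, so RLI location tracing adds nothing new.
  boolean shouldInitRecordIndexCallback() {
    return !isCompaction && trackingSuccessfulWrites;
  }
}
```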

private void populateIncomingRecordsMapIterator(Iterator<HoodieRecord<T>> newRecordsItr) {
if (!isCompaction) {
// avoid populating external spillable in base {@link HoodieWriteMergeHandle)
this.incomingRecordsItr = new MappingIterator<>(newRecordsItr, record -> (HoodieRecord) record);

Contributor:
Is this still needed?

this.secondaryIndexCallbackOpt = Option.empty();
}
secondaryIndexCallbackOpt.ifPresent(callbacks::add);
return callbacks.isEmpty() ? Option.empty() : Option.of(CompositeCallback.of(callbacks));

Contributor:
All the callbacks can be initialized as local variables, and there is no need to use CompositeCallback when there is only a single callback.
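
A sketch of the suggestion using hypothetical stand-in types: return the single callback directly and only build a composite when there is more than one.

```java
import java.util.List;
import java.util.Optional;

final class CallbackCompositionSketch {
  // Hypothetical callback interface standing in for the Hudi callback type.
  interface Callback {
    void onEvent(String recordKey);
  }

  static Optional<Callback> compose(List<Callback> callbacks) {
    if (callbacks.isEmpty()) {
      return Optional.empty();
    }
    if (callbacks.size() == 1) {
      // No need to wrap a single callback in a composite.
      return Optional.of(callbacks.get(0));
    }
    // Fan out to all callbacks only when there are several.
    Callback composite = key -> callbacks.forEach(cb -> cb.onEvent(key));
    return Optional.of(composite);
  }
}
```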

*/
public ReaderContextFactory<?> getReaderContextFactoryForWrite(HoodieTableMetaClient metaClient, HoodieRecord.HoodieRecordType recordType,
TypedProperties properties) {
TypedProperties properties, boolean outputsCustomPayloads) {

Contributor:
This flag is only meaningful for the Avro reader context. Is there any way we can constrain it to just AvroReaderContextFactory?

Contributor Author:
I didn't find a good way right now. This flag really represents two different stages of the writer path: the dedupe/indexing stages and the final write. In the final write, we never want to use the payload-based records, since we just want the final indexed representation of the record.

Contributor:
We have a plan to abandon the payload-based records in the writer path, right? So this should be just a temporary solution?

Contributor Author:
We'll still need it for ExpressionPayload and for any user-provided payload, so it is not temporary, but these restrictions may allow us to clean things up further.


@Override
public void onUpdate(String recordKey, BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey, partitionPath, fileRecordLocation, fileRecordLocation, mergedRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE));

Contributor:
Do we even need to add the delegate when mergedRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE is true?

Contributor Author:
The write status will still be updated in the current code with this record delegate even though ignoreIndexUpdate is true. This is keeping parity with the old system but I am not sure of the context for this.

Contributor:
The flag is used when all the delegates are collected on the driver and used to calculate the RLI index items for the MDT. Delegates with ignoreIndexUpdate set to true are just dropped directly, so there is no need to even generate and collect them.
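
A sketch of the point being made, with hypothetical stand-ins: the driver drops delegates flagged with ignoreIndexUpdate before computing RLI entries for the MDT, so the executor could avoid generating them at all.

```java
import java.util.ArrayList;
import java.util.List;

final class DelegateEmissionSketch {
  // Hypothetical stand-in for HoodieRecordDelegate; only the flag matters here.
  record Delegate(String recordKey, boolean ignoreIndexUpdate) {}

  // Driver side today: flagged delegates are filtered out before the RLI update.
  static List<Delegate> buildRliUpdates(List<Delegate> collected) {
    List<Delegate> updates = new ArrayList<>();
    for (Delegate d : collected) {
      if (!d.ignoreIndexUpdate()) {
        updates.add(d);
      }
    }
    return updates;
  }

  // Executor side (suggested): skip emitting the delegate when it would be ignored anyway.
  static void emitIfRelevant(List<Delegate> out, String recordKey, boolean ignoreIndexUpdate) {
    if (!ignoreIndexUpdate) {
      out.add(new Delegate(recordKey, false));
    }
  }
}
```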


@Override
public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey, partitionPath, null, fileRecordLocation, newRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE));

Contributor:
newRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE is always false.

Contributor Author:
It's always false today, but do we want to keep this in case there is a future scenario where it no longer holds?

Contributor:
Regarding "but do we want to keep this in case there is some future case": I don't think so; the hoodie operation is designed to be force-set there.

public void onDelete(String recordKey, BufferedRecord<T> previousRecord, HoodieOperation hoodieOperation) {
// The update before operation is used when a deletion is being sent to the old File Group in a different partition.
// In this case, we do not want to delete the record metadata from the index.
writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey, partitionPath, fileRecordLocation, null, hoodieOperation == HoodieOperation.UPDATE_BEFORE));

Contributor:
hoodieOperation == HoodieOperation.UPDATE_BEFORE is always false.
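
If the observation holds for both the insert and delete paths, the delegate flag can be hard-coded rather than re-derived per record; a sketch with hypothetical stand-ins:

```java
final class DelegateFlagSketch {
  // Hypothetical stand-in for HoodieRecordDelegate; the boolean mirrors the ignoreIndexUpdate flag.
  record Delegate(String recordKey, boolean ignoreIndexUpdate) {}

  // On the insert path the operation can never be UPDATE_BEFORE, so the flag is simply false.
  static Delegate onInsertDelegate(String recordKey) {
    return new Delegate(recordKey, false);
  }

  // Same reasoning for the delete path, per the reviewer's comment above.
  static Delegate onDeleteDelegate(String recordKey) {
    return new Delegate(recordKey, false);
  }
}
```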

} else {
Schema readerSchema = readerContext.getSchemaHandler().getRequestedSchema();
// If the record schema is different from the reader schema, rewrite the record using the payload methods to ensure consistency with legacy writer paths
if (!readerSchema.equals(recordSchema)) {

danny0405 (Contributor), Aug 11, 2025:
This could be super costly. Can it be simplified by checking the number of fields?
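
One reading of the suggestion, sketched below with Avro's Schema API: if the only difference that can occur on this path is a projected subset of fields, comparing field counts is a much cheaper test than a deep Schema.equals. Whether that assumption holds for this code path would need to be confirmed.

```java
import org.apache.avro.Schema;

final class SchemaRewriteCheckSketch {
  // Cheap proxy for "schemas differ": compare top-level field counts instead of deep equality.
  // Only valid if the record schema can differ from the reader schema solely by projection,
  // and both schemas are record schemas.
  static boolean needsRewrite(Schema readerSchema, Schema recordSchema) {
    return readerSchema.getFields().size() != recordSchema.getFields().size();
  }
}
```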

static <T> StreamingFileGroupRecordBufferLoader<T> getInstance() {
return INSTANCE;
StreamingFileGroupRecordBufferLoader(Schema recordSchema) {
this.recordSchema = recordSchema;

Contributor:
There is no need to pass around the schema explicitly. It is actually the writeSchema, which equals schemaHandler.requestedSchema minus the metadata fields; we already have a utility method for it: HoodieAvroUtils.removeMetadataFields.
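
A sketch of the suggested derivation; HoodieAvroUtils.removeMetadataFields is the utility named above, and the requestedSchema parameter stands in for the schema handler's requested schema.

```java
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;

final class WriteSchemaSketch {
  // Derive the write schema by stripping Hudi metadata fields from the requested schema,
  // instead of threading a separate schema through the loader constructor.
  static Schema deriveWriteSchema(Schema requestedSchema) {
    return HoodieAvroUtils.removeMetadataFields(requestedSchema);
  }
}
```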

* @param writeStatus The Write status
* @param secondaryIndexDefns Definitions for secondary index which need to be updated
*/
static <T> void trackSecondaryIndexStats(HoodieKey hoodieKey, Option<BufferedRecord<T>> combinedRecordOpt, @Nullable BufferedRecord<T> oldRecord, boolean isDelete,

Contributor Author:
This method mirrors the one above it but operates directly on BufferedRecord instead of converting to HoodieRecord.

@the-other-tim-brown the-other-tim-brown force-pushed the cow-merge-handle-to-fgr-3 branch from 50022dc to 47f8303 Compare August 11, 2025 16:18
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;

abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
protected final Set<String> usedKeys = new HashSet<>();

Contributor Author:
There is a possibility of duplicate keys in a file, and there is an expectation that updates are applied to both rows. See the test "Test only insert for source table in dup key without preCombineField" for an example. We need to figure out if there is a better way to handle this.

@hudi-bot

CI report:

Bot commands — @hudi-bot supports the following commands:
  • @hudi-bot run azure — re-run the last Azure build
