
Commit 4ebc254

Authored by lokeshj1703 (Lokesh Jain), the-other-tim-brown, and danny0405
[HUDI-9602] Use BufferedRecordMerger for dedup and global index path (#13600)
* use the BufferedRecordMerger to deduplicate inputs for COW and index write path
* Add a new sub-merger for MIT expression payload.

Co-authored-by: Lokesh Jain <[email protected]>
Co-authored-by: Timothy Brown <[email protected]>
Co-authored-by: danny0405 <[email protected]>
1 parent b90cb5d commit 4ebc254

File tree: 38 files changed, +704 −236 lines

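At a high level, the change routes deduplication through a BufferedRecordMerger: incoming records are grouped by key and each colliding pair is merged, with an ordering value deciding the winner. The sketch below is a rough, self-contained illustration of that reduce-by-key idea in plain Java (the Rec type and ordering field are hypothetical, not the Hudi API), assuming the record with the greater ordering value wins and a tie keeps the later arrival.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a minimal stand-in for a Hudi record with a key and an ordering value.
final class Rec {
  final String key;          // record key (dedup is per key)
  final long orderingValue;  // e.g. an event-time / precombine field
  final String payload;

  Rec(String key, long orderingValue, String payload) {
    this.key = key;
    this.orderingValue = orderingValue;
    this.payload = payload;
  }
}

public class DedupSketch {
  // Keep one record per key: the one with the larger ordering value.
  // On a tie, keep the record seen later ("next wins unless the merge says otherwise").
  static Map<String, Rec> deduplicate(List<Rec> input) {
    Map<String, Rec> byKey = new HashMap<>();
    for (Rec next : input) {
      byKey.merge(next.key, next,
          (previous, incoming) -> incoming.orderingValue >= previous.orderingValue ? incoming : previous);
    }
    return byKey;
  }

  public static void main(String[] args) {
    List<Rec> batch = List.of(
        new Rec("id-1", 1L, "old"),
        new Rec("id-1", 5L, "new"),
        new Rec("id-2", 3L, "only"));
    deduplicate(batch).values().forEach(r -> System.out.println(r.key + " -> " + r.payload));
  }
}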

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java

Lines changed: 123 additions & 57 deletions
Large diffs are not rendered by default.

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java

Lines changed: 79 additions & 4 deletions
@@ -18,19 +18,35 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.avro.AvroSchemaCache;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.function.SerializableFunctionUnchecked;
-import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.List;
+
 public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I> {
 
   protected BaseWriteHelper(SerializableFunctionUnchecked<I, Integer> partitionNumberExtractor) {
@@ -85,9 +101,68 @@ public I combineOnCondition(
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int parallelism) {
-    HoodieRecordMerger recordMerger = HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
-    return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+    HoodieReaderContext<T> readerContext =
+        (HoodieReaderContext<T>) table.getContext().<T>getReaderContextFactoryForWrite(table.getMetaClient(), table.getConfig().getRecordMerger().getRecordType(), table.getConfig().getProps())
+            .getContext();
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    readerContext.initRecordMergerForIngestion(table.getConfig().getProps());
+    List<String> orderingFieldNames = HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), table.getConfig().getProps(), table.getMetaClient());
+    Schema recordSchema;
+    if (StringUtils.nonEmpty(table.getConfig().getPartialUpdateSchema())) {
+      recordSchema = new Schema.Parser().parse(table.getConfig().getPartialUpdateSchema());
+    } else {
+      recordSchema = new Schema.Parser().parse(table.getConfig().getWriteSchema());
+    }
+    recordSchema = AvroSchemaCache.intern(recordSchema);
+    BufferedRecordMerger<T> bufferedRecordMerger = BufferedRecordMergerFactory.create(
+        readerContext,
+        readerContext.getMergeMode(),
+        false,
+        readerContext.getRecordMerger().map(HoodieRecordUtils::mergerToPreCombineMode),
+        orderingFieldNames,
+        Option.ofNullable(table.getConfig().getPayloadClass()),
+        recordSchema,
+        table.getConfig().getProps(),
+        tableConfig.getPartialUpdateMode());
+    return deduplicateRecords(
+        records,
+        table.getIndex(),
+        parallelism,
+        table.getConfig().getSchema(),
+        table.getConfig().getProps(),
+        bufferedRecordMerger,
+        readerContext,
+        orderingFieldNames.toArray(new String[0]));
   }
 
-  public abstract I deduplicateRecords(I records, HoodieIndex<?, ?> index, int parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
+  public abstract I deduplicateRecords(I records,
+                                       HoodieIndex<?, ?> index,
+                                       int parallelism,
+                                       String schema,
+                                       TypedProperties props,
+                                       BufferedRecordMerger<T> merger,
+                                       HoodieReaderContext<T> readerContext,
+                                       String[] orderingFieldNames);
+
+  protected static <T> HoodieRecord<T> reduceRecords(TypedProperties props, BufferedRecordMerger<T> recordMerger, String[] orderingFieldNames,
+                                                     HoodieRecord<T> previous, HoodieRecord<T> next, Schema schema, RecordContext<T> recordContext) {
+    try {
+      // NOTE: The order of previous and next is uncertain within a batch in "reduceByKey".
+      // If the return value is empty, it means the previous should be chosen.
+      BufferedRecord<T> newBufferedRecord = BufferedRecord.forRecordWithContext(next, schema, recordContext, props, orderingFieldNames);
+      // Construct old buffered record.
+      BufferedRecord<T> oldBufferedRecord = BufferedRecord.forRecordWithContext(previous, schema, recordContext, props, orderingFieldNames);
+      // Run merge.
+      Option<BufferedRecord<T>> merged = recordMerger.deltaMerge(newBufferedRecord, oldBufferedRecord);
+      // NOTE: For merge mode based merging, it returns non-null.
+      // For mergers / payloads based merging, it may return null.
+      HoodieRecord<T> reducedRecord = merged.map(bufferedRecord -> recordContext.constructHoodieRecord(bufferedRecord, next.getPartitionPath())).orElse(previous);
+      boolean choosePrevious = merged.isEmpty();
+      HoodieKey reducedKey = choosePrevious ? previous.getKey() : next.getKey();
+      HoodieOperation operation = choosePrevious ? previous.getOperation() : next.getOperation();
+      return reducedRecord.newInstance(reducedKey, operation);
+    } catch (IOException e) {
+      throw new HoodieException(String.format("Error to merge two records, %s, %s", previous, next), e);
+    }
+  }
 }
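The new reduceRecords helper leans on the convention that deltaMerge returns an empty Option when the existing (previous) record should win. The tiny sketch below is a self-contained analogue of that contract, using java.util.Optional and a hypothetical ordering comparison rather than Hudi's BufferedRecordMerger, purely to illustrate why reduceRecords falls back to previous when the merge result is empty.

import java.util.Optional;

public class DeltaMergeContractSketch {
  // Hypothetical stand-in for a buffered record: just an ordering value and a payload.
  record Buffered(long orderingValue, String payload) {}

  // Empty result means "keep the previous record"; a present result is the winning record.
  static Optional<Buffered> deltaMerge(Buffered next, Buffered previous) {
    return next.orderingValue() >= previous.orderingValue() ? Optional.of(next) : Optional.empty();
  }

  static Buffered reduce(Buffered previous, Buffered next) {
    Optional<Buffered> merged = deltaMerge(next, previous);
    // Mirrors reduceRecords: fall back to previous when the merger chose not to replace it.
    return merged.orElse(previous);
  }

  public static void main(String[] args) {
    System.out.println(reduce(new Buffered(5, "previous"), new Buffered(3, "next")).payload()); // previous
    System.out.println(reduce(new Buffered(2, "previous"), new Buffered(7, "next")).payload()); // next
  }
}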

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java

Lines changed: 15 additions & 19 deletions
@@ -23,17 +23,15 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
-import java.io.IOException;
-
 public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
     HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
 
@@ -56,10 +54,17 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
   }
 
   @Override
-  public HoodieData<HoodieRecord<T>> deduplicateRecords(
-      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
+  public HoodieData<HoodieRecord<T>> deduplicateRecords(HoodieData<HoodieRecord<T>> records,
+                                                        HoodieIndex<?, ?> index,
+                                                        int parallelism,
+                                                        String schemaStr,
+                                                        TypedProperties props,
+                                                        BufferedRecordMerger<T> recordMerger,
+                                                        HoodieReaderContext<T> readerContext,
+                                                        String[] orderingFieldNames) {
     boolean isIndexingGlobal = index.isGlobal();
     final SerializableSchema schema = new SerializableSchema(schemaStr);
+    RecordContext<T> recordContext = readerContext.getRecordContext();
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
@@ -68,17 +73,8 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
       // Here we have to make a copy of the incoming record, since it might be holding
       // an instance of [[InternalRow]] pointing into shared, mutable buffer
       return Pair.of(key, record.copy());
-    }).reduceByKey((rec1, rec2) -> {
-      HoodieRecord<T> reducedRecord;
-      try {
-        reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
-      } catch (IOException e) {
-        throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
-      }
-      boolean choosePrev = rec1.getData().equals(reducedRecord.getData());
-      HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
-      HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
-      return reducedRecord.newInstance(reducedKey, operation);
-    }, parallelism).map(Pair::getRight);
+    }).reduceByKey(
+        (previous, next) -> reduceRecords(props, recordMerger, orderingFieldNames, previous, next, schema.get(), recordContext),
+        parallelism).map(Pair::getRight);
   }
 }

hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java

Lines changed: 42 additions & 13 deletions
@@ -27,10 +27,12 @@
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -39,7 +41,6 @@
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -50,6 +51,8 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -93,6 +96,7 @@
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.jetbrains.annotations.NotNull;
 
@@ -129,6 +133,7 @@
 import static org.apache.hudi.common.testutils.HoodieTestUtils.TIMELINE_FACTORY;
 import static org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
 import static org.apache.hudi.common.testutils.Transformations.recordsToRecordKeySet;
+import static org.apache.hudi.common.util.HoodieRecordUtils.getOrderingFieldNames;
 import static org.apache.hudi.config.HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING;
 import static org.apache.hudi.testutils.Assertions.assertNoDupesWithinPartition;
 import static org.apache.hudi.testutils.Assertions.assertNoDuplicatesInPartition;
@@ -544,8 +549,34 @@ protected List<HoodieRecord<RawTripTestPayload>> dedupForCopyOnWriteStorage(Hood
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(isGlobal);
     int dedupParallelism = records.getNumPartitions() + additionalParallelism;
-    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
-        .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
+    BaseHoodieWriteClient writeClient = getHoodieWriteClient(writeConfig);
+    HoodieReaderContext readerContext = writeClient.getEngineContext()
+        .getReaderContextFactoryForWrite(metaClient, HoodieRecord.HoodieRecordType.AVRO, writeConfig.getProps()).getContext();
+    List<String> orderingFieldNames = getOrderingFieldNames(
+        readerContext.getMergeMode(), writeClient.getConfig().getProps(), metaClient);
+    RecordMergeMode recordMergeMode = HoodieTableConfig.inferCorrectMergingBehavior(null, writeConfig.getPayloadClass(), null,
+        String.join(",", orderingFieldNames), metaClient.getTableConfig().getTableVersion()).getLeft();
+    BufferedRecordMerger<HoodieRecord> recordMerger = BufferedRecordMergerFactory.create(
+        readerContext,
+        recordMergeMode,
+        false,
+        Option.ofNullable(writeClient.getConfig().getRecordMerger()),
+        orderingFieldNames,
+        Option.ofNullable(writeClient.getConfig().getPayloadClass()),
+        new Schema.Parser().parse(writeClient.getConfig().getSchema()),
+        writeClient.getConfig().getProps(),
+        metaClient.getTableConfig().getPartialUpdateMode());
+    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
+        (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
+            .deduplicateRecords(
+                records,
+                index,
+                dedupParallelism,
+                writeConfig.getSchema(),
+                writeConfig.getProps(),
+                recordMerger,
+                readerContext,
+                orderingFieldNames.toArray(new String[0]));
     assertEquals(expectedNumPartitions, dedupedRecsRdd.getNumPartitions());
     List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
     assertEquals(isGlobal ? 1 : 2, dedupedRecs.size());
@@ -1187,24 +1218,22 @@ protected void testHoodieConcatHandleOnDupInserts(boolean isPrepped, InstantGene
    */
  protected void testUpsertsInternal(Function3<Object, BaseHoodieWriteClient, Object, String> writeFn, boolean populateMetaFields, boolean isPrepped,
                                      SupportsUpgradeDowngrade upgradeDowngrade) throws Exception {
-
     metaClient.getStorage().deleteDirectory(new StoragePath(basePath));
-
-    metaClient = HoodieTableMetaClient.newTableBuilder()
-        .fromMetaClient(metaClient)
-        .setTableVersion(6)
-        .setPopulateMetaFields(populateMetaFields)
-        .initTable(metaClient.getStorageConf().newInstance(), metaClient.getBasePath());
-
     HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withRollbackUsingMarkers(true)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).withColumnStatsIndexForColumns("driver,rider")
             .withMetadataIndexColumnStatsFileGroupCount(1).withEngineType(getEngineType()).build())
         .withWriteTableVersion(6);
 
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
-    metaClient = HoodieTestUtils.createMetaClient(storageConf, new StoragePath(basePath), HoodieTableVersion.SIX);
-
     HoodieWriteConfig config = cfgBuilder.build();
+    metaClient = HoodieTableMetaClient.newTableBuilder()
+        .fromProperties(config.getProps())
+        .setTableVersion(6)
+        .setTableType(metaClient.getTableType())
+        .setPopulateMetaFields(populateMetaFields)
+        .initTable(metaClient.getStorageConf().newInstance(), metaClient.getBasePath());
+
+    metaClient = HoodieTestUtils.createMetaClient(storageConf, new StoragePath(basePath), HoodieTableVersion.SIX);
     BaseHoodieWriteClient client = getHoodieWriteClient(config);
 
     // Write 1 (only inserts)

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java

Lines changed: 13 additions & 24 deletions
@@ -22,21 +22,19 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
 import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.avro.Schema;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
@@ -92,31 +90,22 @@ protected Iterator<HoodieRecord<T>> tag(Iterator<HoodieRecord<T>> dedupedRecords
   }
 
   @Override
-  public Iterator<HoodieRecord<T>> deduplicateRecords(Iterator<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
+  public Iterator<HoodieRecord<T>> deduplicateRecords(Iterator<HoodieRecord<T>> records,
+                                                      HoodieIndex<?, ?> index,
+                                                      int parallelism,
+                                                      String schemaStr,
+                                                      TypedProperties props,
+                                                      BufferedRecordMerger<T> recordMerger,
+                                                      HoodieReaderContext<T> readerContext,
+                                                      String[] orderingFieldNames) {
     // If index used is global, then records are expected to differ in their partitionPath
     Map<Object, List<HoodieRecord<T>>> keyedRecords = CollectionUtils.toStream(records)
         .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
 
     // caution that the avro schema is not serializable
     final Schema schema = new Schema.Parser().parse(schemaStr);
-    return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
-      HoodieRecord<T> reducedRecord;
-      try {
-        // Precombine do not need schema and do not return null
-        reducedRecord = merger.merge(rec1, schema, rec2, schema, props).get().getLeft();
-      } catch (IOException e) {
-        throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
-      }
-      // we cannot allow the user to change the key or partitionPath, since that will affect
-      // everything
-      // so pick it from one of the records.
-      boolean choosePrev = rec1.getData() == reducedRecord.getData();
-      HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
-      HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
-      HoodieRecord<T> hoodieRecord = reducedRecord.newInstance(reducedKey, operation);
-      // reuse the location from the first record.
-      hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
-      return hoodieRecord;
-    }).orElse(null)).filter(Objects::nonNull).iterator();
+    return keyedRecords.values().stream().map(x -> x.stream().reduce((previous, next) ->
+        reduceRecords(props, recordMerger, orderingFieldNames, previous, next, schema, readerContext.getRecordContext())
+    ).orElse(null)).filter(Objects::nonNull).iterator();
   }
 }
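Unlike the Spark path, the Flink helper deduplicates an in-memory Iterator, so it groups records by key with a stream collector and reduces within each group. The sketch below is a rough, self-contained illustration of that shape in plain Java (the KeyedRec type and the ordering-based tiebreak are hypothetical, not the Hudi classes).

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class IteratorDedupSketch {
  // Illustrative record type: a key plus an ordering value used to pick a winner.
  record KeyedRec(String key, long orderingValue) {}

  static Iterator<KeyedRec> deduplicate(Iterator<KeyedRec> records) {
    // Group the iterator's contents by record key.
    Map<String, List<KeyedRec>> keyed = StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(records, Spliterator.ORDERED), false)
        .collect(Collectors.groupingBy(KeyedRec::key));
    // Reduce each group to a single record; later records win ties, like the previous/next reduction above.
    return keyed.values().stream()
        .map(group -> group.stream()
            .reduce((previous, next) -> next.orderingValue() >= previous.orderingValue() ? next : previous)
            .orElse(null))
        .filter(Objects::nonNull)
        .iterator();
  }
}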
