Commit c6d0551

Author: Jonathan Vexler (committed)
Merge branch 'master' into incremental_v1_use_fg_reader

2 parents a2a4445 + 9e7fba4, commit c6d0551

137 files changed: +4732 additions, -1223 deletions

hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java

Lines changed: 59 additions & 33 deletions
@@ -18,29 +18,33 @@

 package org.apache.hudi.cli.commands;

+import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.TableHeader;
-import org.apache.hudi.common.config.HoodieCommonConfig;
-import org.apache.hudi.common.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.FooterMetadataType;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -61,13 +65,16 @@

 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;

 import scala.Tuple2;
 import scala.Tuple3;

+import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;

 /**
@@ -189,7 +196,7 @@ storage, new StoragePath(logFilePathPattern)).stream()

  @ShellMethod(key = "show logfile records", value = "Read records from log files")
  public String showLogFileRecords(
-     @ShellOption(value = {"--limit"}, help = "Limit commits",
+     @ShellOption(value = {"--limit"}, help = "Limit number of records",
          defaultValue = "10") final Integer limit,
      @ShellOption(value = "--logFilePathPattern",
          help = "Fully qualified paths for the log files") final String logFilePathPattern,
@@ -222,34 +229,33 @@ storage, new StoragePath(logFilePathPattern)).stream()
    }
    Objects.requireNonNull(readerSchema);
    List<IndexedRecord> allRecords = new ArrayList<>();
-
    if (shouldMerge) {
      System.out.println("===========================> MERGING RECORDS <===================");
-      HoodieMergedLogRecordScanner scanner =
-          HoodieMergedLogRecordScanner.newBuilder()
-              .withStorage(storage)
-              .withBasePath(client.getBasePath())
-              .withLogFilePaths(logFilePaths)
-              .withReaderSchema(readerSchema)
-              .withLatestInstantTime(
-                  client.getActiveTimeline()
-                      .getCommitAndReplaceTimeline().lastInstant().get().requestedTime())
-              .withReverseReader(
-                  Boolean.parseBoolean(
-                      HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
-              .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
-              .withMaxMemorySizeInBytes(
-                  HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
-              .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
-              .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
-              .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-              .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
-              .build();
-      for (HoodieRecord hoodieRecord : scanner) {
-        Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
-        if (allRecords.size() < limit) {
-          allRecords.add(record.get().getData());
-        }
+      HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(
+          HoodieCLI.getTableMetaClient().getStorage().getConf(),
+          HoodieCLI.getTableMetaClient().getTableConfig(),
+          Option.empty(),
+          Option.empty());
+      StoragePath firstLogFile = new StoragePath(logFilePaths.get(0));
+      HoodieFileGroupId fileGroupId = new HoodieFileGroupId(FSUtils.getRelativePartitionPath(HoodieCLI.getTableMetaClient().getBasePath(), firstLogFile), FSUtils.getFileIdFromLogPath(firstLogFile));
+      FileSlice fileSlice = new FileSlice(fileGroupId, HoodieTimeline.INIT_INSTANT_TS, null, logFilePaths.stream()
+          .map(l -> new HoodieLogFile(new StoragePath(l))).collect(Collectors.toList()));
+      try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
+          .withReaderContext(readerContext)
+          .withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
+          .withFileSlice(fileSlice)
+          .withDataSchema(readerSchema)
+          .withRequestedSchema(readerSchema)
+          .withLatestCommitTime(client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(HoodieInstantTimeGenerator.getCurrentInstantTimeStr()))
+          .withProps(buildFileGroupReaderProperties())
+          .withShouldUseRecordPosition(false)
+          .build();
+          ClosableIterator<IndexedRecord> recordIterator = fileGroupReader.getClosableIterator()) {
+        recordIterator.forEachRemaining(record -> {
+          if (allRecords.size() < limit) {
+            allRecords.add(record);
+          }
+        });
      }
    } else {
      for (String logFile : logFilePaths) {
@@ -287,4 +293,24 @@ storage, new StoragePath(logFilePathPattern)).stream()
    }
    return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_RECORDS}, rows);
  }
+
+  /**
+   * Derive necessary properties for FG reader.
+   */
+  private TypedProperties buildFileGroupReaderProperties() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(
+        MAX_MEMORY_FOR_MERGE.key(),
+        Long.toString(MAX_MEMORY_FOR_MERGE.defaultValue()));
+    props.setProperty(
+        SPILLABLE_MAP_BASE_PATH.key(),
+        FileIOUtils.getDefaultSpillableMapBasePath());
+    props.setProperty(
+        SPILLABLE_DISK_MAP_TYPE.key(),
+        SPILLABLE_DISK_MAP_TYPE.defaultValue().name());
+    props.setProperty(
+        DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+        Boolean.toString(DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()));
+    return props;
+  }
 }
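For readers following the change: the CLI command now delegates merging to the file-group reader instead of HoodieMergedLogRecordScanner. Below is a condensed sketch of that read path using only calls visible in the diff above; metaClient, readerSchema, logFilePaths, latestCommitTime, and props stand in for values the command derives itself.

// Sketch only: the same HoodieFileGroupReader flow as above, outside the CLI wiring.
// Assumes metaClient, readerSchema, logFilePaths, latestCommitTime, and props are already built.
HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(
    metaClient.getStorage().getConf(), metaClient.getTableConfig(), Option.empty(), Option.empty());
StoragePath firstLogFile = new StoragePath(logFilePaths.get(0));
HoodieFileGroupId fileGroupId = new HoodieFileGroupId(
    FSUtils.getRelativePartitionPath(metaClient.getBasePath(), firstLogFile),
    FSUtils.getFileIdFromLogPath(firstLogFile));
// Log-only file slice: no base file, all supplied log files attached to one file group.
FileSlice fileSlice = new FileSlice(fileGroupId, HoodieTimeline.INIT_INSTANT_TS, null,
    logFilePaths.stream().map(p -> new HoodieLogFile(new StoragePath(p))).collect(Collectors.toList()));
try (HoodieFileGroupReader<IndexedRecord> reader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
        .withReaderContext(readerContext)
        .withHoodieTableMetaClient(metaClient)
        .withFileSlice(fileSlice)
        .withDataSchema(readerSchema)
        .withRequestedSchema(readerSchema)
        .withLatestCommitTime(latestCommitTime)
        .withProps(props)
        .withShouldUseRecordPosition(false)
        .build();
     ClosableIterator<IndexedRecord> it = reader.getClosableIterator()) {
  it.forEachRemaining(record -> {
    // consume merged records, e.g. collect up to a limit as the command does
  });
}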

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java

Lines changed: 0 additions & 7 deletions
@@ -203,13 +203,6 @@ protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoa
        .setFileSystemRetryConfig(config.getFileSystemRetryConfig())
        .setMetaserverConfig(config.getProps()).build();
  }
-
-  /**
-   * Returns next instant time in the correct format. An explicit Lock is enabled in the context.
-   */
-  public String createNewInstantTime() {
-    return TimelineUtils.generateInstantTime(true, timeGenerator);
-  }

  /**
   * Returns next instant time in the correct format.

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

Lines changed: 1 addition & 2 deletions
@@ -46,7 +46,6 @@

 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -1518,7 +1517,7 @@ protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> insta
    }

    new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper)
-        .run(HoodieTableVersion.current(), instantTime.orElse(null));
+        .run(config.getWriteVersion(), instantTime.orElse(null));

    metaClient.reloadTableConfig();
    metaClient.reloadActiveTimeline();
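The tryUpgrade change means the target table version now comes from the write config rather than from the latest code version. A minimal sketch of the caller-side effect follows; withWriteTableVersion, getWriteVersion, and the UpgradeDowngrade constructor appear elsewhere in this commit, while basePath, metaClient, context, and upgradeDowngradeHelper are assumed to already exist and withPath is the standard base-path setter.

// Sketch: pin the writer to table version EIGHT; tryUpgrade() now upgrades to the
// configured version instead of HoodieTableVersion.current().
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withWriteTableVersion(HoodieTableVersion.EIGHT.versionCode())
    .build();
new UpgradeDowngrade(metaClient, writeConfig, context, upgradeDowngradeHelper)
    .run(writeConfig.getWriteVersion(), null);  // instant time may be null, as in the diff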

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 11 additions & 3 deletions
@@ -146,7 +146,8 @@ public class HoodieWriteConfig extends HoodieConfig {
      .defaultValue(HoodieTableVersion.current().versionCode())
      .withValidValues(
          String.valueOf(HoodieTableVersion.SIX.versionCode()),
-          String.valueOf(HoodieTableVersion.current().versionCode())
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()),
+          String.valueOf(HoodieTableVersion.NINE.versionCode())
      )
      .sinceVersion("1.0.0")
      .withDocumentation("The table version this writer is storing the table in. This should match the current table version.");
@@ -839,7 +840,15 @@
      .defaultValue(true)
      .markAdvanced()
      .sinceVersion("1.0.0")
-      .withDocumentation("Whether to enable incremental table service. So far Clustering and Compaction support incremental processing.");
+      .withDocumentation("Whether to enable incremental table service. "
+          + "So far Clustering and Compaction support incremental processing.");
+
+  public static final ConfigProperty<Boolean> TRACK_EVENT_TIME_WATERMARK = ConfigProperty
+      .key("hoodie.write.track.event.time.watermark")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Records event time watermark metadata in commit metadata when enabled");

  /**
   * Config key with boolean value that indicates whether record being written during MERGE INTO Spark SQL
@@ -1222,7 +1231,6 @@
   */
  @Deprecated
  public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.defaultValue();
-
  /**
   * Use Spark engine by default.
   */
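Since TRACK_EVENT_TIME_WATERMARK is new in this commit, a small usage sketch may help. The key string is taken from the config definition above, and the builder calls mirror those used elsewhere in this diff; basePath is a placeholder.

// Sketch: enable event-time watermark tracking for a writer (the flag defaults to false).
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.write.track.event.time.watermark", "true");
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withProps(props)
    .build();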

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java

Lines changed: 6 additions & 1 deletion
@@ -53,6 +53,7 @@

 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -147,6 +148,10 @@ public HoodieCompactionPlan generateCompactionPlan(String compactionInstant) thr
          // for both OCC and NB-CC, this is in-correct.
          .filter(logFile -> completionTimeQueryView.isCompletedBefore(compactionInstant, logFile.getDeltaCommitTime()))
          .sorted(HoodieLogFile.getLogFileComparator()).collect(toList());
+      if (logFiles.isEmpty()) {
+        // compaction is not needed if there is no log file.
+        return null;
+      }
      totalLogFiles.add(logFiles.size());
      totalFileSlices.add(1L);
      // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
@@ -155,7 +160,7 @@
      Option<HoodieBaseFile> dataFile = s.getBaseFile();
      return new CompactionOperation(dataFile, partitionPath, logFiles,
          writeConfig.getCompactionStrategy().captureMetrics(writeConfig, s));
-    }), partitionPaths.size()).stream()
+    }).filter(Objects::nonNull), partitionPaths.size()).stream()
        .map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());

    LOG.info("Total of {} compaction operations are retrieved for table {}", operations.size(), hoodieTable.getConfig().getBasePath());
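The two edits above work as a pair: a file slice with no eligible log files now maps to null, and the nulls are dropped before compaction operations are built. The underlying stream pattern in isolation looks like the following generic, self-contained example (not Hudi API):

// Map each input to a result, returning null when there is nothing to do,
// then drop the nulls before collecting, mirroring the plan generator change.
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SkipEmptyExample {
  public static void main(String[] args) {
    List<String> plans = Stream.of(List.of("log1", "log2"), List.<String>of(), List.of("log3"))
        .map(logFiles -> logFiles.isEmpty() ? null : "compact:" + String.join(",", logFiles))
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
    System.out.println(plans); // [compact:log1,log2, compact:log3]
  }
}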

EightToNineUpgradeHandler.java (new file)

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.upgrade;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.Collections;
import java.util.Map;

public class EightToNineUpgradeHandler implements UpgradeHandler {

  @Override
  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context,
                                             String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {

    return Collections.emptyMap();
  }
}

NineToEightDowngradeHandler.java (new file)

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hudi.table.upgrade;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class NineToEightDowngradeHandler implements DowngradeHandler {

  @Override
  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
    return Pair.of(Collections.emptyMap(), Collections.emptyList());
  }
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java

Lines changed: 4 additions & 0 deletions
@@ -245,6 +245,8 @@ protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, Ho
      return new SixToSevenUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
    } else if (fromVersion == HoodieTableVersion.SEVEN && toVersion == HoodieTableVersion.EIGHT) {
      return new SevenToEightUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
+    } else if (fromVersion == HoodieTableVersion.EIGHT && toVersion == HoodieTableVersion.NINE) {
+      return new EightToNineUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
    } else {
      throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
    }
@@ -267,6 +269,8 @@ protected Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(Hood
      return new SevenToSixDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
    } else if (fromVersion == HoodieTableVersion.EIGHT && toVersion == HoodieTableVersion.SEVEN) {
      return new EightToSevenDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
+    } else if (fromVersion == HoodieTableVersion.NINE && toVersion == HoodieTableVersion.EIGHT) {
+      return new NineToEightDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
    } else {
      throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
    }
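With the new handler pair wired into both dispatch methods, a version NINE table can be moved back to EIGHT through the same entry point used in tryUpgrade above. A rough sketch, assuming metaClient, writeConfig, context, and upgradeDowngradeHelper are available and that UpgradeDowngrade.run chooses upgrade or downgrade by comparing the current table version against the requested one:

// Sketch: request table version EIGHT; run() dispatches to NineToEightDowngradeHandler
// when the table currently sits at version NINE.
new UpgradeDowngrade(metaClient, writeConfig, context, upgradeDowngradeHelper)
    .run(HoodieTableVersion.EIGHT, null);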

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java

Lines changed: 2 additions & 2 deletions
@@ -174,13 +174,13 @@ static void rollbackFailedWritesAndCompact(HoodieTable table, HoodieEngineContex
    properties.putAll(config.getProps());
    // TimeGenerators are cached and re-used based on table base path. Since here we are changing the lock configurations, avoiding the cache use
    // for upgrade code block.
-    properties.put(HoodieTimeGeneratorConfig.TIME_GENERATOR_REUSE_ENABLE.key(),"false");
+    properties.put(HoodieTimeGeneratorConfig.TIME_GENERATOR_REUSE_ENABLE.key(), "false");
    // override w/ NoopLock Provider to avoid re-entrant locking. already upgrade is happening within the table level lock.
    // Below we do trigger rollback and compaction which might again try to acquire the lock. So, here we are explicitly overriding to
    // NoopLockProvider for just the upgrade code block.
    properties.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), NoopLockProvider.class.getName());
    // if auto adjust it not disabled, chances that InProcessLockProvider will get overridden for single writer use-cases.
-    properties.put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(),"false");
+    properties.put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "false");
    HoodieWriteConfig rollbackWriteConfig = HoodieWriteConfig.newBuilder()
        .withProps(properties)
        .withWriteTableVersion(tableVersion.versionCode())
