Commit 9e7fba4

[HUDI-9576] Test schema evolution in fg reader (#13549)
- Test schema evolution with the file group reader across engines. For Spark, all cases are green. For other record formats, some tests are disabled, but follow-up patches will fix them.

Co-authored-by: Jonathan Vexler <=>
1 parent be216f4 commit 9e7fba4

File tree

10 files changed, +1194 -58 lines changed


hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java

Lines changed: 5 additions & 5 deletions
@@ -27,7 +27,6 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -60,13 +59,13 @@ public String getCustomPayload() {
   }
 
   @Override
-  public void commitToTable(List<HoodieRecord> recordList, String operation, Map<String, String> writeConfigs) {
+  public void commitToTable(List<HoodieRecord> recordList, String operation, boolean firstCommit, Map<String, String> writeConfigs, String schemaStr) {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
         .withEngineType(EngineType.JAVA)
         .withEmbeddedTimelineServerEnabled(false)
         .withProps(writeConfigs)
         .withPath(getBasePath())
-        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withSchema(schemaStr)
         .build();
 
     HoodieJavaClientTestHarness.TestJavaTaskContextSupplier taskContextSupplier = new HoodieJavaClientTestHarness.TestJavaTaskContextSupplier();
@@ -75,8 +74,7 @@ public void commitToTable(List<HoodieRecord> recordList, String operation, Map<S
     StoragePath basePath = new StoragePath(getBasePath());
     try (HoodieStorage storage = new HoodieHadoopStorage(basePath, getStorageConf())) {
       boolean basepathExists = storage.exists(basePath);
-      boolean operationIsInsert = operation.equalsIgnoreCase("insert");
-      if (!basepathExists || operationIsInsert) {
+      if (!basepathExists || firstCommit) {
         if (basepathExists) {
           storage.deleteDirectory(basePath);
         }
@@ -108,6 +106,8 @@ public void commitToTable(List<HoodieRecord> recordList, String operation, Map<S
       recordList.forEach(hoodieRecord -> recordsCopy.add(new HoodieAvroRecord<>(hoodieRecord.getKey(), (HoodieRecordPayload) hoodieRecord.getData())));
       if (operation.toLowerCase().equals("insert")) {
        writeClient.commit(instantTime, writeClient.insert(recordsCopy, instantTime), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
+      } else if (operation.toLowerCase().equals("bulkInsert")) {
+        writeClient.commit(instantTime, writeClient.bulkInsert(recordsCopy, instantTime), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
       } else {
         writeClient.commit(instantTime, writeClient.upsert(recordsCopy, instantTime), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
       }
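
The reworked commitToTable signature threads the writer schema and an explicit firstCommit flag through each commit, instead of hardcoding TRIP_EXAMPLE_SCHEMA and inferring the bootstrap from the operation name. A minimal sketch of how a schema-evolution scenario might drive it from a subclass of this test base (the method name, record lists, and schema strings are illustrative stand-ins, not names from this patch):

    import java.util.List;
    import java.util.Map;

    import org.apache.hudi.common.model.HoodieRecord;

    // Sketch only: assumes it runs inside a HoodieFileGroupReaderOnJavaTestBase subclass.
    void exerciseSchemaEvolution(String schemaV1, String schemaV2,
                                 List<HoodieRecord> initialRecords,
                                 List<HoodieRecord> evolvedRecords,
                                 Map<String, String> writeConfigs) {
      // firstCommit = true: the base path is wiped and the table bootstrapped with schemaV1.
      commitToTable(initialRecords, "insert", true, writeConfigs, schemaV1);
      // firstCommit = false: the second commit keeps the table and writes with the evolved
      // schema, so the file group reader must reconcile records across both versions.
      commitToTable(evolvedRecords, "upsert", false, writeConfigs, schemaV2);
    }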

hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnJava.java

Lines changed: 15 additions & 0 deletions
@@ -19,9 +19,11 @@
 
 package org.apache.hudi.common.table.read;
 
+import org.apache.hudi.avro.ConvertingGenericData;
 import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
@@ -30,6 +32,7 @@
 import org.apache.avro.generic.IndexedRecord;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieFileGroupReaderOnJava extends HoodieFileGroupReaderOnJavaTestBase<IndexedRecord> {
   private static final StorageConfiguration<?> STORAGE_CONFIGURATION = new HadoopStorageConfiguration(false);
@@ -49,4 +52,16 @@ public HoodieReaderContext<IndexedRecord> getHoodieReaderContext(
   public void assertRecordsEqual(Schema schema, IndexedRecord expected, IndexedRecord actual) {
     assertEquals(expected, actual);
   }
+
+  @Override
+  public void assertRecordMatchesSchema(Schema schema, IndexedRecord record) {
+    assertTrue(ConvertingGenericData.INSTANCE.validate(schema, record));
+  }
+
+  @Override
+  public HoodieTestDataGenerator.SchemaEvolutionConfigs getSchemaEvolutionConfigs() {
+    HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new HoodieTestDataGenerator.SchemaEvolutionConfigs();
+    configs.addNewFieldSupport = false;
+    return configs;
+  }
 }
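
Here the plain Avro reader context validates records with ConvertingGenericData and opts out of only the add-new-field evolution case. A sketch of how a test might consult these flags to skip unsupported cases rather than fail them (the case keys and helper name are illustrative; the real test base may key its scenarios differently):

    import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
    import org.junit.jupiter.api.Assumptions;

    // Sketch only: gate a parameterized evolution case on the engine's advertised support.
    static void assumeCaseSupported(HoodieTestDataGenerator.SchemaEvolutionConfigs configs, String caseName) {
      if ("addNewField".equals(caseName)) {
        // This engine sets addNewFieldSupport = false, so the case is skipped rather than failed.
        Assumptions.assumeTrue(configs.addNewFieldSupport, "add-new-field evolution unsupported");
      } else if ("intToLong".equals(caseName)) {
        Assumptions.assumeTrue(configs.intToLongSupport, "int-to-long promotion unsupported");
      }
    }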

hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java

Lines changed: 40 additions & 9 deletions
@@ -38,16 +38,17 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.HiveTypeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
+import java.util.Locale;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader.getStoredPartitionFieldNames;
@@ -105,23 +106,53 @@ public void assertRecordsEqual(Schema schema, ArrayWritable expected, ArrayWrita
     ArrayWritableTestUtil.assertArrayWritableEqual(schema, expected, actual, false);
   }
 
+  @Override
+  public void assertRecordMatchesSchema(Schema schema, ArrayWritable record) {
+    ArrayWritableTestUtil.assertArrayWritableMatchesSchema(schema, record);
+  }
+
+  @Override
+  public HoodieTestDataGenerator.SchemaEvolutionConfigs getSchemaEvolutionConfigs() {
+    HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new HoodieTestDataGenerator.SchemaEvolutionConfigs();
+    configs.nestedSupport = false;
+    configs.arraySupport = false;
+    configs.mapSupport = false;
+    configs.addNewFieldSupport = false;
+    configs.intToLongSupport = false;
+    configs.intToFloatSupport = false;
+    configs.intToDoubleSupport = false;
+    configs.intToStringSupport = false;
+    configs.longToFloatSupport = false;
+    configs.longToDoubleSupport = false;
+    configs.longToStringSupport = false;
+    configs.floatToDoubleSupport = false;
+    configs.floatToStringSupport = false;
+    configs.doubleToStringSupport = false;
+    configs.stringToBytesSupport = false;
+    configs.bytesToStringSupport = false;
+    return configs;
+  }
+
   private void setupJobconf(JobConf jobConf, Schema schema) {
     List<Schema.Field> fields = schema.getFields();
     setHiveColumnNameProps(fields, jobConf, USE_FAKE_PARTITION);
-    List<TypeInfo> types = TypeInfoUtils.getTypeInfosFromTypeString(HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES);
-    Map<String, String> typeMappings = HoodieTestDataGenerator.AVRO_SCHEMA.getFields().stream().collect(Collectors.toMap(Schema.Field::name, field -> types.get(field.pos()).getTypeName()));
-    String columnTypes = fields.stream().map(field -> typeMappings.getOrDefault(field.name(), "string")).collect(Collectors.joining(","));
-    jobConf.set("columns.types", columnTypes + ",string");
+    try {
+      String columnTypes = HiveTypeUtils.generateColumnTypes(schema).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","));
+      jobConf.set("columns.types", columnTypes + ",string");
+    } catch (SerDeException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   private void setHiveColumnNameProps(List<Schema.Field> fields, JobConf jobConf, boolean isPartitioned) {
-    String names = fields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
+    String names = fields.stream().map(Schema.Field::name).map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.joining(","));
     String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
     jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
     jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
 
-    String hiveOrderedColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase(PARTITION_COLUMN))
-        .map(Schema.Field::name).collect(Collectors.joining(","));
+    String hiveOrderedColumnNames = fields.stream().map(Schema.Field::name)
+        .filter(name -> !name.equalsIgnoreCase(PARTITION_COLUMN))
+        .map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.joining(","));
     if (isPartitioned) {
       hiveOrderedColumnNames += "," + PARTITION_COLUMN;
       jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, PARTITION_COLUMN);
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java

Lines changed: 103 additions & 0 deletions
@@ -191,4 +191,107 @@ private static void assertWritablePrimaryType(Schema schema, Writable expected,
       assertEquals(expected, actual);
     }
   }
+
+  public static void assertArrayWritableMatchesSchema(Schema schema, Writable writable) {
+    switch (schema.getType()) {
+      case RECORD: {
+        assertInstanceOf(ArrayWritable.class, writable);
+        ArrayWritable arrayWritable = (ArrayWritable) writable;
+        assertEquals(schema.getFields().size(), arrayWritable.get().length);
+        for (Schema.Field field : schema.getFields()) {
+          assertArrayWritableMatchesSchema(field.schema(), arrayWritable.get()[field.pos()]);
+        }
+        break;
+      }
+      case ARRAY: {
+        assertInstanceOf(ArrayWritable.class, writable);
+        ArrayWritable arrayWritable = (ArrayWritable) writable;
+        for (int i = 0; i < arrayWritable.get().length; i++) {
+          assertArrayWritableMatchesSchema(schema.getElementType(), arrayWritable.get()[i]);
+        }
+        break;
+      }
+      case MAP: {
+        assertInstanceOf(ArrayWritable.class, writable);
+        ArrayWritable arrayWritable = (ArrayWritable) writable;
+        for (int i = 0; i < arrayWritable.get().length; i++) {
+          Writable expectedKV = arrayWritable.get()[i];
+          assertInstanceOf(ArrayWritable.class, expectedKV);
+          ArrayWritable kv = (ArrayWritable) expectedKV;
+          assertEquals(2, kv.get().length);
+          assertNotNull(kv.get()[0]);
+          assertArrayWritableMatchesSchema(schema.getValueType(), kv.get()[1]);
+        }
+        break;
+      }
+      case UNION:
+        if (schema.getTypes().size() == 2
+            && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+          assertArrayWritableMatchesSchema(schema.getTypes().get(1), writable);
+        } else if (schema.getTypes().size() == 2
+            && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+          assertArrayWritableMatchesSchema(schema.getTypes().get(0), writable);
+        } else if (schema.getTypes().size() == 1) {
+          assertArrayWritableMatchesSchema(schema.getTypes().get(0), writable);
+        } else {
+          throw new IllegalStateException("Union has more than 2 types or one type is not null: " + schema);
+        }
+        break;
+
+      default:
+        assertWritablePrimaryTypeMatchesSchema(schema, writable);
+    }
+  }
+
+  private static void assertWritablePrimaryTypeMatchesSchema(Schema schema, Writable writable) {
+    switch (schema.getType()) {
+      case NULL:
+        assertInstanceOf(NullWritable.class, writable);
+        break;
+
+      case BOOLEAN:
+        assertInstanceOf(BooleanWritable.class, writable);
+        break;
+
+      case INT:
+        if (schema.getLogicalType() instanceof LogicalTypes.Date) {
+          assertInstanceOf(DateWritable.class, writable);
+        } else {
+          assertInstanceOf(IntWritable.class, writable);
+        }
+        break;
+
+      case LONG:
+        assertInstanceOf(LongWritable.class, writable);
+        break;
+
+      case FLOAT:
+        assertInstanceOf(FloatWritable.class, writable);
+        break;
+
+      case DOUBLE:
+        assertInstanceOf(DoubleWritable.class, writable);
+        break;
+
+      case BYTES:
+      case ENUM:
+        assertInstanceOf(BytesWritable.class, writable);
+        break;
+
+      case STRING:
+        assertInstanceOf(Text.class, writable);
+        break;
+
+      case FIXED:
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          assertInstanceOf(HiveDecimalWritable.class, writable);
+        } else {
+          throw new IllegalStateException("Unexpected schema type: " + schema);
+        }
+        break;
+
+      default:
+        throw new IllegalStateException("Unexpected schema type: " + schema);
+    }
+  }
 }
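
The new assertArrayWritableMatchesSchema walks a record recursively: records and arrays recurse into ArrayWritable elements, map entries are checked as two-element key/value pairs, nullable unions unwrap to their non-null branch, and leaves are checked against the expected Writable class (including Date and Decimal logical types). A small usage sketch against a hand-built row (the schema and values are illustrative):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    // Sketch only: field order must line up with schema positions, since the RECORD
    // case indexes the backing array by field.pos().
    Schema schema = SchemaBuilder.record("Row").fields()
        .requiredString("id")
        .requiredInt("count")
        .endRecord();
    ArrayWritable row = new ArrayWritable(Writable.class,
        new Writable[] {new Text("a1"), new IntWritable(42)});
    ArrayWritableTestUtil.assertArrayWritableMatchesSchema(schema, row);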
