Skip to content

Commit 266ca32

Browse files
Davis-Zhang-Onehouserahil-c
authored andcommitted
[MINOR] TableChanges class throw schema evolution exception (apache#13581)
1 parent ce60cf3 commit 266ca32

File tree

3 files changed

+33
-29
lines changed

3 files changed

+33
-29
lines changed

hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.internal.schema.action;
2020

21+
import org.apache.hudi.exception.SchemaCompatibilityException;
2122
import org.apache.hudi.internal.schema.HoodieSchemaException;
2223
import org.apache.hudi.internal.schema.InternalSchema;
2324
import org.apache.hudi.internal.schema.InternalSchemaBuilder;
@@ -83,20 +84,21 @@ public Map<Integer, Types.Field> getUpdates() {
8384
* @param name name of the column to update
8485
* @param newType new type for the column
8586
* @return this
86-
* @throws IllegalArgumentException
87+
* @throws SchemaCompatibilityException
8788
*/
8889
public ColumnUpdateChange updateColumnType(String name, Type newType) {
8990
checkColModifyIsLegal(name);
9091
if (newType.isNestedType()) {
91-
throw new IllegalArgumentException(String.format("only support update primitive type but find nest column: %s", name));
92+
throw new SchemaCompatibilityException(String.format("Cannot update column '%s' to nested type '%s'.", name, newType));
9293
}
9394
Types.Field field = internalSchema.findField(name);
9495
if (field == null) {
95-
throw new IllegalArgumentException(String.format("cannot update a missing column: %s", name));
96+
throw new SchemaCompatibilityException(String.format("Cannot update type for column '%s' because it does not exist in the schema", name));
9697
}
9798

9899
if (!SchemaChangeUtils.isTypeUpdateAllow(field.type(), newType)) {
99-
throw new IllegalArgumentException(String.format("cannot update origin type: %s to a incompatibility type: %s", field.type(), newType));
100+
throw new SchemaCompatibilityException(String.format(
101+
"Cannot update column '%s' from type '%s' to incompatible type '%s'.", name, field.type(), newType));
100102
}
101103

102104
if (field.type().equals(newType)) {
@@ -119,13 +121,13 @@ public ColumnUpdateChange updateColumnType(String name, Type newType) {
119121
* @param name name of the column to update
120122
* @param newDoc new documentation for the column
121123
* @return this
122-
* @throws IllegalArgumentException
124+
* @throws SchemaCompatibilityException
123125
*/
124126
public ColumnUpdateChange updateColumnComment(String name, String newDoc) {
125127
checkColModifyIsLegal(name);
126128
Types.Field field = internalSchema.findField(name);
127129
if (field == null) {
128-
throw new IllegalArgumentException(String.format("cannot update a missing column: %s", name));
130+
throw new SchemaCompatibilityException(String.format("Cannot update comment for column '%s' because it does not exist in the schema", name));
129131
}
130132
// consider null
131133
if (Objects.equals(field.doc(), newDoc)) {
@@ -148,19 +150,19 @@ public ColumnUpdateChange updateColumnComment(String name, String newDoc) {
148150
* @param name name of the column to rename
149151
* @param newName new name for the column
150152
* @return this
151-
* @throws IllegalArgumentException
153+
* @throws SchemaCompatibilityException
152154
*/
153155
public ColumnUpdateChange renameColumn(String name, String newName) {
154156
checkColModifyIsLegal(name);
155157
Types.Field field = internalSchema.findField(name);
156158
if (field == null) {
157-
throw new IllegalArgumentException(String.format("cannot update a missing column: %s", name));
159+
throw new SchemaCompatibilityException(String.format("Cannot rename column '%s' because it does not exist in the schema", name));
158160
}
159161
if (newName == null || newName.isEmpty()) {
160-
throw new IllegalArgumentException(String.format("cannot rename column: %s to empty", name));
162+
throw new SchemaCompatibilityException(String.format("Cannot rename column '%s' to empty or null name. New name must be non-empty", name));
161163
}
162164
if (internalSchema.hasColumn(newName, caseSensitive)) {
163-
throw new IllegalArgumentException(String.format("cannot rename column: %s to a existing name", name));
165+
throw new SchemaCompatibilityException(String.format("Cannot rename column '%s' to '%s' because a column with name '%s' already exists in the schema", name, newName, newName));
164166
}
165167
// save update info
166168
Types.Field update = updates.get(field.fieldId());
@@ -179,7 +181,7 @@ public ColumnUpdateChange renameColumn(String name, String newName) {
179181
* @param name name of the column to update
180182
* @param nullable nullable for updated name
181183
* @return this
182-
* @throws IllegalArgumentException
184+
* @throws SchemaCompatibilityException
183185
*/
184186
public ColumnUpdateChange updateColumnNullability(String name, boolean nullable) {
185187
return updateColumnNullability(name, nullable, false);
@@ -189,14 +191,15 @@ public ColumnUpdateChange updateColumnNullability(String name, boolean nullable,
189191
checkColModifyIsLegal(name);
190192
Types.Field field = internalSchema.findField(name);
191193
if (field == null) {
192-
throw new IllegalArgumentException(String.format("cannot update a missing column: %s", name));
194+
throw new SchemaCompatibilityException(String.format("Cannot update nullability for column '%s' because it does not exist in the schema", name));
193195
}
194196
if (field.isOptional() == nullable) {
195197
// do nothings
196198
return this;
197199
}
198200
if (field.isOptional() && !nullable && !force) {
199-
throw new IllegalArgumentException("cannot update column Nullability: optional to required");
201+
throw new SchemaCompatibilityException(String.format(
202+
"Cannot change column '%s' from optional to required. This would break compatibility with existing data that may contain null values", name));
200203
}
201204
// save update info
202205
Types.Field update = updates.get(field.fieldId());
@@ -224,7 +227,7 @@ protected Integer findIdByFullName(String fullName) {
224227
if (field != null) {
225228
return field.fieldId();
226229
} else {
227-
throw new IllegalArgumentException(String.format("cannot find col id for given column fullName: %s", fullName));
230+
throw new SchemaCompatibilityException(String.format("Cannot find column ID for column path '%s'. The column may not exist in the schema", fullName));
228231
}
229232
}
230233

@@ -239,7 +242,7 @@ public static ColumnUpdateChange get(InternalSchema schema, boolean caseSensitiv
239242

240243
/** Deal with delete columns changes for table. */
241244
public static class ColumnDeleteChange extends TableChange.BaseColumnChange {
242-
private final Set deletes = new HashSet<>();
245+
private final Set<Integer> deletes = new HashSet<>();
243246

244247
@Override
245248
public ColumnChangeID columnChangeId() {
@@ -268,7 +271,7 @@ public ColumnDeleteChange deleteColumn(String name) {
268271
checkColModifyIsLegal(name);
269272
Types.Field field = internalSchema.findField(name);
270273
if (field == null) {
271-
throw new IllegalArgumentException(String.format("cannot delete missing columns: %s", name));
274+
throw new SchemaCompatibilityException(String.format("Cannot delete column '%s' because it does not exist in the schema", name));
272275
}
273276
deletes.add(field.fieldId());
274277
return this;
@@ -335,25 +338,25 @@ private void addColumnsInternal(String parent, String name, Type type, String do
335338
if (!parent.isEmpty()) {
336339
Types.Field parentField = internalSchema.findField(parent);
337340
if (parentField == null) {
338-
throw new HoodieSchemaException(String.format("cannot add column: %s which parent: %s is not exist", name, parent));
341+
throw new HoodieSchemaException(String.format("Cannot add column '%s' because its parent column '%s' does not exist in the schema", name, parent));
339342
}
340-
Type parentType = parentField.type();
341343
if (!(parentField.type() instanceof Types.RecordType)) {
342-
throw new HoodieSchemaException("only support add nested columns to struct column");
344+
throw new HoodieSchemaException(String.format(
345+
"Cannot add nested column '%s' to parent '%s' of type '%s'. Nested columns can only be added to struct/record types", name, parent, parentField.type()));
343346
}
344347
parentId = parentField.fieldId();
345348
Types.Field newParentField = internalSchema.findField(parent + "." + name);
346349
if (newParentField != null) {
347-
throw new HoodieSchemaException(String.format("cannot add column: %s which already exist", name));
350+
throw new HoodieSchemaException(String.format("Cannot add column '%s' to parent '%s' because the column already exists at path '%s'", name, parent, parent + "." + name));
348351
}
349352
fullName = parent + "." + name;
350353
} else {
351354
if (internalSchema.hasColumn(name, caseSensitive)) {
352-
throw new HoodieSchemaException(String.format("cannot add column: %s which already exist", name));
355+
throw new HoodieSchemaException(String.format("Cannot add column '%s' because it already exists in the schema", name));
353356
}
354357
}
355358
if (fullColName2Id.containsKey(fullName)) {
356-
throw new HoodieSchemaException(String.format("cannot repeat add column: %s", name));
359+
throw new HoodieSchemaException(String.format("Cannot add column '%s' multiple times. Column at path '%s' has already been added in this change set", name, fullName));
357360
}
358361
fullColName2Id.put(fullName, nextId);
359362
if (parentId != -1) {

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.hudi.adapter.TestHoodieCatalogs;
2222
import org.apache.hudi.exception.HoodieCatalogException;
2323
import org.apache.hudi.exception.HoodieNotSupportedException;
24+
import org.apache.hudi.exception.SchemaCompatibilityException;
2425
import org.apache.hudi.utils.FlinkMiniCluster;
2526
import org.apache.hudi.utils.TestTableEnvs;
2627

@@ -191,8 +192,9 @@ void testIllegalModifyColumnType() {
191192
TableException.class,
192193
() -> tableEnv.executeSql(alterSql),
193194
"Should throw exception when the type update is not allowed ");
194-
assertTrue(e.getCause() instanceof IllegalArgumentException);
195-
assertTrue(e.getCause().getMessage().contains("cannot update origin type: string to a incompatibility type: int"));
195+
assertTrue(e.getCause() instanceof SchemaCompatibilityException);
196+
assertTrue(e.getCause().getMessage().contains("Cannot update column 'f_str' from type 'string' to incompatible type 'int'."),
197+
e.getCause().getMessage());
196198
}
197199

198200
@Test

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -280,11 +280,10 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
280280
}
281281
// check duplicate add or rename
282282
// keep consistent with hive, column names insensitive
283-
checkExceptions(s"alter table $tableName rename column col0 to col9")(Seq("cannot rename column: col0 to a existing name",
284-
"Cannot rename column, because col9 already exists in root"))
285-
checkExceptions(s"alter table $tableName rename column col0 to COL9")(Seq("cannot rename column: col0 to a existing name", "Cannot rename column, because COL9 already exists in root"))
286-
checkExceptions(s"alter table $tableName add columns(col9 int first)")(Seq("cannot add column: col9 which already exist", "Cannot add column, because col9 already exists in root"))
287-
checkExceptions(s"alter table $tableName add columns(COL9 int first)")(Seq("cannot add column: COL9 which already exist", "Cannot add column, because COL9 already exists in root"))
283+
checkExceptions(s"alter table $tableName rename column col0 to col9")(Seq("Cannot rename column 'col0' to 'col9' because a column with name 'col9' already exists in the schema"))
284+
checkExceptions(s"alter table $tableName rename column col0 to COL9")(Seq("Cannot rename column 'col0' to 'COL9' because a column with name 'COL9' already exists in the schema"))
285+
checkExceptions(s"alter table $tableName add columns(col9 int first)")(Seq("Cannot add column 'col9' because it already exists in the schema"))
286+
checkExceptions(s"alter table $tableName add columns(COL9 int first)")(Seq("Cannot add column 'COL9' because it already exists in the schema"))
288287
// test add comment for columns / alter columns comment
289288
spark.sql(s"alter table $tableName add columns(col1_new int comment 'add new columns col1_new after id' after id)")
290289
spark.sql(s"alter table $tableName alter column col9 comment 'col9 desc'")

0 commit comments

Comments
 (0)