Skip to content

Commit d913ec1

Browse files
committed
[Bugfix] Can't load data to StarRocks json column (StarRocks#161)
A Flink String column (containing a JSON string) cannot be mapped to a StarRocks JSON column. The reason is that StarRocksDynamicSinkFunctionV2 does not set StarRocksTableRowTransformer#columns, so StarRocksTableRowTransformer#typeConvertion will not check whether the received data is a JSON string and therefore cannot map it to a StarRocks JSON value.
1 parent 2ad589a commit d913ec1

File tree

3 files changed

+14
-4
lines changed

3 files changed

+14
-4
lines changed

src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
44
import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
5+
import com.starrocks.connector.flink.table.StarRocksDataType;
56
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
6-
77
import org.apache.flink.table.api.TableColumn;
88
import org.apache.flink.table.api.TableSchema;
99
import org.apache.flink.table.api.constraints.UniqueConstraint;
@@ -55,6 +55,10 @@ public boolean isOpAutoProjectionInJson() {
5555
return version == null || version.length() > 0 && !version.trim().startsWith("1.");
5656
}
5757

58+
public Map<String, StarRocksDataType> getFieldMapping() {
59+
return starRocksQueryVisitor.getFieldMapping();
60+
}
61+
5862
public void validateTableStructure(StarRocksSinkOptions sinkOptions, TableSchema flinkSchema) {
5963
if (flinkSchema == null) {
6064
return;

src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@
1414

1515
package com.starrocks.connector.flink.row.sink;
1616

17-
import com.starrocks.connector.flink.table.StarRocksDataType;
18-
1917
import com.alibaba.fastjson.JSON;
2018
import com.alibaba.fastjson.serializer.JSONSerializer;
2119
import com.alibaba.fastjson.serializer.ObjectSerializer;
2220
import com.alibaba.fastjson.serializer.SerializeConfig;
2321
import com.alibaba.fastjson.serializer.SerializeWriter;
2422
import com.google.common.collect.Lists;
2523
import com.google.common.collect.Maps;
24+
import com.starrocks.connector.flink.table.StarRocksDataType;
2625
import org.apache.flink.api.common.functions.RuntimeContext;
2726
import org.apache.flink.api.common.typeinfo.TypeInformation;
2827
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -140,6 +139,12 @@ private Object typeConvertion(LogicalType type, RowData record, int pos) {
140139
if ((starRocksDataType == StarRocksDataType.JSON ||
141140
starRocksDataType == StarRocksDataType.UNKNOWN)
142141
&& (sValue.charAt(0) == '{' || sValue.charAt(0) == '[')) {
142+
// The json string needs to be converted to a json object, and back to a json string
143+
// again via JSON.toJSONString in StarRocksJsonSerializer#serialize. Otherwise,
144+
// the final json string in stream load will not be correct. For example, if the received
145+
// string is "{"a": 1, "b": 2}" and it is passed to JSON.toJSONString directly, the
146+
// result will be "{\"a\": 1, \"b\": 2}", which will not be recognized as JSON by
147+
// StarRocks
143148
return JSON.parse(sValue);
144149
}
145150
return sValue;

src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,15 @@ public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
6464
StarRocksIRowTransformer<T> rowTransformer) {
6565
this.sinkOptions = sinkOptions;
6666
this.rowTransformer = rowTransformer;
67-
rowTransformer.setTableSchema(schema);
6867
StarRocksSinkTable sinkTable = StarRocksSinkTable.builder()
6968
.sinkOptions(sinkOptions)
7069
.build();
7170
sinkTable.validateTableStructure(sinkOptions, schema);
7271
// StarRocksJsonSerializer depends on SinkOptions#supportUpsertDelete which is decided in
7372
// StarRocksSinkTable#validateTableStructure, so create serializer after validating table structure
7473
this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames());
74+
rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping());
75+
rowTransformer.setTableSchema(schema);
7576
this.sinkManager = new StarRocksSinkManagerV2(sinkOptions.getProperties());
7677
}
7778

0 commit comments

Comments
 (0)