Skip to content

Commit 862d291

Browse files
committed
Applied feedback from PR
1 parent 865ad2c commit 862d291

File tree

5 files changed

+26
-21
lines changed

5 files changed

+26
-21
lines changed

incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/schema/pgsql.kafka.schema.patch.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
"routes": false,
3737
"required":
3838
[
39+
"catalog",
3940
"exit"
4041
]
4142
}

incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/PgsqlKafkaConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class PgsqlKafkaConfiguration extends Configuration
3131
KAFKA_TOPIC_REQUEST_TIMEOUT_MS = config.property("kafka.topic.request.timeout.ms", 30000);
3232
KAFKA_CREATE_TOPICS_PARTITION_COUNT = config.property("kafka.create.topics.partition.count", 1);
3333
KAFKA_CREATE_TOPICS_REPLICAS = config.property("kafka.create.topics.replicas", (short) 1);
34-
KAFKA_AVRO_SCHEMA_NAMESPACE = config.property("kafka.avro.schema.namespace", "io.aklivity.zilla");
34+
KAFKA_AVRO_SCHEMA_NAMESPACE = config.property("kafka.avro.schema.namespace", "{database}");
3535
PGSQL_KAFKA_CONFIG = config;
3636
}
3737

incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
public class PgsqlKafkaValueAvroSchemaTemplate extends PgsqlKafkaAvroSchemaTemplate
2424
{
25+
private static final String DATABASE_PLACEHOLDER = "{database}";
26+
2527
private final StringBuilder schemaBuilder = new StringBuilder();
2628
private final String namespace;
2729

@@ -37,12 +39,14 @@ public String generateSchema(
3739
{
3840
schemaBuilder.setLength(0);
3941

42+
final String newNamespace = namespace.replace(DATABASE_PLACEHOLDER, database);
43+
4044
final String recordName = String.format("%s.%s", database, createTable.getTable().getName());
4145

4246
schemaBuilder.append("{\n");
4347
schemaBuilder.append("\"type\": \"record\",\n");
4448
schemaBuilder.append("\"name\": \"").append(recordName).append("\",\n");
45-
schemaBuilder.append("\"namespace\": \"").append(namespace).append("\",\n");
49+
schemaBuilder.append("\"namespace\": \"").append(newNamespace).append("\",\n");
4650
schemaBuilder.append("\"fields\": [\n");
4751

4852
for (ColumnDefinition column : createTable.getColumnDefinitions())

incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCommandType.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
package io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.stream;
1616

1717
import java.util.Arrays;
18+
import java.util.Map;
19+
20+
import org.agrona.collections.Object2ObjectHashMap;
1821

1922
public enum PgsqlKafkaCommandType
2023
{
@@ -23,6 +26,16 @@ public enum PgsqlKafkaCommandType
2326

2427
private final byte[] value;
2528

29+
private static final Map<String, PgsqlKafkaCommandType> COMMAND_MAP = new Object2ObjectHashMap<>();
30+
31+
static
32+
{
33+
for (PgsqlKafkaCommandType commandType : PgsqlKafkaCommandType.values())
34+
{
35+
COMMAND_MAP.put(Arrays.toString(commandType.value), commandType);
36+
}
37+
}
38+
2639
PgsqlKafkaCommandType(byte[] value)
2740
{
2841
this.value = value;
@@ -33,21 +46,8 @@ public byte[] value()
3346
return value;
3447
}
3548

36-
public static PgsqlKafkaCommandType valueOf(
37-
byte[] value)
49+
public static PgsqlKafkaCommandType valueOf(byte[] value)
3850
{
39-
PgsqlKafkaCommandType command = UNKNOWN_COMMAND;
40-
41-
command:
42-
for (PgsqlKafkaCommandType commandType : PgsqlKafkaCommandType.values())
43-
{
44-
if (Arrays.equals(commandType.value, value))
45-
{
46-
command = commandType;
47-
break command;
48-
}
49-
}
50-
51-
return command;
51+
return COMMAND_MAP.getOrDefault(Arrays.toString(value), UNKNOWN_COMMAND);
5252
}
5353
}

incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory
135135
{
136136
Object2ObjectHashMap<PgsqlKafkaCommandType, PgsqlDecoder> pgsqlDecoder =
137137
new Object2ObjectHashMap<>();
138-
pgsqlDecoder.put(PgsqlKafkaCommandType.CREATE_TOPIC_COMMAND, this::onDecodeCreateTopicCommand);
139-
pgsqlDecoder.put(PgsqlKafkaCommandType.UNKNOWN_COMMAND, this::onDecodeUnknownCommand);
138+
pgsqlDecoder.put(PgsqlKafkaCommandType.CREATE_TOPIC_COMMAND, this::decodeCreateTopicCommand);
139+
pgsqlDecoder.put(PgsqlKafkaCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand);
140140
this.pgsqlDecoder = pgsqlDecoder;
141141
}
142142

@@ -1319,7 +1319,7 @@ private MessageConsumer newKafkaConsumer(
13191319
return receiver;
13201320
}
13211321

1322-
private void onDecodeCreateTopicCommand(
1322+
private void decodeCreateTopicCommand(
13231323
PgsqlProxy server,
13241324
long traceId,
13251325
long authorization,
@@ -1357,7 +1357,7 @@ else if (server.commandsProcessed == 1)
13571357
}
13581358
}
13591359

1360-
private void onDecodeUnknownCommand(
1360+
private void decodeUnknownCommand(
13611361
PgsqlProxy server,
13621362
long traceId,
13631363
long authorization,

0 commit comments

Comments
 (0)