Skip to content

Commit 8ea8b04

Browse files
committed
Verify schema registry
1 parent 862d291 commit 8ea8b04

File tree

4 files changed

+43
-9
lines changed

4 files changed

+43
-9
lines changed

incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,27 @@ catalogs:
2020
type: test
2121
options:
2222
url: http://localhost:8081
23-
23+
id: 1
24+
schema: |-
25+
{
26+
"type": "record",
27+
"name": "cities",
28+
"namespace": "dev",
29+
"fields": [
30+
{
31+
"name": "description",
32+
"type": string
33+
},
34+
{
35+
"name": "id",
36+
"type": string
37+
},
38+
{
39+
"name": "name",
40+
"type": string
41+
}
42+
]
43+
}
2444
bindings:
2545
app0:
2646
type: pgsql-kafka

incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ public String generateSchema(
4040
schemaBuilder.setLength(0);
4141

4242
final String newNamespace = namespace.replace(DATABASE_PLACEHOLDER, database);
43-
44-
final String recordName = String.format("%s.%s", database, createTable.getTable().getName());
43+
final String recordName = createTable.getTable().getName();
4544

4645
schemaBuilder.append("{\n");
4746
schemaBuilder.append("\"type\": \"record\",\n");

incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.stream;
1616

1717
import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
18+
import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_VERSION_ID;
1819
import static java.util.Objects.requireNonNull;
1920

2021
import java.io.InputStreamReader;
@@ -1346,14 +1347,20 @@ else if (server.commandsProcessed == 1)
13461347
final PgsqlKafkaBindingConfig binding = server.binding;
13471348
final String schema = binding.avroValueSchema.generateSchema(server.database, statement);
13481349
final String subject = String.format("%s.%s-value", server.database, topic);
1349-
binding.catalog.register(subject, schema);
13501350

1351-
final String policy = binding.avroValueSchema.primaryKey(statement) != null
1352-
? "compact"
1353-
: "delete";
1351+
if (binding.catalog.register(subject, schema) != NO_VERSION_ID)
1352+
{
1353+
final String policy = binding.avroValueSchema.primaryKey(statement) != null
1354+
? "compact"
1355+
: "delete";
13541356

1355-
final KafkaCreateTopicsProxy createTopicsProxy = server.createTopicsProxy;
1356-
createTopicsProxy.doKafkaBegin(traceId, authorization, topics, policy);
1357+
final KafkaCreateTopicsProxy createTopicsProxy = server.createTopicsProxy;
1358+
createTopicsProxy.doKafkaBegin(traceId, authorization, topics, policy);
1359+
}
1360+
else
1361+
{
1362+
server.cleanup(traceId, authorization);
1363+
}
13571364
}
13581365
}
13591366

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/catalog/TestCatalogHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ public TestCatalogHandler(
3838
this.url = options != null ? options.url : null;
3939
}
4040

41+
@Override
42+
public int register(
43+
String subject,
44+
String schema)
45+
{
46+
return this.schema.equals(schema) ? this.id : NO_VERSION_ID;
47+
}
48+
4149
@Override
4250
public int resolve(
4351
String subject,

0 commit comments

Comments
 (0)