Skip to content

Commit 2238855

Browse files
authored
pgsql DROP TOPIC command to KafkaDeleteTopicsBeginEx plus catalog unregister subject (#1280)
1 parent 9499bef commit 2238855

File tree

11 files changed

+312
-11
lines changed

11 files changed

+312
-11
lines changed

incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCommandType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
public enum PgsqlKafkaCommandType
2323
{
2424
CREATE_TOPIC_COMMAND("CREATE TOPIC".getBytes()),
25+
DROP_TOPIC_COMMAND("DROP TOPIC".getBytes()),
2526
UNKNOWN_COMMAND("UNKNOWN".getBytes());
2627

2728
private final byte[] value;

incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCompletionCommand.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
public enum PgsqlKafkaCompletionCommand
1818
{
1919
CREATE_TOPIC_COMMAND("CREATE_TOPIC".getBytes()),
20+
DROP_TOPIC_COMMAND("DROP_TOPIC".getBytes()),
2021
UNKNOWN_COMMAND("UNKNOWN".getBytes());
2122

2223
private final byte[] value;

incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
1818
import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_VERSION_ID;
19+
import static java.nio.charset.StandardCharsets.UTF_8;
1920
import static java.util.Objects.requireNonNull;
2021

2122
import java.io.InputStreamReader;
@@ -64,6 +65,7 @@
6465
import net.sf.jsqlparser.parser.CCJSqlParserManager;
6566
import net.sf.jsqlparser.statement.Statement;
6667
import net.sf.jsqlparser.statement.create.table.CreateTable;
68+
import net.sf.jsqlparser.statement.drop.Drop;
6769

6870
public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory
6971
{
@@ -143,6 +145,7 @@ public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory
143145
Object2ObjectHashMap<PgsqlKafkaCommandType, PgsqlDecoder> pgsqlDecoder =
144146
new Object2ObjectHashMap<>();
145147
pgsqlDecoder.put(PgsqlKafkaCommandType.CREATE_TOPIC_COMMAND, this::decodeCreateTopicCommand);
148+
pgsqlDecoder.put(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND, this::decodeDropTopicCommand);
146149
pgsqlDecoder.put(PgsqlKafkaCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand);
147150
this.pgsqlDecoder = pgsqlDecoder;
148151
}
@@ -232,6 +235,7 @@ private final class PgsqlProxy
232235
private final String database;
233236
private final PgsqlKafkaBindingConfig binding;
234237
private final KafkaCreateTopicsProxy createTopicsProxy;
238+
private final KafkaDeleteTopicsProxy deleteTopicsProxy;
235239

236240
private final IntArrayQueue queries;
237241

@@ -281,6 +285,7 @@ private PgsqlProxy(
281285
this.queries = new IntArrayQueue();
282286

283287
this.createTopicsProxy = new KafkaCreateTopicsProxy(routedId, resolvedId, this);
288+
this.deleteTopicsProxy = new KafkaDeleteTopicsProxy(routedId, resolvedId, this);
284289
}
285290

286291
private void onAppMessage(
@@ -488,7 +493,7 @@ private void onCommandCompleted(
488493
doAppWindow(traceId, authorization);
489494
}
490495

491-
public void onKafkaCreateTopicsBegin(
496+
public void onKafkaBegin(
492497
long traceId,
493498
long authorization)
494499
{
@@ -1018,7 +1023,81 @@ protected void onKafkaBegin(
10181023

10191024
if (!errorExits)
10201025
{
1021-
delegate.onKafkaCreateTopicsBegin(traceId, authorization);
1026+
delegate.onKafkaBegin(traceId, authorization);
1027+
1028+
doKafkaWindow(traceId, authorization);
1029+
doKafkaEnd(traceId, authorization);
1030+
}
1031+
else
1032+
{
1033+
delegate.cleanup(traceId, authorization);
1034+
}
1035+
}
1036+
}
1037+
1038+
private final class KafkaDeleteTopicsProxy extends KafkaProxy
1039+
{
1040+
private KafkaDeleteTopicsProxy(
1041+
long originId,
1042+
long routedId,
1043+
PgsqlProxy delegate)
1044+
{
1045+
super(originId, routedId, delegate);
1046+
}
1047+
1048+
private void doKafkaBegin(
1049+
long traceId,
1050+
long authorization,
1051+
List<String> topics)
1052+
{
1053+
initialSeq = delegate.initialSeq;
1054+
initialAck = delegate.initialAck;
1055+
initialMax = delegate.initialMax;
1056+
state = PgsqlKafkaState.openingInitial(state);
1057+
1058+
final KafkaBeginExFW kafkaBeginEx =
1059+
kafkaBeginExRW.wrap(extBuffer, 0, extBuffer.capacity())
1060+
.typeId(kafkaTypeId)
1061+
.request(r -> r
1062+
.deleteTopics(c -> c
1063+
.names(ct ->
1064+
topics.forEach(t -> ct.item(i -> i.set(t, UTF_8))))
1065+
.timeout(config.kafkaTopicRequestTimeoutMs())))
1066+
.build();
1067+
1068+
kafka = newKafkaConsumer(this::onKafkaMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax,
1069+
traceId, authorization, 0, kafkaBeginEx);
1070+
}
1071+
1072+
@Override
1073+
protected void onKafkaBegin(
1074+
BeginFW begin)
1075+
{
1076+
final long sequence = begin.sequence();
1077+
final long acknowledge = begin.acknowledge();
1078+
final long traceId = begin.traceId();
1079+
final long authorization = begin.authorization();
1080+
final OctetsFW extension = begin.extension();
1081+
1082+
assert acknowledge <= sequence;
1083+
assert sequence >= replySeq;
1084+
assert acknowledge >= replyAck;
1085+
1086+
replySeq = sequence;
1087+
replyAck = acknowledge;
1088+
state = PgsqlKafkaState.openingReply(state);
1089+
1090+
assert replyAck <= replySeq;
1091+
1092+
final ExtensionFW beginEx = extension.get(extensionRO::tryWrap);
1093+
final KafkaBeginExFW kafkaBeginEx =
1094+
beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::tryWrap) : null;
1095+
1096+
boolean errorExits = kafkaBeginEx.response().deleteTopics().topics().anyMatch(t -> t.error() != 0);
1097+
1098+
if (!errorExits)
1099+
{
1100+
delegate.onKafkaBegin(traceId, authorization);
10221101

10231102
doKafkaWindow(traceId, authorization);
10241103
doKafkaEnd(traceId, authorization);
@@ -1292,6 +1371,35 @@ else if (server.commandsProcessed == 0)
12921371
}
12931372
}
12941373

1374+
private void decodeDropTopicCommand(
1375+
PgsqlProxy server,
1376+
long traceId,
1377+
long authorization,
1378+
DirectBuffer buffer,
1379+
int offset,
1380+
int length)
1381+
{
1382+
if (server.commandsProcessed == 1)
1383+
{
1384+
server.onCommandCompleted(traceId, authorization, length, PgsqlKafkaCompletionCommand.DROP_TOPIC_COMMAND);
1385+
}
1386+
else if (server.commandsProcessed == 0)
1387+
{
1388+
final Drop drop = (Drop) parseStatement(buffer, offset, length);
1389+
final String topic = drop.getName().getName();
1390+
1391+
final PgsqlKafkaBindingConfig binding = server.binding;
1392+
final String subjectKey = String.format("%s.%s-key", server.database, topic);
1393+
final String subjectValue = String.format("%s.%s-value", server.database, topic);
1394+
1395+
binding.catalog.unregister(subjectKey);
1396+
binding.catalog.unregister(subjectValue);
1397+
1398+
final KafkaDeleteTopicsProxy deleteTopicsProxy = server.deleteTopicsProxy;
1399+
deleteTopicsProxy.doKafkaBegin(traceId, authorization, List.of("%s.%s".formatted(server.database, topic)));
1400+
}
1401+
}
1402+
12951403
private void decodeUnknownCommand(
12961404
PgsqlProxy server,
12971405
long traceId,
@@ -1351,6 +1459,13 @@ private Statement parseStatement(
13511459
sql = sql.replace("CREATE TOPIC", "CREATE TABLE");
13521460
statement = parserManager.parse(new StringReader(sql));
13531461
}
1462+
if (decodeCommandType(buffer, offset, length).
1463+
equals(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND))
1464+
{
1465+
String sql = buffer.getStringWithoutLengthUtf8(offset, length);
1466+
sql = sql.replace("DROP TOPIC", "DROP TABLE");
1467+
statement = parserManager.parse(new StringReader(sql));
1468+
}
13541469
else
13551470
{
13561471
inputStream.wrap(buffer, offset, length);

incubator/binding-pgsql-kafka/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/ProxyIT.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,15 @@ public void shouldCreateTopic() throws Exception
5858
{
5959
k3po.finish();
6060
}
61+
62+
@Test
63+
@Configuration("proxy.yaml")
64+
@Specification({
65+
"${pgsql}/drop.topic/client",
66+
"${kafka}/drop.topic/server"
67+
})
68+
public void shouldDropTopic() throws Exception
69+
{
70+
k3po.finish();
71+
}
6172
}

runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package io.aklivity.zilla.runtime.catalog.schema.registry.internal.handler;
1616

1717
import static io.aklivity.zilla.runtime.catalog.schema.registry.internal.handler.CachedSchemaId.IN_PROGRESS;
18+
import static io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.UnregisterSchemaRequest.NO_VERSIONS;
1819

1920
import java.net.URI;
2021
import java.net.http.HttpClient;
@@ -35,14 +36,16 @@
3536
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryCatalogConfig;
3637
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.events.SchemaRegistryEventContext;
3738
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.RegisterSchemaRequest;
39+
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.UnregisterSchemaRequest;
3840
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.SchemaRegistryPrefixFW;
3941
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
4042
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
4143

4244
public class SchemaRegistryCatalogHandler implements CatalogHandler
4345
{
4446
private static final String SUBJECT_VERSION_PATH = "/subjects/{0}/versions/{1}";
45-
private static final String SUBJECT_PATH = "/subjects/{0}/versions";
47+
private static final String REGISTER_SUBJECT_PATH = "/subjects/{0}/versions";
48+
private static final String UNREGISTER_SUBJECT_PATH = "/subjects/{0}";
4649
private static final String SCHEMA_PATH = "/schemas/ids/{0}";
4750

4851
private static final int MAX_PADDING_LENGTH = 5;
@@ -55,7 +58,8 @@ public class SchemaRegistryCatalogHandler implements CatalogHandler
5558

5659
private final HttpClient client;
5760
private final String baseUrl;
58-
private final RegisterSchemaRequest request;
61+
private final RegisterSchemaRequest registerRequest;
62+
private final UnregisterSchemaRequest unregisterRequest;
5963
private final CRC32C crc32c;
6064
private final Int2ObjectCache<String> schemas;
6165
private final Int2ObjectCache<CachedSchemaId> schemaIds;
@@ -70,7 +74,8 @@ public SchemaRegistryCatalogHandler(
7074
{
7175
this.baseUrl = catalog.options.url;
7276
this.client = HttpClient.newHttpClient();
73-
this.request = new RegisterSchemaRequest();
77+
this.registerRequest = new RegisterSchemaRequest();
78+
this.unregisterRequest = new UnregisterSchemaRequest();
7479
this.crc32c = new CRC32C();
7580
this.schemas = new Int2ObjectCache<>(1, 1024, i -> {});
7681
this.schemaIds = new Int2ObjectCache<>(1, 1024, i -> {});
@@ -88,15 +93,30 @@ public int register(
8893
{
8994
int versionId = NO_VERSION_ID;
9095

91-
String response = sendPostHttpRequest(MessageFormat.format(SUBJECT_PATH, subject), schema);
96+
String response = sendPostHttpRequest(MessageFormat.format(REGISTER_SUBJECT_PATH, subject), schema);
9297
if (response != null)
9398
{
94-
versionId = request.resolveResponse(response);
99+
versionId = registerRequest.resolveResponse(response);
95100
}
96101

97102
return versionId;
98103
}
99104

105+
@Override
106+
public int[] unregister(
107+
String subject)
108+
{
109+
int[] versions = NO_VERSIONS;
110+
111+
String response = sendDeleteHttpRequest(MessageFormat.format(UNREGISTER_SUBJECT_PATH, subject));
112+
if (response != null)
113+
{
114+
versions = unregisterRequest.resolveResponse(response);
115+
}
116+
117+
return versions;
118+
}
119+
100120
@Override
101121
public String resolve(
102122
int schemaId)
@@ -149,7 +169,7 @@ public String resolve(
149169
{
150170
event.onRetrievableSchemaId(catalogId, schemaId);
151171
}
152-
newFuture.complete(new CachedSchema(request.resolveSchemaResponse(response), retryAttempts));
172+
newFuture.complete(new CachedSchema(registerRequest.resolveSchemaResponse(response), retryAttempts));
153173
}
154174
}
155175
catch (Throwable ex)
@@ -251,8 +271,8 @@ else if (response != null)
251271
{
252272
event.onRetrievableSchemaSubjectVersion(catalogId, subject, version);
253273
}
254-
newFuture.complete(new CachedSchemaId(System.currentTimeMillis(), request.resolveResponse(response),
255-
retryAttempts, retryAfter));
274+
newFuture.complete(new CachedSchemaId(System.currentTimeMillis(),
275+
registerRequest.resolveResponse(response), retryAttempts, retryAfter));
256276
}
257277
}
258278
catch (Throwable ex)
@@ -390,6 +410,28 @@ private String sendPostHttpRequest(
390410
return responseBody;
391411
}
392412

413+
private String sendDeleteHttpRequest(
414+
String path)
415+
{
416+
HttpRequest httpRequest = HttpRequest
417+
.newBuilder(toURI(baseUrl, path))
418+
.version(HttpClient.Version.HTTP_1_1)
419+
.DELETE()
420+
.build();
421+
422+
String responseBody;
423+
try
424+
{
425+
HttpResponse<String> httpResponse = client.send(httpRequest, HttpResponse.BodyHandlers.ofString());
426+
responseBody = httpResponse.statusCode() == 200 ? httpResponse.body() : null;
427+
}
428+
catch (Exception ex)
429+
{
430+
responseBody = null;
431+
}
432+
return responseBody;
433+
}
434+
393435
@Override
394436
public String location()
395437
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2021-2023 Aklivity Inc
3+
*
4+
* Licensed under the Aklivity Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* https://www.aklivity.io/aklivity-community-license/
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
package io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer;
16+
17+
import java.io.StringReader;
18+
import java.util.stream.IntStream;
19+
20+
import jakarta.json.Json;
21+
import jakarta.json.JsonArray;
22+
import jakarta.json.JsonReader;
23+
import jakarta.json.stream.JsonParsingException;
24+
25+
public class UnregisterSchemaRequest
26+
{
27+
public static final int[] NO_VERSIONS = new int[0];
28+
29+
public int[] resolveResponse(
30+
String response)
31+
{
32+
try
33+
{
34+
JsonReader reader = Json.createReader(new StringReader(response));
35+
JsonArray array = reader.readArray();
36+
37+
return IntStream.range(0, array.size())
38+
.map(array::getInt)
39+
.toArray();
40+
}
41+
catch (JsonParsingException ex)
42+
{
43+
return NO_VERSIONS;
44+
}
45+
}
46+
}

0 commit comments

Comments
 (0)