PgsqlKafkaCommandType.java
@@ -22,6 +22,7 @@
 public enum PgsqlKafkaCommandType
 {
     CREATE_TOPIC_COMMAND("CREATE TOPIC".getBytes()),
+    DROP_TOPIC_COMMAND("DROP TOPIC".getBytes()),
     UNKNOWN_COMMAND("UNKNOWN".getBytes());

     private final byte[] value;
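
The byte prefixes above are what incoming SQL text gets matched against. A hypothetical sketch of that lookup, assuming an accessor for the private value field (the real matching lives in PgsqlKafkaProxyFactory.decodeCommandType, and the commandTypeOf helper here is illustrative only):

    // Illustrative only: resolve a command type by comparing the SQL text's leading bytes.
    // Assumes a value() accessor on the enum; not part of this change.
    static PgsqlKafkaCommandType commandTypeOf(
        byte[] sql)
    {
        for (PgsqlKafkaCommandType type : PgsqlKafkaCommandType.values())
        {
            byte[] prefix = type.value();
            if (type != PgsqlKafkaCommandType.UNKNOWN_COMMAND &&
                sql.length >= prefix.length &&
                java.util.Arrays.equals(sql, 0, prefix.length, prefix, 0, prefix.length))
            {
                return type;
            }
        }
        return PgsqlKafkaCommandType.UNKNOWN_COMMAND;
    }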
PgsqlKafkaCompletionCommand.java
@@ -17,6 +17,7 @@
 public enum PgsqlKafkaCompletionCommand
 {
     CREATE_TOPIC_COMMAND("CREATE_TOPIC".getBytes()),
+    DROP_TOPIC_COMMAND("DROP_TOPIC".getBytes()),
     UNKNOWN_COMMAND("UNKNOWN".getBytes());

     private final byte[] value;
PgsqlKafkaProxyFactory.java
@@ -16,6 +16,7 @@

 import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
 import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_VERSION_ID;
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;

 import java.io.InputStreamReader;
@@ -64,6 +65,7 @@
 import net.sf.jsqlparser.parser.CCJSqlParserManager;
 import net.sf.jsqlparser.statement.Statement;
 import net.sf.jsqlparser.statement.create.table.CreateTable;
+import net.sf.jsqlparser.statement.drop.Drop;

 public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory
 {
@@ -143,6 +145,7 @@ public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory
         Object2ObjectHashMap<PgsqlKafkaCommandType, PgsqlDecoder> pgsqlDecoder =
             new Object2ObjectHashMap<>();
         pgsqlDecoder.put(PgsqlKafkaCommandType.CREATE_TOPIC_COMMAND, this::decodeCreateTopicCommand);
+        pgsqlDecoder.put(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND, this::decodeDropTopicCommand);
         pgsqlDecoder.put(PgsqlKafkaCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand);
         this.pgsqlDecoder = pgsqlDecoder;
     }
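
The map above drives per-command dispatch. A minimal sketch of the dispatch site, assuming PgsqlDecoder is a functional interface whose single method (called decode here, an assumed name) matches the decode* signatures further down in this file:

    // Sketch: route an incoming statement to its registered decoder.
    private void decodeCommand(
        PgsqlProxy server,
        long traceId,
        long authorization,
        DirectBuffer buffer,
        int offset,
        int length)
    {
        final PgsqlKafkaCommandType commandType = decodeCommandType(buffer, offset, length);
        pgsqlDecoder.get(commandType).decode(server, traceId, authorization, buffer, offset, length);
    }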
@@ -232,6 +235,7 @@ private final class PgsqlProxy
         private final String database;
         private final PgsqlKafkaBindingConfig binding;
         private final KafkaCreateTopicsProxy createTopicsProxy;
+        private final KafkaDeleteTopicsProxy deleteTopicsProxy;

         private final IntArrayQueue queries;
@@ -281,6 +285,7 @@ private PgsqlProxy(
             this.queries = new IntArrayQueue();

             this.createTopicsProxy = new KafkaCreateTopicsProxy(routedId, resolvedId, this);
+            this.deleteTopicsProxy = new KafkaDeleteTopicsProxy(routedId, resolvedId, this);
         }

         private void onAppMessage(
@@ -488,7 +493,7 @@ private void onCommandCompleted(
             doAppWindow(traceId, authorization);
         }

-        public void onKafkaCreateTopicsBegin(
+        public void onKafkaBegin(
             long traceId,
             long authorization)
         {
@@ -1018,7 +1023,81 @@ protected void onKafkaBegin(

             if (!errorExits)
             {
-                delegate.onKafkaCreateTopicsBegin(traceId, authorization);
+                delegate.onKafkaBegin(traceId, authorization);

                 doKafkaWindow(traceId, authorization);
                 doKafkaEnd(traceId, authorization);
+            }
+            else
+            {
+                delegate.cleanup(traceId, authorization);
+            }
+        }
+    }

+    private final class KafkaDeleteTopicsProxy extends KafkaProxy
+    {
+        private KafkaDeleteTopicsProxy(
+            long originId,
+            long routedId,
+            PgsqlProxy delegate)
+        {
+            super(originId, routedId, delegate);
+        }
+
+        private void doKafkaBegin(
+            long traceId,
+            long authorization,
+            List<String> topics)
+        {
+            initialSeq = delegate.initialSeq;
+            initialAck = delegate.initialAck;
+            initialMax = delegate.initialMax;
+            state = PgsqlKafkaState.openingInitial(state);
+
+            final KafkaBeginExFW kafkaBeginEx =
+                kafkaBeginExRW.wrap(extBuffer, 0, extBuffer.capacity())
+                    .typeId(kafkaTypeId)
+                    .request(r -> r
+                        .deleteTopics(c -> c
+                            .names(ct ->
+                                topics.forEach(t -> ct.item(i -> i.set(t, UTF_8))))
+                            .timeout(config.kafkaTopicRequestTimeoutMs())))
+                    .build();
+
+            kafka = newKafkaConsumer(this::onKafkaMessage, originId, routedId, initialId, initialSeq, initialAck,
+                initialMax, traceId, authorization, 0, kafkaBeginEx);
+        }
+        @Override
+        protected void onKafkaBegin(
+            BeginFW begin)
+        {
+            final long sequence = begin.sequence();
+            final long acknowledge = begin.acknowledge();
+            final long traceId = begin.traceId();
+            final long authorization = begin.authorization();
+            final OctetsFW extension = begin.extension();
+
+            assert acknowledge <= sequence;
+            assert sequence >= replySeq;
+            assert acknowledge >= replyAck;
+
+            replySeq = sequence;
+            replyAck = acknowledge;
+            state = PgsqlKafkaState.openingReply(state);
+
+            assert replyAck <= replySeq;
+
+            final ExtensionFW beginEx = extension.get(extensionRO::tryWrap);
+            final KafkaBeginExFW kafkaBeginEx =
+                beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::tryWrap) : null;
+
+            // treat a missing or foreign begin extension as an error rather than dereferencing null
+            boolean errorExists = kafkaBeginEx == null || kafkaBeginEx.response().deleteTopics().topics().anyMatch(t -> t.error() != 0);
+
+            if (!errorExists)
+            {
+                delegate.onKafkaBegin(traceId, authorization);

+                doKafkaWindow(traceId, authorization);
+                doKafkaEnd(traceId, authorization);
@@ -1292,6 +1371,35 @@ else if (server.commandsProcessed == 0)
         }
     }

+    private void decodeDropTopicCommand(
+        PgsqlProxy server,
+        long traceId,
+        long authorization,
+        DirectBuffer buffer,
+        int offset,
+        int length)
+    {
+        if (server.commandsProcessed == 1)
+        {
+            server.onCommandCompleted(traceId, authorization, length, PgsqlKafkaCompletionCommand.DROP_TOPIC_COMMAND);
+        }
+        else if (server.commandsProcessed == 0)
+        {
+            final Drop drop = (Drop) parseStatement(buffer, offset, length);
+            final String topic = drop.getName().getName();
+
+            final PgsqlKafkaBindingConfig binding = server.binding;
+            final String subjectKey = String.format("%s.%s-key", server.database, topic);
+            final String subjectValue = String.format("%s.%s-value", server.database, topic);
+
+            binding.catalog.unregister(subjectKey);
+            binding.catalog.unregister(subjectValue);
+
+            final KafkaDeleteTopicsProxy deleteTopicsProxy = server.deleteTopicsProxy;
+            deleteTopicsProxy.doKafkaBegin(traceId, authorization, List.of("%s.%s".formatted(server.database, topic)));
+        }
+    }
+
     private void decodeUnknownCommand(
         PgsqlProxy server,
         long traceId,
@@ -1351,6 +1459,13 @@ private Statement parseStatement(
             sql = sql.replace("CREATE TOPIC", "CREATE TABLE");
             statement = parserManager.parse(new StringReader(sql));
         }
+        else if (decodeCommandType(buffer, offset, length)
+            .equals(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND))
+        {
+            String sql = buffer.getStringWithoutLengthUtf8(offset, length);
+            sql = sql.replace("DROP TOPIC", "DROP TABLE");
+            statement = parserManager.parse(new StringReader(sql));
+        }
         else
         {
             inputStream.wrap(buffer, offset, length);
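
Note the DROP TOPIC branch is `else if`, not a second `if`: otherwise a CREATE TOPIC statement would also fall into the trailing `else` and be re-parsed as generic SQL, discarding the rewritten statement. End to end, the decode path above turns a client statement such as `DROP TOPIC weather` on database `dev` into a `DROP TABLE` rewrite for JSqlParser, two schema-subject unregisters, and one Kafka delete-topics request. A standalone sketch of the parsing and naming steps, with example database and topic names not taken from the change:

    import java.io.StringReader;
    import net.sf.jsqlparser.parser.CCJSqlParserManager;
    import net.sf.jsqlparser.statement.drop.Drop;

    public class DropTopicSketch
    {
        public static void main(String[] args) throws Exception
        {
            String database = "dev";           // example database name
            String sql = "DROP TOPIC weather"; // incoming pgsql command text
            sql = sql.replace("DROP TOPIC", "DROP TABLE"); // same rewrite as parseStatement

            Drop drop = (Drop) new CCJSqlParserManager().parse(new StringReader(sql));
            String topic = drop.getName().getName();

            // Names derived exactly as in decodeDropTopicCommand:
            String subjectKey = "%s.%s-key".formatted(database, topic);     // dev.weather-key
            String subjectValue = "%s.%s-value".formatted(database, topic); // dev.weather-value
            String kafkaTopic = "%s.%s".formatted(database, topic);         // dev.weather

            System.out.println(subjectKey + ", " + subjectValue + ", " + kafkaTopic);
        }
    }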
@@ -58,4 +58,15 @@ public void shouldCreateTopic() throws Exception
     {
         k3po.finish();
     }
+
+    @Test
+    @Configuration("proxy.yaml")
+    @Specification({
+        "${pgsql}/drop.topic/client",
+        "${kafka}/drop.topic/server"
+    })
+    public void shouldDropTopic() throws Exception
+    {
+        k3po.finish();
+    }
 }
SchemaRegistryCatalogHandler.java
@@ -15,6 +15,7 @@
 package io.aklivity.zilla.runtime.catalog.schema.registry.internal.handler;

 import static io.aklivity.zilla.runtime.catalog.schema.registry.internal.handler.CachedSchemaId.IN_PROGRESS;
+import static io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.UnregisterSchemaRequest.NO_VERSIONS;

 import java.net.URI;
 import java.net.http.HttpClient;
@@ -35,14 +36,16 @@
 import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryCatalogConfig;
 import io.aklivity.zilla.runtime.catalog.schema.registry.internal.events.SchemaRegistryEventContext;
 import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.RegisterSchemaRequest;
+import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.UnregisterSchemaRequest;
 import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.SchemaRegistryPrefixFW;
 import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
 import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;

 public class SchemaRegistryCatalogHandler implements CatalogHandler
 {
     private static final String SUBJECT_VERSION_PATH = "/subjects/{0}/versions/{1}";
-    private static final String SUBJECT_PATH = "/subjects/{0}/versions";
+    private static final String REGISTER_SUBJECT_PATH = "/subjects/{0}/versions";
+    private static final String UNREGISTER_SUBJECT_PATH = "/subjects/{0}";
     private static final String SCHEMA_PATH = "/schemas/ids/{0}";

     private static final int MAX_PADDING_LENGTH = 5;
@@ -55,7 +58,8 @@ public class SchemaRegistryCatalogHandler implements CatalogHandler

     private final HttpClient client;
     private final String baseUrl;
-    private final RegisterSchemaRequest request;
+    private final RegisterSchemaRequest registerRequest;
+    private final UnregisterSchemaRequest unregisterRequest;
     private final CRC32C crc32c;
     private final Int2ObjectCache<String> schemas;
     private final Int2ObjectCache<CachedSchemaId> schemaIds;
@@ -70,7 +74,8 @@ public SchemaRegistryCatalogHandler(
     {
         this.baseUrl = catalog.options.url;
         this.client = HttpClient.newHttpClient();
-        this.request = new RegisterSchemaRequest();
+        this.registerRequest = new RegisterSchemaRequest();
+        this.unregisterRequest = new UnregisterSchemaRequest();
         this.crc32c = new CRC32C();
         this.schemas = new Int2ObjectCache<>(1, 1024, i -> {});
         this.schemaIds = new Int2ObjectCache<>(1, 1024, i -> {});
@@ -88,15 +93,30 @@ public int register(
     {
         int versionId = NO_VERSION_ID;

-        String response = sendPostHttpRequest(MessageFormat.format(SUBJECT_PATH, subject), schema);
+        String response = sendPostHttpRequest(MessageFormat.format(REGISTER_SUBJECT_PATH, subject), schema);
         if (response != null)
         {
-            versionId = request.resolveResponse(response);
+            versionId = registerRequest.resolveResponse(response);
         }

         return versionId;
     }

+    @Override
+    public int[] unregister(
+        String subject)
+    {
+        int[] versions = NO_VERSIONS;
+
+        String response = sendDeleteHttpRequest(MessageFormat.format(UNREGISTER_SUBJECT_PATH, subject));
+        if (response != null)
+        {
+            versions = unregisterRequest.resolveResponse(response);
+        }
+
+        return versions;
+    }
+
     @Override
     public String resolve(
         int schemaId)
@@ -149,7 +169,7 @@ public String resolve(
                     {
                         event.onRetrievableSchemaId(catalogId, schemaId);
                     }
-                    newFuture.complete(new CachedSchema(request.resolveSchemaResponse(response), retryAttempts));
+                    newFuture.complete(new CachedSchema(registerRequest.resolveSchemaResponse(response), retryAttempts));
                 }
             }
             catch (Throwable ex)
@@ -251,8 +271,8 @@ else if (response != null)
                     {
                         event.onRetrievableSchemaSubjectVersion(catalogId, subject, version);
                     }
-                    newFuture.complete(new CachedSchemaId(System.currentTimeMillis(), request.resolveResponse(response),
-                        retryAttempts, retryAfter));
+                    newFuture.complete(new CachedSchemaId(System.currentTimeMillis(),
+                        registerRequest.resolveResponse(response), retryAttempts, retryAfter));
                 }
             }
             catch (Throwable ex)
@@ -390,6 +410,28 @@ private String sendPostHttpRequest(
         return responseBody;
     }

+    private String sendDeleteHttpRequest(
+        String path)
+    {
+        HttpRequest httpRequest = HttpRequest
+            .newBuilder(toURI(baseUrl, path))
+            .version(HttpClient.Version.HTTP_1_1)
+            .DELETE()
+            .build();
+
+        String responseBody;
+        try
+        {
+            HttpResponse<String> httpResponse = client.send(httpRequest, HttpResponse.BodyHandlers.ofString());
+            responseBody = httpResponse.statusCode() == 200 ? httpResponse.body() : null;
+        }
+        catch (Exception ex)
+        {
+            responseBody = null;
+        }
+        return responseBody;
+    }
+
     @Override
     public String location()
     {
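
For reference, Confluent Schema Registry's `DELETE /subjects/{subject}` performs a soft delete and answers 200 with a JSON array of the deleted version numbers, for example `[1, 2]`; any other status is mapped to null above and surfaces as NO_VERSIONS. A standalone sketch of that round trip using the same java.net.http API (the registry URL and subject name are examples):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class DeleteSubjectSketch
    {
        public static void main(String[] args) throws Exception
        {
            // Assumes a locally running Schema Registry; URL and subject are examples.
            HttpRequest request = HttpRequest
                .newBuilder(URI.create("http://localhost:8081/subjects/dev.weather-value"))
                .DELETE()
                .build();

            HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

            // Expected on success: 200 with a body like [1, 2]
            System.out.println(response.statusCode() + " " + response.body());
        }
    }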
UnregisterSchemaRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ *   https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer;
+
+import java.io.StringReader;
+import java.util.stream.IntStream;
+
+import jakarta.json.Json;
+import jakarta.json.JsonArray;
+import jakarta.json.JsonReader;
+import jakarta.json.stream.JsonParsingException;
+
+public class UnregisterSchemaRequest
+{
+    public static final int[] NO_VERSIONS = new int[0];
+
+    public int[] resolveResponse(
+        String response)
+    {
+        try
+        {
+            JsonReader reader = Json.createReader(new StringReader(response));
+            JsonArray array = reader.readArray();
+
+            return IntStream.range(0, array.size())
+                .map(array::getInt)
+                .toArray();
+        }
+        catch (JsonParsingException ex)
+        {
+            return NO_VERSIONS;
+        }
+    }
+}
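
A quick worked example of resolveResponse: a successful delete yields the version array, while a registry error payload (a JSON object such as `{"error_code": 40401}`) makes readArray() throw JsonParsingException, so it falls back to NO_VERSIONS:

    import java.util.Arrays;

    import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.UnregisterSchemaRequest;

    public class UnregisterResponseSketch
    {
        public static void main(String[] args)
        {
            UnregisterSchemaRequest request = new UnregisterSchemaRequest();

            // Happy path: the registry returns the deleted versions as a JSON array.
            System.out.println(Arrays.toString(request.resolveResponse("[1, 2, 3]"))); // [1, 2, 3]

            // Error payloads are JSON objects, not arrays, so the parser returns NO_VERSIONS.
            System.out.println(Arrays.toString(request.resolveResponse("{\"error_code\": 40401}"))); // []
        }
    }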