Skip to content

Conversation

@akrambek
Copy link
Contributor

Description

Support pgsql-kafka binding.

Fixes #1058

jfallows
jfallows previously approved these changes Sep 12, 2024
@akrambek akrambek marked this pull request as ready for review September 18, 2024 19:16
<configuration>
<excludes>
<exclude>io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/types/**/*.class</exclude>
<exclude>net/sf/jsqlparser/parser/*</exclude>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed? Does jsqlparser generate some code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No but it was causing some IlligalClassFormatException in jacoco

java.lang.instrument.IllegalClassFormatException: Error while instrumenting net/sf/jsqlparser/parser/CCJSqlParserTokenManager with JaCoCo 0.8.11.202310140853/f33756c.
	at org.jacoco.agent.rt.internal_4742761.CoverageTransformer.transform(CoverageTransformer.java:94)
	at java.instrument/java.lang.instrument.ClassFileTransformer.transform(ClassFileTransformer.java:244)
	at java.instrument/sun.instrument.TransformerManager.transform(TransformerManager.java:188)
	at java.instrument/sun.instrument.InstrumentationImpl.transform(InstrumentationImpl.java:541)
	at java.base/java.lang.ClassLoader.defineClass1(Native Method)
	at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1013)
	at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:150)
	at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:862)
	at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:760)
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:681)
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:639)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	at net.sf.jsqlparser.parser.CCJSqlParser.<init>(CCJSqlParser.java:42028)
	at net.sf.jsqlparser.parser.CCJSqlParserManager.parse(CCJSqlParserManager.java:21)

this.kind = binding.kind;
this.routes = binding.routes.stream().map(PgsqlKafkaRouteConfig::new).collect(toList());

this.catalog = supplyCatalog.apply(binding.catalogs.get(0).id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is catalog is omitted?
Perhaps we can make it required in the schema patch?

KAFKA_TOPIC_REQUEST_TIMEOUT_MS = config.property("kafka.topic.request.timeout.ms", 30000);
KAFKA_CREATE_TOPICS_PARTITION_COUNT = config.property("kafka.create.topics.partition.count", 1);
KAFKA_CREATE_TOPICS_REPLICAS = config.property("kafka.create.topics.replicas", (short) 1);
KAFKA_AVRO_SCHEMA_NAMESPACE = config.property("kafka.avro.schema.namespace", "io.aklivity.zilla");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem like the right value for the avro namespace as it would more naturally be part of the application, not part of zilla directly.

Seems like it should perhaps default to the pgsql database parameter instead, like dev and perhaps this can be a pattern that would substitute the pgsql database parameter into it, so we could configure it with something like "com.example.{database}" and have it resolve to "com.example.dev". Then if there is no reference to {database} in the pattern, it would be used verbatim as it is now.

Copy link
Contributor Author

@akrambek akrambek Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good suggestion. I was not sure what you will recommend since com.example was too generic

Comment on lines 41 to 51
command:
for (PgsqlKafkaCommandType commandType : PgsqlKafkaCommandType.values())
{
if (Arrays.equals(commandType.value, value))
{
command = commandType;
break command;
}
}

return command;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each call to PgsqlKafkaCommandType.values() reallocates the array of enums.

Perhaps we can use a static { ... } initializer to create a static map of byte[] to PgsqlKafkaCommandType and then use that map here to lookup the byte[]?

return receiver;
}

private void onDecodeCreateTopicCommand(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void onDecodeCreateTopicCommand(
private void decodeCreateTopicCommand(

}
}

private void onDecodeUnknownCommand(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void onDecodeUnknownCommand(
private void decodeUnknownCommand(

jfallows
jfallows previously approved these changes Sep 18, 2024
@jfallows jfallows merged commit b032c4d into aklivity:develop Sep 18, 2024
5 checks passed
@jfallows jfallows mentioned this pull request Sep 17, 2024
3 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants