-
Notifications
You must be signed in to change notification settings - Fork 189
Description
We have a use-case where we receive a message and must produce multiple output messages, and in some cases even zero messages. We thought we could do it like (note that we are in the kotlin world)
@Incoming("incoming-topic")
@Outgoing("outgoing-topic")
suspend fun consume(message: Message<MyMessage>): Multi<Message<MyMessage>>
From our understanding the method should return an empty Multi in case no message must be produced:
return Multi.createFrom().empty()
As we have a private message structure (which is in reality a protobuf-generated class), we need a custom serializer class like
class MySerializer<T : MyMessage?> : Serializer<T?> {
override fun serialize(
topic: String,
data: T?,
): ByteArray? {
return data?.toByteArray()
}
}
Wenn running our application, in the case the consumer returns Multi.createFrom().empty(), the serializer is called where the data-object is an EmptyMulti, thus causing a ClassCastException. (we also tried to use another serializer which would return null in that case, but then we have illegal null-messages in the kafka cluster).
So the question is: what are we doing wrong? How can we tell the framework that no message should be published?