Skip to content

smallrye-reactive-messaging-in-memory not working with KafkaTransactions #3104

@MaximGonnissen

Description

@MaximGonnissen

Hi there,

Versions for context:

  • Quarkus 3.20.1
  • smallrye-reactive-messaging-*: 4.27.0

We're facing an issue where we're attempting to test a flow which uses a KafkaTransactions object. It seems that in memory channels aren't supported(?) by KafkaTransactions, as the producer is null during the test. Debugging shows it goes through the regular connector to fetch the producer, rather than the in memory connector.

minimal repeatable KafkaTransactions example:

  @Channel("my-channel")
  KafkaTransactions<ProducerRecord<Void, MyChannelMessage>> kafkaTransactions;

minimal repeatable test setup:

@QuarkusTest
@QuarkusTestResource(value = InMemoryReactiveMessagingLifecycleManager.class, initArgs = {
    @ResourceArg(value = "outgoing", name = "my-channel")
})
class MyChannelIT {
  @Inject
  @Connector("smallrye-in-memory")
  InMemoryConnector connector;

  InMemorySink<ProducerRecord<Void, MyChannelMessage>> MyChannelMemorySink;

  @Test
  void foo() {
    MyChannelMemorySink= connector.sink("my-channel");

    // (...)

    // Assert that the message was received by the sink successfully
    assertThat(MyChannelMemorySink.received().size()).isEqualTo(1);
  }
}

InMemoryReactiveMessagingLifecycleManager class

public class InMemoryReactiveMessagingLifecycleManager implements QuarkusTestResourceLifecycleManager {

  private Map<String, String> params = new HashMap<>();

  @Override
  public void init(Map<String, String> params) {
    this.params.putAll(params);
  }

  @Override
  public Map<String, String> start() {
    Map<String, String> env = new HashMap<>();
    for (Map.Entry<String, String> connection : this.params.entrySet()) {
      switch (connection.getValue()) {
        case "incoming": env.putAll(InMemoryConnector.switchIncomingChannelsToInMemory(connection.getKey())); break;
        case "outgoing": env.putAll(InMemoryConnector.switchOutgoingChannelsToInMemory(connection.getKey())); break;
      }
    }

    return env;
  }

  @Override
  public void stop() {
    InMemoryConnector.clear();
  }
}

The error we get:

Suppressed: java.lang.NullPointerException: Cannot invoke "io.smallrye.reactive.messaging.kafka.KafkaProducer.beginTransaction()" because "this.this$0.producer" is null
		at io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactionsImpl$Transaction.execute(KafkaTransactionsImpl.java:176)
		at io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactionsImpl.withTransaction(KafkaTransactionsImpl.java:66)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions