Skip to content

[Host.RabbitMq] No consumer handler is triggered when routing key is set to wildcard #431

@houdinisheng

Description

@houdinisheng

Hi,

We have a system that publish messages to a topic exchange using routing keys such as created.system_a or created.system_b, and the consumer with SlimMessageBus bind a queue to the exchange using a wildcard routing key created.*.
RabbitMQ delivers the message correctly to the queue, but SlimMessageBus logs the message No message processor found for routing key created.system_a and the consumer handler is not invoked.

Here's the detail information:

Environment:

  • SlimMessageBus v3.0.0
  • SlimMessageBus.Host.RabbitMQ v3.3.2
  • .NET 8.0
  • RabbitMQ 4.1.4

Code Sample:

// SlimMessageBus Registration
builder.Services.AddSlimMessageBus(mbb =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        // Change to your broker URI if needed
        cfg.ConnectionString = "amqp://guest:guest@localhost:5672/";
    });

    // Producer registration: send to topic exchange with routing key like created.<system>
    mbb.Produce<CreatedMessage>(x => x
        .Exchange("test_exchange", exchangeType: RabbitMQ.Client.ExchangeType.Topic)
        .RoutingKeyProvider((m, p) => $"created.{m.SystemName.ToLower()}"));

    // Consumer registration: bind queue to exchange using wildcard created.*
    mbb.Consume<CreatedMessage>(x => x
        .Queue("created_queue", durable: true, autoDelete: false)
        .ExchangeBinding("test_exchange", routingKey: "created.*")
        .AcknowledgementMode(RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade)
        .Instances(1)
        .WithConsumer<CreatedEventHandler>());

    mbb.AddJsonSerializer();
    // Handlers live in this assembly (Program.cs)
    mbb.AddServicesFromAssembly(typeof(Program).Assembly);
});

// Message and Handler define
public record CreatedMessage(string Id, string SystemName, string Data, string EventType = "created");

public class CreatedEventHandler : IConsumer<CreatedMessage>
{
    private readonly ILogger<CreatedEventHandler> _logger;
    public CreatedEventHandler(ILogger<CreatedEventHandler> logger) => _logger = logger;

    public async Task OnHandle(CreatedMessage message, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Processing Created event: {MessageId} for system: {SystemName}", message.Id, message.SystemName);
       
	    await Task.Delay(50, cancellationToken);

        _logger.LogInformation("Successfully processed Created event: {MessageId}", message.Id);
    }
}

// Minimal API for testing
app.MapPost("/publish/{systemName}", async (string systemName, IMessageBus bus, ILoggerFactory lf) =>
{
    var logger = lf.CreateLogger("publish-endpoint");
    var msg = new CreatedMessage(Guid.NewGuid().ToString(), systemName, "data");
    logger.LogInformation("Publishing Created message with routing key created.{SystemName}", systemName);
    await bus.Publish(msg);
    return Results.Accepted($"/publish/{systemName}", msg);
});

Logs:

[15:12:22 DBG] Hosting starting
[15:12:22 INF] Declaring exchange test_exchange, ExchangeType: topic, Durable: False, AutoDelete: False
[15:12:22 INF] Declaring queue created_queue, Durable: True, AutoDelete: False, Exclusive: False
[15:12:22 INF] Creating consumers for Main bus...
[15:12:22 INF] Started consumers for Main bus
[15:12:22 DBG] No listening endpoints were configured. Binding to http://localhost:5000 by default.
[15:12:22 INF] Started consumers for Main bus
[15:12:22 DBG] No listening endpoints were configured. Binding to http://localhost:5000 by default.
[15:12:22 INF] Now listening on: http://localhost:5000
[15:12:22 DBG] Loaded hosting startup assembly repro-smb-rabbitmq-wildcard
[15:12:22 INF] Application started. Press Ctrl+C to shut down.
[15:12:22 INF] Hosting environment: Production
[15:12:22 INF] Content root path: D:\projects\SideProjects\MessageBusTests\repro-smb-rabbitmq-wildcard
[15:12:22 DBG] Hosting started
[15:12:34 DBG] Connection id "0HNFLN69GA87I" received FIN.
[15:12:34 DBG] Connection id "0HNFLN69GA87I" accepted.
[15:12:34 DBG] Connection id "0HNFLN69GA87I" started.
[15:12:34 DBG] Connection id "0HNFLN69GA87J" accepted.
[15:12:34 DBG] Connection id "0HNFLN69GA87J" started.
[15:12:34 DBG] Connection id "0HNFLN69GA87I" sending FIN because: "The Socket transport's send loop completed gracefully."       
[15:12:34 DBG] Connection id "0HNFLN69GA87I" disconnecting.
[15:12:34 DBG] Connection id "0HNFLN69GA87I" stopped.
[15:12:34 INF] Request starting HTTP/1.1 POST http://localhost:5000/publish/system_a - null 0
[15:12:34 DBG] Wildcard detected, all requests with hosts will be allowed.
[15:12:34 DBG] 1 candidate(s) found for the request path '/publish/system_a'
[15:12:34 DBG] Endpoint 'HTTP: POST /publish/{systemName}' with route pattern '/publish/{systemName}' is valid for the request path '/publish/system_a'
[15:12:34 DBG] Request matched endpoint 'HTTP: POST /publish/{systemName}'
[15:12:34 INF] Executing endpoint 'HTTP: POST /publish/{systemName}'
[15:12:34 INF] Publishing Created message with routing key created.system_a
[15:12:34 DBG] Matched producer for message type CreatedMessage for dispatched message type CreatedMessage
[15:12:34 DBG] Applying default path test_exchange for message type CreatedMessage
[15:12:34 DBG] Producing message CreatedMessage { Id = 68ed8bc6-2c29-4533-84ca-350e3e9e5b97, SystemName = system_a, Data = data, EventType = created } of type CreatedMessage to path test_exchange
[15:12:34 DBG] Type CreatedMessage serialized from CreatedMessage { Id = 68ed8bc6-2c29-4533-84ca-350e3e9e5b97, SystemName = system_a, Data = data, EventType = created } to JSON {"Id":"68ed8bc6-2c29-4533-84ca-350e3e9e5b97","SystemName":"system_a","Data":"data","EventType":"created"}
[15:12:34 INF] Setting HTTP status code 202.
[15:12:34 DBG] Message arrived on queue created_queue from exchange test_exchange with delivery tag 1
[15:12:34 DBG] Exchange test_exchange - Queue created_queue: No message processor found for routing key created.system_a
[15:12:34 INF] Writing value of type 'CreatedMessage' as Json.
[15:12:34 INF] Executed endpoint 'HTTP: POST /publish/{systemName}'
[15:12:34 DBG] Connection id "0HNFLN69GA87J" completed keep alive response.
[15:12:34 INF] Request finished HTTP/1.1 POST http://localhost:5000/publish/system_a - 202 null application/json; charset=utf-8 247.9211ms
[15:14:45 DBG] Connection id "0HNFLN69GA87J" disconnecting.
[15:14:45 DBG] Connection id "0HNFLN69GA87J" stopped.
[15:14:45 DBG] Connection id "0HNFLN69GA87J" sending FIN because: "The Socket transport's send loop completed gracefully." 

Is there any suggestion about this issue?
We have multiple external systems, and some systems will process several messages of different systems, use wildcard is easy to be configured.
Many thanks.

Metadata

Metadata

Assignees

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions