Skip to content

Commit 3d62660

Browse files
committed
Allow blocking/synchronous delivery to consumers
It simplifies some testing scenarios because you don't need infrastructure and boilerplate for message waits.
1 parent 4241620 commit 3d62660

File tree

4 files changed

+247
-59
lines changed

4 files changed

+247
-59
lines changed

src/AddUp.FakeRabbitMQ.Tests/FakeModelBasicTests.cs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Linq;
44
using System.Text;
55
using System.Threading;
6+
using FluentAssertions;
67
using RabbitMQ.Client;
78
using RabbitMQ.Client.Events;
89
using RabbitMQ.Client.Exceptions;
@@ -425,6 +426,64 @@ public void BasicPublish_after_BasicConsume_does_not_deadlock()
425426
}
426427
}
427428

429+
[Fact]
430+
public void BasicPublish_after_BasicConsume_with_BlockingDelivery_is_synchronous()
431+
{
432+
var server = new RabbitServer { BlockingConsumerDelivery = true };
433+
using var model = new FakeModel(server);
434+
435+
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
436+
model.QueueDeclare("my_queue");
437+
model.ExchangeBind("my_queue", "my_exchange", null);
438+
439+
var trackedValue = new AsyncLocal<string>
440+
{
441+
Value = "initial"
442+
};
443+
444+
var consumer = new EventingBasicConsumer(model);
445+
consumer.Received += (_, _) =>
446+
{
447+
trackedValue.Value = "from_consumer";
448+
};
449+
450+
model.BasicConsume("my_queue", false, consumer);
451+
452+
var encodedMessage = Encoding.ASCII.GetBytes("hello world!");
453+
model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);
454+
455+
trackedValue.Value.Should().Be("from_consumer");
456+
}
457+
458+
[Fact]
459+
public void BasicPublish_before_BasicConsume_with_BlockingDelivery_is_synchronous()
460+
{
461+
var server = new RabbitServer { BlockingConsumerDelivery = true };
462+
using var model = new FakeModel(server);
463+
464+
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
465+
model.QueueDeclare("my_queue");
466+
model.ExchangeBind("my_queue", "my_exchange", null);
467+
468+
var encodedMessage = Encoding.ASCII.GetBytes("hello world!");
469+
model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);
470+
471+
var trackedValue = new AsyncLocal<string>
472+
{
473+
Value = "initial"
474+
};
475+
476+
var consumer = new EventingBasicConsumer(model);
477+
consumer.Received += (_, _) =>
478+
{
479+
trackedValue.Value = "from_consumer";
480+
};
481+
482+
model.BasicConsume("my_queue", false, consumer);
483+
484+
trackedValue.Value.Should().Be("from_consumer");
485+
}
486+
428487
[Fact]
429488
public void BasicPublishBatch_publishes_messages()
430489
{
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
using System.Threading.Channels;
2+
using System;
3+
using System.Threading.Tasks;
4+
using System.Threading;
5+
using RabbitMQ.Client.Events;
6+
7+
namespace AddUp.RabbitMQ.Fakes
8+
{
9+
internal abstract class ConsumerDeliveryQueue
10+
{
11+
protected readonly FakeModel model;
12+
private readonly Action<CallbackExceptionEventArgs> onDeliveryException;
13+
14+
protected ConsumerDeliveryQueue(FakeModel model, Action<CallbackExceptionEventArgs> onDeliveryException)
15+
{
16+
this.model = model;
17+
this.onDeliveryException = onDeliveryException;
18+
}
19+
20+
public abstract void Deliver(Action deliveryAction);
21+
22+
protected void ExecuteDelivery(Action deliveryAction)
23+
{
24+
try
25+
{
26+
if (!model.IsOpen) return;
27+
deliveryAction();
28+
}
29+
catch (Exception ex)
30+
{
31+
var callbackArgs = CallbackExceptionEventArgs.Build(ex, "");
32+
onDeliveryException(callbackArgs);
33+
}
34+
}
35+
36+
/// <summary>
37+
/// Marks the queue as complete, meaning no more new deliveries will be accepted.
38+
/// </summary>
39+
public abstract void Complete();
40+
41+
/// <summary>
42+
/// Wait for any remaining queues deliveries to finish.
43+
/// </summary>
44+
public abstract void WaitForCompletion();
45+
46+
public static ConsumerDeliveryQueue Create(FakeModel model, bool blockingDelivery, Action<CallbackExceptionEventArgs> onDeliveryException)
47+
{
48+
return blockingDelivery
49+
? (ConsumerDeliveryQueue) new BlockingDeliveryQueue(model, onDeliveryException)
50+
: new NonBlockingDeliveryQueue(model, onDeliveryException);
51+
}
52+
}
53+
54+
internal class NonBlockingDeliveryQueue : ConsumerDeliveryQueue
55+
{
56+
private readonly Task deliveriesTask;
57+
private readonly AsyncLocal<bool> isDeliveriesTask = new AsyncLocal<bool>();
58+
private readonly Channel<Action> deliveries = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions
59+
{
60+
SingleReader = true,
61+
SingleWriter = false,
62+
});
63+
64+
public NonBlockingDeliveryQueue(FakeModel model, Action<CallbackExceptionEventArgs> onDeliveryException)
65+
: base(model, onDeliveryException)
66+
{
67+
deliveriesTask = Task.Run(HandleDeliveries);
68+
}
69+
70+
71+
public override void Deliver(Action deliveryAction)
72+
{
73+
_ = deliveries.Writer.TryWrite(deliveryAction);
74+
}
75+
76+
/// <summary>
77+
/// Rabbit docs states that each connection is backed by a single background thread:
78+
///
79+
/// https://www.rabbitmq.com/dotnet-api-guide.html#concurrency-thread-usage
80+
///
81+
/// However, this is not actually true, it's backed by a Task:
82+
///
83+
/// https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/65dd5f92dda130ec35b4ad6fe7bc54dbcb1637fd/projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs#L81
84+
///
85+
/// FakeModels aren't aware of their connection, so in order to emulate this, just
86+
/// run a task that handles deliveries per task. It's necessary to match RabbitMQ
87+
/// semantics as running delivery callbacks synchronously can cause deadlocks in
88+
/// code under test.
89+
/// </summary>
90+
private async Task HandleDeliveries()
91+
{
92+
try
93+
{
94+
isDeliveriesTask.Value = true;
95+
while (await deliveries.Reader.WaitToReadAsync().ConfigureAwait(false))
96+
{
97+
while (deliveries.Reader.TryRead(out var delivery))
98+
{
99+
ExecuteDelivery(delivery);
100+
}
101+
}
102+
}
103+
catch
104+
{
105+
// Swallow exceptions so FakeModel.Close() doesn't have to deal with it.
106+
}
107+
}
108+
109+
public override void Complete()
110+
{
111+
_ = deliveries.Writer.TryComplete();
112+
}
113+
114+
public override void WaitForCompletion()
115+
{
116+
// It's possible that we can end up calling Close on a model from within the delivery handler.
117+
// If this is the case, we must not wait on it to complete as this will deadlock!
118+
if (!isDeliveriesTask.Value)
119+
deliveriesTask.Wait();
120+
}
121+
}
122+
123+
internal class BlockingDeliveryQueue : ConsumerDeliveryQueue
124+
{
125+
private static readonly TimeSpan DeliveryWaitTimeout = TimeSpan.FromMinutes(1);
126+
127+
private readonly SemaphoreSlim deliveryLock = new SemaphoreSlim(1);
128+
129+
private volatile bool notAcceptingNewDeliveries = false;
130+
131+
public BlockingDeliveryQueue(FakeModel model, Action<CallbackExceptionEventArgs> onDeliveryException)
132+
: base(model, onDeliveryException)
133+
{
134+
}
135+
136+
public override void Deliver(Action deliveryAction)
137+
{
138+
if (notAcceptingNewDeliveries)
139+
return;
140+
141+
try
142+
{
143+
deliveryLock.Wait(DeliveryWaitTimeout);
144+
145+
ExecuteDelivery(deliveryAction);
146+
}
147+
finally
148+
{
149+
deliveryLock.Release();
150+
}
151+
}
152+
153+
public override void Complete()
154+
{
155+
notAcceptingNewDeliveries = true;
156+
}
157+
158+
public override void WaitForCompletion()
159+
{
160+
try
161+
{
162+
deliveryLock.Wait(DeliveryWaitTimeout);
163+
}
164+
finally
165+
{
166+
deliveryLock.Release();
167+
}
168+
}
169+
}
170+
}

src/AddUp.FakeRabbitMQ/FakeModel.cs

Lines changed: 12 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
using System.Collections.Generic;
44
using System.Linq;
55
using System.Threading;
6-
using System.Threading.Channels;
7-
using System.Threading.Tasks;
86
using RabbitMQ.Client;
97
using RabbitMQ.Client.Events;
108
using RabbitMQ.Client.Exceptions;
@@ -15,20 +13,18 @@ internal sealed class FakeModel : IModel
1513
{
1614
private readonly ConcurrentDictionary<ulong, RabbitMessage> workingMessages = new ConcurrentDictionary<ulong, RabbitMessage>();
1715
private readonly ConcurrentDictionary<string, ConsumerData> consumers = new ConcurrentDictionary<string, ConsumerData>();
18-
private readonly Channel<Action> deliveries = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions
19-
{
20-
SingleReader = true,
21-
SingleWriter = false,
22-
});
16+
17+
private readonly ConsumerDeliveryQueue deliveryQueue;
2318
private readonly RabbitServer server;
24-
private readonly Task deliveriesTask;
25-
private readonly AsyncLocal<bool> isDeliveriesTask = new AsyncLocal<bool>();
2619
private long lastDeliveryTag;
2720

2821
public FakeModel(RabbitServer rabbitServer)
2922
{
3023
server = rabbitServer;
31-
deliveriesTask = Task.Run(HandleDeliveries);
24+
deliveryQueue = ConsumerDeliveryQueue.Create(
25+
this,
26+
rabbitServer.BlockingConsumerDelivery,
27+
onDeliveryException: args => CallbackException(this, args));
3228
}
3329

3430
#pragma warning disable 67
@@ -127,7 +123,8 @@ void notifyConsumerOfMessage(RabbitMessage message)
127123
if (queueInstance != null)
128124
{
129125
void publishedAction(object sender, RabbitMessage message) =>
130-
deliveries.Writer.TryWrite(() => notifyConsumerOfMessage(message));
126+
deliveryQueue.Deliver(() => notifyConsumerOfMessage(message));
127+
131128
var consumerData = new ConsumerData(consumer, queueInstance, publishedAction);
132129

133130
// https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume.consumer-tag
@@ -138,7 +135,8 @@ ConsumerData updateFunction(string s, ConsumerData _) =>
138135
_ = consumers.AddOrUpdate(consumerTag, consumerData, updateFunction);
139136

140137
foreach (var message in queueInstance.Messages)
141-
_ = deliveries.Writer.TryWrite(() => notifyConsumerOfMessage(message));
138+
consumerData.QueueMessagePublished(this, message);
139+
142140
queueInstance.MessagePublished += consumerData.QueueMessagePublished;
143141

144142
if (consumer is IAsyncBasicConsumer asyncBasicConsumer)
@@ -279,7 +277,7 @@ private void Close(ushort replyCode, string replyText, bool abort)
279277
foreach (var consumerTag in consumerTags)
280278
BasicCancel(consumerTag);
281279

282-
_ = deliveries.Writer.TryComplete();
280+
deliveryQueue.Complete();
283281
ModelShutdown?.Invoke(this, reason);
284282
}
285283
catch
@@ -288,10 +286,7 @@ private void Close(ushort replyCode, string replyText, bool abort)
288286
}
289287
}
290288

291-
// It's possible that we can end up calling Close on a model from within the delivery handler.
292-
// If this is the case, we must not wait on it to complete as this will deadlock!
293-
if (!isDeliveriesTask.Value)
294-
deliveriesTask.Wait();
289+
deliveryQueue.WaitForCompletion();
295290
}
296291

297292
public void ConfirmSelect()
@@ -482,47 +477,5 @@ public bool WaitForConfirms(TimeSpan timeout, out bool timedOut)
482477

483478
public void WaitForConfirmsOrDie() => WaitForConfirmsOrDie(Timeout.InfiniteTimeSpan);
484479
public void WaitForConfirmsOrDie(TimeSpan timeout) => _ = WaitForConfirms(timeout);
485-
486-
/// <summary>
487-
/// Rabbit docs states that each connection is backed by a single background thread:
488-
///
489-
/// https://www.rabbitmq.com/dotnet-api-guide.html#concurrency-thread-usage
490-
///
491-
/// However, this is not actually true, it's backed by a Task:
492-
///
493-
/// https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/65dd5f92dda130ec35b4ad6fe7bc54dbcb1637fd/projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs#L81
494-
///
495-
/// FakeModels aren't aware of their connection, so in order to emulate this, just
496-
/// run a task that handles deliveries per task. It's necessary to match RabbitMQ
497-
/// semantics as running delivery callbacks synchronously can cause deadlocks in
498-
/// code under test.
499-
/// </summary>
500-
private async Task HandleDeliveries()
501-
{
502-
try
503-
{
504-
isDeliveriesTask.Value = true;
505-
while (await deliveries.Reader.WaitToReadAsync().ConfigureAwait(false))
506-
{
507-
while (deliveries.Reader.TryRead(out var delivery))
508-
{
509-
try
510-
{
511-
if (!IsOpen) break;
512-
delivery();
513-
}
514-
catch (Exception ex)
515-
{
516-
var callbackArgs = CallbackExceptionEventArgs.Build(ex, "");
517-
CallbackException(this, callbackArgs);
518-
}
519-
}
520-
}
521-
}
522-
catch
523-
{
524-
// Swallow exceptions so Close() doesn't have to deal with it.
525-
}
526-
}
527480
}
528481
}

src/AddUp.FakeRabbitMQ/RabbitServer.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ public RabbitServer()
2323
public ConcurrentDictionary<string, RabbitExchange> Exchanges { get; }
2424
public ConcurrentDictionary<string, RabbitQueue> Queues { get; }
2525

26+
/// <summary>
27+
/// If true, deliveries to consumers will execute in a blocking manner, meaning the publish will not
28+
/// finish until the message has reached all registered consumers.
29+
/// </summary>
30+
public bool BlockingConsumerDelivery { get; set; }
31+
2632
public void Reset()
2733
{
2834
Exchanges.Clear();

0 commit comments

Comments
 (0)