Skip to content

Commit 57ff4e9

Browse files
cyril265zarusz
authored andcommitted
#419 add RedisListCheckerConsumerTest tests
Signed-off-by: Kirill Povolotskyy <[email protected]>
1 parent 1abed4f commit 57ff4e9

File tree

1 file changed

+140
-0
lines changed

1 file changed

+140
-0
lines changed
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
namespace SlimMessageBus.Host.Redis.Test;
2+
3+
using System.Text;
4+
5+
using Serialization;
6+
7+
using StackExchange.Redis;
8+
9+
public class RedisListCheckerConsumerTest
10+
{
11+
private readonly Mock<IDatabase> _databaseMock;
12+
private readonly Mock<IMessageProcessor<MessageWithHeaders>> _messageProcessorMock;
13+
private readonly Mock<IMessageSerializer> _envelopeSerializerMock;
14+
private readonly RedisListCheckerConsumer _subject;
15+
private readonly string _queueName = "test-queue";
16+
private readonly MessageWithHeaders _testMessage;
17+
18+
public RedisListCheckerConsumerTest()
19+
{
20+
_databaseMock = new Mock<IDatabase>();
21+
_messageProcessorMock = new Mock<IMessageProcessor<MessageWithHeaders>>();
22+
_envelopeSerializerMock = new Mock<IMessageSerializer>();
23+
var testPayload = "{\"Data\":\"test\"}"u8.ToArray();
24+
_testMessage = new MessageWithHeaders(testPayload, new Dictionary<string, object>());
25+
var queues = new[] { (_queueName, _messageProcessorMock.Object) };
26+
_subject = new RedisListCheckerConsumer(
27+
NullLogger<RedisListCheckerConsumer>.Instance,
28+
new List<IAbstractConsumerInterceptor>(),
29+
_databaseMock.Object,
30+
pollDelay: TimeSpan.FromMilliseconds(10),
31+
maxIdle: TimeSpan.FromMilliseconds(50),
32+
queues,
33+
_envelopeSerializerMock.Object);
34+
}
35+
36+
[Fact]
37+
public async Task Should_ProcessMessage_AfterException()
38+
{
39+
// Arrange
40+
var processedMessages = new List<MessageWithHeaders>();
41+
var callCount = 0;
42+
43+
_databaseMock
44+
.Setup(x => x.ListLeftPopAsync(It.IsAny<RedisKey>(), It.IsAny<CommandFlags>()))
45+
.ReturnsAsync(() =>
46+
{
47+
callCount++;
48+
if (callCount == 1)
49+
throw new RedisConnectionException(ConnectionFailureType.SocketFailure, "Connection failed");
50+
if (callCount == 2)
51+
return (RedisValue)"serialized-message";
52+
return RedisValue.Null;
53+
});
54+
55+
_envelopeSerializerMock
56+
.Setup(x => x.Deserialize(typeof(MessageWithHeaders), null, It.IsAny<byte[]>(), null))
57+
.Returns(_testMessage);
58+
59+
var tcs = new TaskCompletionSource();
60+
61+
_messageProcessorMock
62+
.Setup(x => x.ProcessMessage(
63+
It.IsAny<MessageWithHeaders>(),
64+
It.IsAny<IReadOnlyDictionary<string, object>>(),
65+
It.IsAny<IDictionary<string, object>>(),
66+
It.IsAny<IServiceProvider>(),
67+
It.IsAny<CancellationToken>()))
68+
.Returns<MessageWithHeaders, IReadOnlyDictionary<string, object>, IDictionary<string, object>,
69+
IServiceProvider, CancellationToken>((msg, _, _, _, _) =>
70+
{
71+
processedMessages.Add(msg);
72+
tcs.SetResult();
73+
return Task.FromResult(new ProcessMessageResult { Result = ProcessResult.Success });
74+
});
75+
76+
// Act
77+
_ = _subject.Start();
78+
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(1)); // Wait for processing or timeout
79+
await _subject.Stop();
80+
81+
// Assert
82+
_testMessage.Should().BeEquivalentTo(processedMessages[0]);
83+
}
84+
85+
[Fact]
86+
public async Task Should_ProcessMultipleMessages_Successfully()
87+
{
88+
// Arrange
89+
var processedMessages = new List<MessageWithHeaders>();
90+
var callCount = 0;
91+
var totalMessages = 3;
92+
var tcs = new TaskCompletionSource();
93+
94+
_databaseMock
95+
.Setup(x => x.ListLeftPopAsync(It.IsAny<RedisKey>(), It.IsAny<CommandFlags>()))
96+
.ReturnsAsync(() =>
97+
{
98+
callCount++;
99+
if (callCount <= totalMessages) return (RedisValue)$"serialized-message-{callCount}";
100+
return RedisValue.Null;
101+
});
102+
103+
_envelopeSerializerMock
104+
.Setup(x => x.Deserialize(typeof(MessageWithHeaders), null, It.IsAny<byte[]>(), null))
105+
.Returns((Type type, string _, byte[] bytes, string __) =>
106+
{
107+
// Simulate unique payload for each message
108+
return new MessageWithHeaders(bytes, new Dictionary<string, object>());
109+
});
110+
111+
_messageProcessorMock
112+
.Setup(x => x.ProcessMessage(
113+
It.IsAny<MessageWithHeaders>(),
114+
It.IsAny<IReadOnlyDictionary<string, object>>(),
115+
It.IsAny<IDictionary<string, object>>(),
116+
It.IsAny<IServiceProvider>(),
117+
It.IsAny<CancellationToken>()))
118+
.Returns<MessageWithHeaders, IReadOnlyDictionary<string, object>, IDictionary<string, object>,
119+
IServiceProvider, CancellationToken>((msg, _, _, _, _) =>
120+
{
121+
processedMessages.Add(msg);
122+
if (processedMessages.Count == totalMessages) tcs.SetResult();
123+
124+
return Task.FromResult(new ProcessMessageResult { Result = ProcessResult.Success });
125+
});
126+
127+
// Act
128+
_ = _subject.Start();
129+
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(1)); // Wait for all messages or timeout
130+
await _subject.Stop();
131+
132+
// Assert
133+
processedMessages.Count().Should().Be(totalMessages);
134+
for (var i = 0; i < totalMessages; i++)
135+
{
136+
var expectedPayload = Encoding.UTF8.GetBytes($"serialized-message-{i + 1}");
137+
processedMessages[i].Payload.Should().BeEquivalentTo(expectedPayload);
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)