Skip to content

Commit 22d7bc0

Browse files
committed
Fix concurrent queue item handling (re #294)
Revert changes from #294 and instead pass ConsumerDispatchConcurrency in SubscribeOptions. Use this to control concurrency and not the prefetch count otherwise that will create way too many tasks in the batch handler
1 parent f91f5d3 commit 22d7bc0

File tree

7 files changed

+29
-12
lines changed

7 files changed

+29
-12
lines changed

src/Hosting/Queue/src/BaseQueueHostedService.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,7 @@ private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions
192192
ConnectionTimeout = options.ConnectionTimeout,
193193
EnableSsl = options.EnableSsl,
194194
IgnoreSslErrors = options.IgnoreSslErrors,
195-
LoggerFactory = loggerFactory,
196-
ConsumerDispatchConcurrency = options.ConcurrentTaskCount
195+
LoggerFactory = loggerFactory
197196
};
198197

199198
if (options.SslVersion != null)

src/Hosting/Queue/src/BaseQueueHostedServiceOptions.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,4 @@ public abstract class BaseQueueHostedServiceOptions
5757
/// The serializer to use for deserializing messages from the Queue
5858
/// </summary>
5959
public IMessageSerializer? Serializer { get; set; }
60-
61-
/// <summary>
62-
/// The number of tasks to run concurrently.
63-
/// </summary>
64-
public ushort ConcurrentTaskCount { get; set; } = 1;
6560
}

src/Hosting/Queue/src/BatchQueueHostedService.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ protected override async Task<SubscriptionContext> SubscribeAsync(IQueueClient q
4949
var subscribeOptions = new SubscribeOptions
5050
{
5151
PrefetchCount = Options.BatchSize,
52+
// We only want to run 1 thread when processing batches
53+
ConsumerDispatchConcurrency = 1,
5254
AutoAcknowledge = false
5355
};
5456

@@ -212,7 +214,7 @@ private async Task IntervalWorkerAsync(CancellationToken cancellationToken)
212214
}
213215
else
214216
{
215-
snapshot = Array.Empty<TMessage>();
217+
snapshot = [];
216218
latestDeliveryTag = 0;
217219
}
218220
}

src/Hosting/Queue/src/QueueHostedService.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ protected override Task<SubscriptionContext> SubscribeAsync(IQueueClient queueCl
2828
{
2929
return queueClient.SubscribeAsync<TMessage>(queueName,
3030
OnMessageInternalAsync,
31-
new SubscribeOptions {PrefetchCount = Options.ConcurrentTaskCount},
31+
new SubscribeOptions
32+
{
33+
PrefetchCount = Options.ConcurrentTaskCount,
34+
ConsumerDispatchConcurrency = Options.ConcurrentTaskCount
35+
},
3236
cancellationToken);
3337
}
3438

src/Hosting/Queue/src/QueueHostedServiceOptions.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,8 @@ namespace ClickView.GoodStuff.Hosting.Queue;
55
/// </summary>
66
public abstract class QueueHostedServiceOptions : BaseQueueHostedServiceOptions
77
{
8+
/// <summary>
9+
/// The number of tasks to run concurrently.
10+
/// </summary>
11+
public ushort ConcurrentTaskCount { get; set; } = 1;
812
}

src/Queues/RabbitMq/src/RabbitMqClient.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public async Task EnqueueAsync<TData>(string exchange, TData data, EnqueueOption
4848

4949
await using var channel = await GetChannelAsync(
5050
options.EnablePublisherConfirms,
51+
consumerDispatchConcurrency: null,
5152
cancellationToken : cancellationToken);
5253

5354
var properties = new BasicProperties
@@ -81,7 +82,10 @@ public async Task<SubscriptionContext> SubscribeAsync<TData>(string queue,
8182

8283
// We don't want to dispose the channel here (unless an exception is thrown, see below).
8384
// The returned SubscriptionContext is the object that should be disposed (which disposes the channel)
84-
var channel = await GetChannelAsync(false, options.PrefetchCount, cancellationToken);
85+
var channel = await GetChannelAsync(
86+
enablePublisherConfirms: false,
87+
consumerDispatchConcurrency: options.ConsumerDispatchConcurrency,
88+
cancellationToken: cancellationToken);
8589

8690
try
8791
{
@@ -193,15 +197,15 @@ private async ValueTask<IConnection> ConnectSlowAsync(CancellationToken cancella
193197
}
194198

195199
private async Task<IChannel> GetChannelAsync(bool enablePublisherConfirms,
196-
ushort consumerDispatchConcurrency = 1,
200+
ushort? consumerDispatchConcurrency,
197201
CancellationToken cancellationToken = default)
198202
{
199203
var connection = await GetConnectionAsync(cancellationToken);
200204

201205
var options = new CreateChannelOptions(
202206
publisherConfirmationsEnabled: enablePublisherConfirms,
203207
publisherConfirmationTrackingEnabled: enablePublisherConfirms,
204-
consumerDispatchConcurrency : consumerDispatchConcurrency
208+
consumerDispatchConcurrency: consumerDispatchConcurrency
205209
);
206210

207211
return await connection.CreateChannelAsync(options, cancellationToken);

src/Queues/RabbitMq/src/SubscribeOptions.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,14 @@ public class SubscribeOptions
55
public bool AutoAcknowledge { get; init; }
66
public ushort PrefetchCount { get; init; } = 1;
77

8+
/// <summary>
9+
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one,
10+
/// tasks will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
11+
/// If set to null, the value from <see cref="RabbitMqClientOptions"/> will be used.
12+
/// </summary>
13+
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
14+
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
15+
public ushort? ConsumerDispatchConcurrency { get; set; }
16+
817
internal static readonly SubscribeOptions Default = new();
918
}

0 commit comments

Comments
 (0)