Skip to content

Commit 00062c2

Browse files
committed
[Host.AzureServiceBus][Host.AzureEventHub] Fix the batch publish chunking iterations
Signed-off-by: Tomasz Maruszak <[email protected]>
1 parent 528cb04 commit 00062c2

File tree

6 files changed

+302
-116
lines changed

6 files changed

+302
-116
lines changed

src/Host.Plugin.Properties.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<Import Project="Common.NuGet.Properties.xml" />
55

66
<PropertyGroup>
7-
<Version>3.3.1-rc103</Version>
7+
<Version>3.3.1</Version>
88
</PropertyGroup>
99

1010
</Project>

src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,21 +187,31 @@ public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBu
187187
{
188188
inBatch.Add(item.Envelope);
189189
advance = it.MoveNext();
190-
if (advance)
190+
}
191+
else
192+
{
193+
// Current message doesn't fit in this batch
194+
if (batch.Count == 0)
191195
{
192-
continue;
196+
throw new ProducerMessageBusException($"Failed to add message {item.Envelope.Message} of type {item.Envelope.MessageType?.Name} on path {path} to an empty batch");
193197
}
194-
}
195198

196-
if (batch.Count == 0)
197-
{
198-
throw new ProducerMessageBusException($"Failed to add message {item.Envelope.Message} of type {item.Envelope.MessageType?.Name} on path {path} to an empty batch");
199+
// Send the current batch and retry with the current message in a new batch
200+
await producer.SendAsync(batch, cancellationToken).ConfigureAwait(false);
201+
dispatched.AddRange(inBatch);
202+
inBatch.Clear();
203+
204+
batch.Dispose();
205+
batch = null;
206+
// Don't advance - retry the current message in the next iteration
199207
}
208+
}
200209

201-
advance = false;
210+
// Send any remaining messages in the final batch
211+
if (batch != null && batch.Count > 0)
212+
{
202213
await producer.SendAsync(batch, cancellationToken).ConfigureAwait(false);
203214
dispatched.AddRange(inBatch);
204-
inBatch.Clear();
205215

206216
batch.Dispose();
207217
batch = null;

src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -251,22 +251,32 @@ public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBu
251251
{
252252
inBatch.Add(item.Envelope);
253253
advance = it.MoveNext();
254-
if (advance)
254+
}
255+
else
256+
{
257+
// Current message doesn't fit in this batch
258+
if (batch.Count == 0)
255259
{
256-
continue;
260+
throw new ProducerMessageBusException($"Failed to add message {item.Envelope.Message} of Type {item.Envelope.MessageType?.Name} on Path {path} to an empty batch");
257261
}
258-
}
259262

260-
if (batch.Count == 0)
261-
{
262-
throw new ProducerMessageBusException($"Failed to add message {item.Envelope.Message} of Type {item.Envelope.MessageType?.Name} on Path {path} to an empty batch");
263+
// Send the current batch and retry with the current message in a new batch
264+
await SendBatchAsync(path, senderClient, envelopes, batch, cancellationToken).ConfigureAwait(false);
265+
dispatched.AddRange(inBatch);
266+
inBatch.Clear();
267+
268+
batch.Dispose();
269+
batch = null;
270+
// Don't advance - retry the current message in the next iteration
263271
}
272+
}
264273

265-
advance = false;
274+
// Send any remaining messages in the final batch
275+
if (batch != null && batch.Count > 0)
276+
{
266277
await SendBatchAsync(path, senderClient, envelopes, batch, cancellationToken).ConfigureAwait(false);
267278
dispatched.AddRange(inBatch);
268-
inBatch.Clear();
269-
279+
270280
batch.Dispose();
271281
batch = null;
272282
}

src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Description>Core configuration interfaces of SlimMessageBus</Description>
77
<PackageTags>SlimMessageBus</PackageTags>
88
<RootNamespace>SlimMessageBus.Host</RootNamespace>
9-
<Version>3.3.1-rc103</Version>
9+
<Version>3.3.1</Version>
1010
</PropertyGroup>
1111

1212
<ItemGroup>

0 commit comments

Comments
 (0)