Skip to content

Commit 124c735

Browse files
committed
[Host] AutoStartConsumersEnabled not respected for child buses
Signed-off-by: Tomasz Maruszak <[email protected]>
1 parent 699c23a commit 124c735

File tree

9 files changed

+116
-33
lines changed

9 files changed

+116
-33
lines changed

src/Host.Plugin.Properties.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<Import Project="Common.NuGet.Properties.xml" />
55

66
<PropertyGroup>
7-
<Version>3.3.2-rc100</Version>
7+
<Version>3.3.2-rc102</Version>
88
</PropertyGroup>
99

1010
</Project>

src/SlimMessageBus.Host.Configuration/Settings/MessageBusSettings.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,14 @@ public IServiceProvider ServiceProvider
3737
/// <summary>
3838
/// When true will start the message consumption on consumers after the bus is created.
3939
/// </summary>
40-
public bool AutoStartConsumers { get; set; }
40+
public bool? AutoStartConsumers { get; set; }
4141

4242
public MessageBusSettings(MessageBusSettings parent = null)
4343
{
4444
_children = [];
4545
Producers = [];
4646
Consumers = [];
4747
SerializerType = typeof(IMessageSerializerProvider);
48-
AutoStartConsumers = true;
4948

5049
if (parent != null)
5150
{

src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Description>Core configuration interfaces of SlimMessageBus</Description>
77
<PackageTags>SlimMessageBus</PackageTags>
88
<RootNamespace>SlimMessageBus.Host</RootNamespace>
9-
<Version>3.3.1</Version>
9+
<Version>3.3.2-rc102</Version>
1010
</PropertyGroup>
1111

1212
<ItemGroup>

src/SlimMessageBus.Host/DependencyResolver/MessageBusHostedService.cs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,9 @@
55
/// <summary>
66
/// <see cref="IHostedService"/> responsible for starting message bus consumers.
77
/// </summary>
8-
public class MessageBusHostedService(IConsumerControl bus, MessageBusSettings messageBusSettings) : IHostedService
8+
internal class MessageBusHostedService(IMasterMessageBus bus) : IHostedService
99
{
10-
public async Task StartAsync(CancellationToken cancellationToken)
11-
{
12-
if (messageBusSettings.AutoStartConsumers)
13-
{
14-
await bus.Start();
15-
}
16-
}
10+
public Task StartAsync(CancellationToken cancellationToken) => bus.AutoStart(cancellationToken);
1711

1812
public Task StopAsync(CancellationToken cancellationToken) => bus.Stop();
1913
}

src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,14 @@ public static IServiceCollection AddSlimMessageBus(this IServiceCollection servi
7373
var currentBusProvider = svp.GetRequiredService<ICurrentMessageBusProvider>();
7474
MessageBus.SetProvider(currentBusProvider.GetCurrent);
7575

76-
return (IMasterMessageBus)mbb.Build();
76+
var messageBus = (IMasterMessageBus)mbb.Build();
77+
78+
// If we are not running in the hosted environment, we need to start the bus ourselves after creation
79+
// See MessageBusHostedService for the hosted environment
80+
// Fire and forget
81+
_ = Task.Run(() => messageBus.AutoStart(default));
82+
83+
return messageBus;
7784
});
7885

7986
services.TryAddTransient<IConsumerControl>(svp => svp.GetRequiredService<IMasterMessageBus>());

src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ protected virtual MessageBusBase BuildBus(MessageBusBuilder builder)
6868
return (MessageBusBase)bus;
6969
}
7070

71+
public Task AutoStart(CancellationToken cancellationToken)
72+
=> Task.WhenAll(_busByName.Values.Select(x => x.AutoStart(cancellationToken)));
73+
7174
public Task Start() =>
7275
Task.WhenAll(_busByName.Values.Select(x => x.Start()));
7376

src/SlimMessageBus.Host/IMasterMessageBus.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,6 @@
33
public interface IMasterMessageBus : IMessageBusProducer, IConsumerControl, ITopologyControl, IMessageBusProvider
44
{
55
IMessageSerializerProvider SerializerProvider { get; }
6+
7+
Task AutoStart(CancellationToken cancellationToken);
68
}

src/SlimMessageBus.Host/MessageBusBase.cs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ protected MessageBusBase(MessageBusSettings settings)
101101

102102
RuntimeTypeCache = settings.ServiceProvider.GetRequiredService<RuntimeTypeCache>();
103103

104-
MessageBusTarget = new MessageBusProxy(this, Settings.ServiceProvider);
104+
MessageBusTarget = new MessageBusProxy(this, settings.ServiceProvider);
105105

106106
TimeProvider = settings.ServiceProvider.GetRequiredService<TimeProvider>();
107107

@@ -124,25 +124,10 @@ protected void OnBuildProvider()
124124

125125
// Notify the bus has been created - before any message can be produced
126126
InitTaskList.Add(() => OnBusLifecycle(MessageBusLifecycleEventType.Created), CancellationToken);
127-
128-
// Auto start consumers if enabled
129-
if (Settings.AutoStartConsumers)
130-
{
131-
// Fire and forget start
132-
_ = Task.Run(async () =>
133-
{
134-
try
135-
{
136-
await Start().ConfigureAwait(false);
137-
}
138-
catch (Exception e)
139-
{
140-
LogCouldNotStartConsumers(e);
141-
}
142-
});
143-
}
144127
}
145128

129+
130+
146131
protected virtual void Build()
147132
{
148133
ProducerSettingsByMessageType = new ProducerByMessageTypeCache<ProducerSettings>(_logger, BuildProducerByBaseMessageType(), RuntimeTypeCache);
@@ -178,6 +163,22 @@ private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType)
178163
}
179164
}
180165

166+
public async virtual Task AutoStart(CancellationToken cancellationToken)
167+
{
168+
// Auto start consumers if enabled for this bus (check first on this bus if setting set, otherwise check parent bus, fallack to true)
169+
if (Settings.AutoStartConsumers ?? Settings.Parent?.AutoStartConsumers ?? true)
170+
{
171+
try
172+
{
173+
await Start().ConfigureAwait(false);
174+
}
175+
catch (Exception e)
176+
{
177+
LogCouldNotStartConsumers(e);
178+
}
179+
}
180+
}
181+
181182
public async Task Start()
182183
{
183184
lock (_startLock)

src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
namespace SlimMessageBus.Host.Test;
22

33
using SlimMessageBus.Host.Collections;
4+
using SlimMessageBus.Host.Hybrid;
45
using SlimMessageBus.Host.Test.Common;
56

67
public class MessageBusBaseTests : IDisposable
78
{
8-
private MessageBusBuilder BusBuilder { get; }
9+
private MessageBusBuilder BusBuilder { get; set; }
910
private readonly Lazy<MessageBusTested> _busLazy;
1011
private MessageBusTested Bus => _busLazy.Value;
1112
private readonly DateTimeOffset _timeZero;
@@ -59,7 +60,15 @@ public MessageBusBaseTests()
5960
};
6061
});
6162

62-
_busLazy = new Lazy<MessageBusTested>(() => (MessageBusTested)BusBuilder.Build());
63+
_busLazy = new Lazy<MessageBusTested>(CreateMessageBus<MessageBusTested>);
64+
}
65+
66+
private T CreateMessageBus<T>() where T : IMasterMessageBus
67+
{
68+
var bus = (T)BusBuilder.Build();
69+
_ = Task.Run(() => bus.AutoStart(default));
70+
Thread.Sleep(200);
71+
return bus;
6372
}
6473

6574
public void Dispose()
@@ -94,6 +103,74 @@ public void When_Create_Given_ConfigurationThatDeclaresSameMessageTypeMoreThanOn
94103
.WithMessage("*was declared more than once*");
95104
}
96105

106+
[Theory]
107+
[InlineData(null, null, true, true)]
108+
[InlineData(null, false, false, true)]
109+
[InlineData(false, null, false, false)]
110+
[InlineData(false, true, true, false)]
111+
public async Task When_Create_Given_TwoChildBusAndOneHasAutoStartConsumersAsOff_Then_OnlyChildBusIsStarted(bool? rootBusEnabled, bool? child1Enabled, bool child1ShouldStart, bool child2ShouldStart)
112+
{
113+
// arrange
114+
BusBuilder = MessageBusBuilder
115+
.Create()
116+
.WithServiceProvider(_serviceProviderMock.Object)
117+
.WithProviderHybrid();
118+
119+
Mock<MessageBusBase> childBusMock1 = null;
120+
Mock<MessageBusBase> childBusMock2 = null;
121+
122+
if (rootBusEnabled != null)
123+
{
124+
BusBuilder.AutoStartConsumersEnabled(rootBusEnabled.Value);
125+
}
126+
127+
BusBuilder.AddChildBus("child1", mbb =>
128+
{
129+
130+
if (child1Enabled != null)
131+
{
132+
mbb.AutoStartConsumersEnabled(child1Enabled.Value);
133+
}
134+
mbb.WithProvider((s) =>
135+
{
136+
var childBusSettings = new MessageBusSettings(s)
137+
{
138+
Name = "child1"
139+
};
140+
childBusMock1 = new Mock<MessageBusBase>(childBusSettings) { CallBase = true };
141+
return childBusMock1.Object;
142+
});
143+
});
144+
145+
BusBuilder.AddChildBus("child2", mbb =>
146+
{
147+
mbb.WithProvider((s) =>
148+
{
149+
var childBusSettings = new MessageBusSettings(s)
150+
{
151+
Name = "child2"
152+
};
153+
childBusMock2 = new Mock<MessageBusBase>(childBusSettings) { CallBase = true };
154+
return childBusMock2.Object;
155+
});
156+
});
157+
158+
159+
// act
160+
var bus = CreateMessageBus<IMasterMessageBus>();
161+
162+
await Task.Delay(TimeSpan.FromSeconds(2));
163+
164+
// assert
165+
childBusMock1.Should().NotBeNull();
166+
childBusMock2.Should().NotBeNull();
167+
168+
childBusMock1.Verify(x => x.AutoStart(It.IsAny<CancellationToken>()), Times.Once);
169+
childBusMock2.Verify(x => x.AutoStart(It.IsAny<CancellationToken>()), Times.Once);
170+
childBusMock1.Verify(x => x.OnStart(), child1ShouldStart ? Times.Once : Times.Never);
171+
childBusMock2.Verify(x => x.OnStart(), child2ShouldStart ? Times.Once : Times.Never);
172+
}
173+
97174
[Fact]
98175
public async Task When_Create_Then_BusLifecycleCreatedIsSentToRegisteredInterceptors()
99176
{

0 commit comments

Comments
 (0)