Skip to content

Commit 035e392

Browse files
Custom serializer for subscriptions (#408)
* Add support for custom serializers to individual subscriptions. * Simplify registration of custom dependencies for subscriptions * Add custom type mapper support
1 parent df72c08 commit 035e392

File tree

11 files changed

+186
-205
lines changed

11 files changed

+186
-205
lines changed

samples/esdb/Bookings/Registrations.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ public static void AddEventuous(this IServiceCollection services, IConfiguration
3434
services.AddSingleton<Services.ConvertCurrency>((from, currency) => new Money(from.Amount * 2, currency));
3535

3636
services.AddSingleton(Mongo.ConfigureMongo(configuration));
37-
services.AddCheckpointStore<MongoCheckpointStore>();
3837

3938
services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
4039
"BookingsProjections",

src/Core/src/Eventuous.Subscriptions/Logging/Logger.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class LogContext {
4040
public InternalLogger? InfoLog { get; }
4141
public InternalLogger? WarnLog { get; }
4242
public InternalLogger? ErrorLog { get; }
43+
public InternalLogger? FatalLog { get; }
4344

4445
public LogContext(string subscriptionId, ILoggerFactory loggerFactory) {
4546
SubscriptionId = subscriptionId;
@@ -49,6 +50,7 @@ public LogContext(string subscriptionId, ILoggerFactory loggerFactory) {
4950
InfoLog = GetLogger(LogLevel.Information);
5051
WarnLog = GetLogger(LogLevel.Warning);
5152
ErrorLog = GetLogger(LogLevel.Error);
53+
FatalLog = GetLogger(LogLevel.Critical);
5254

5355
return;
5456

src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilderExtensions.cs

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,11 @@ public static SubscriptionBuilder WithPartitioningByStream(this SubscriptionBuil
3737
/// <summary>
3838
/// Use non-default checkpoint store for the specific subscription
3939
/// </summary>
40-
/// <param name="builder"></param>
41-
/// <typeparam name="TSubscription">Subscription type</typeparam>
42-
/// <typeparam name="TOptions">Subscription options type</typeparam>
40+
/// <param name="builder">Subscription builder</param>
4341
/// <typeparam name="T">Checkpoint store type</typeparam>
4442
/// <returns></returns>
45-
public static SubscriptionBuilder<TSubscription, TOptions> UseCheckpointStore<TSubscription, TOptions, T>(this SubscriptionBuilder<TSubscription, TOptions> builder)
46-
where T : class, ICheckpointStore
47-
where TSubscription : EventSubscriptionWithCheckpoint<TOptions>
48-
where TOptions : SubscriptionWithCheckpointOptions {
43+
public static SubscriptionBuilder UseCheckpointStore<T>(this SubscriptionBuilder builder)
44+
where T : class, ICheckpointStore {
4945
builder.Services.TryAddKeyedSingleton<T>(builder.SubscriptionId);
5046

5147
if (EventuousDiagnostics.Enabled) {
@@ -64,19 +60,12 @@ public static SubscriptionBuilder<TSubscription, TOptions> UseCheckpointStore<TS
6460
/// <summary>
6561
/// Use non-default checkpoint store for the specific subscription
6662
/// </summary>
67-
/// <param name="builder"></param>
63+
/// <param name="builder">Subscription builder</param>
6864
/// <param name="factory">Function to resolve the checkpoint store service from service provider</param>
69-
/// <typeparam name="TSubscription">Subscription type</typeparam>
70-
/// <typeparam name="TOptions">Subscription options type</typeparam>
7165
/// <typeparam name="T">Checkpoint store type</typeparam>
7266
/// <returns></returns>
73-
public static SubscriptionBuilder<TSubscription, TOptions> UseCheckpointStore<TSubscription, TOptions, T>(
74-
this SubscriptionBuilder<TSubscription, TOptions> builder,
75-
Func<IServiceProvider, T> factory
76-
)
77-
where T : class, ICheckpointStore
78-
where TSubscription : EventSubscriptionWithCheckpoint<TOptions>
79-
where TOptions : SubscriptionWithCheckpointOptions {
67+
public static SubscriptionBuilder UseCheckpointStore<T>(this SubscriptionBuilder builder, Func<IServiceProvider, T> factory)
68+
where T : class, ICheckpointStore {
8069
if (EventuousDiagnostics.Enabled) {
8170
builder.Services.TryAddKeyedSingleton<ICheckpointStore>(
8271
builder.SubscriptionId,
@@ -89,4 +78,42 @@ Func<IServiceProvider, T> factory
8978

9079
return builder;
9180
}
81+
82+
/// <summary>
83+
/// Use non-default serializer for the specific subscription
84+
/// </summary>
85+
/// <param name="builder">Subscription builder</param>
86+
/// <param name="factory">Function to create the serializer instance</param>
87+
/// <typeparam name="T">Serializer type</typeparam>
88+
/// <returns></returns>
89+
public static SubscriptionBuilder UseSerializer<T>(this SubscriptionBuilder builder, Func<IServiceProvider, T> factory) where T : class, IEventSerializer {
90+
builder.Services.TryAddKeyedSingleton<IEventSerializer>(builder.SubscriptionId, (sp, _) => factory(sp));
91+
92+
return builder;
93+
}
94+
95+
/// <summary>
96+
/// Use non-default serializer for the specific subscription
97+
/// </summary>
98+
/// <param name="builder">Subscription builder</param>
99+
/// <typeparam name="T">Serializer type</typeparam>
100+
/// <returns></returns>
101+
public static SubscriptionBuilder UseSerializer<T>(this SubscriptionBuilder builder) where T : class, IEventSerializer {
102+
builder.Services.TryAddKeyedSingleton<IEventSerializer, T>(builder.SubscriptionId);
103+
104+
return builder;
105+
}
106+
107+
/// <summary>
108+
/// Use non-default type mapper for the specific subscription
109+
/// </summary>
110+
/// <param name="builder">Subscription builder</param>
111+
/// <param name="typeMapper">Custom type mapper instance</param>
112+
/// <typeparam name="T">Type mapper type</typeparam>
113+
/// <returns></returns>
114+
public static SubscriptionBuilder UseTypeMapper<T>(this SubscriptionBuilder builder, T typeMapper) where T : class, ITypeMapper {
115+
builder.Services.TryAddKeyedSingleton<ITypeMapper>(builder.SubscriptionId, typeMapper);
116+
117+
return builder;
118+
}
92119
}

src/Core/test/Eventuous.Tests.Subscriptions/DefaultConsumerTests.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
using System.Diagnostics.CodeAnalysis;
12
using Eventuous.Subscriptions;
23
using Eventuous.Subscriptions.Consumers;
34
using Eventuous.Subscriptions.Context;
45
using Eventuous.TestHelpers.TUnit;
56

67
namespace Eventuous.Tests.Subscriptions;
78

8-
public class DefaultConsumerTests() : IDisposable {
9+
[SuppressMessage("Performance", "CA1822:Mark members as static")]
10+
public class DefaultConsumerTests : IDisposable {
911
readonly TestEventListener _listener = new();
1012

1113
[Test]
@@ -16,7 +18,7 @@ public async Task ShouldFailWhenHandlerNacks() {
1618

1719
await consumer.Consume(ctx);
1820

19-
ctx.HandlingResults.GetFailureStatus().Should().Be(EventHandlingStatus.Failure);
21+
await Assert.That(ctx.HandlingResults.GetFailureStatus()).IsEqualTo(EventHandlingStatus.Failure);
2022
}
2123

2224
public void Dispose() => _listener.Dispose();

src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/Fixtures/MetricsSubscriptionFixtureBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ protected override void SetupServices(IServiceCollection services) {
5656
SubscriptionId,
5757
builder => builder
5858
.Configure(ConfigureSubscription)
59-
.UseCheckpointStore<TSubscription, TSubscriptionOptions, NoOpCheckpointStore>()
59+
.UseCheckpointStore<NoOpCheckpointStore>()
6060
.AddEventHandler<TestHandler>()
6161
);
6262

src/EventStore/src/Eventuous.EventStore/Subscriptions/StreamSubscription.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,16 @@ public StreamSubscription(
7171
ILoggerFactory? loggerFactory = null,
7272
IEventSerializer? eventSerializer = null,
7373
IMetadataSerializer? metaSerializer = null
74-
) : base(client, options, checkpointStore, consumePipe, SubscriptionKind.Stream, loggerFactory, eventSerializer, metaSerializer)
75-
=> Ensure.NotEmptyString(options.StreamName);
74+
) : base(client, options, checkpointStore, consumePipe, SubscriptionKind.Stream, loggerFactory, eventSerializer, metaSerializer) {
75+
if (string.IsNullOrWhiteSpace(options.StreamName)) {
76+
Log.FatalLog?.Log("Subscription has no stream name configured. Use SubscriptionBuilder.Configure to set the stream name", SubscriptionId);
77+
78+
// ReSharper disable once NotResolvedInText
79+
#pragma warning disable CA2208
80+
throw new ArgumentNullException("StreamName");
81+
#pragma warning restore CA2208
82+
}
83+
}
7684

7785
/// <summary>
7886
/// Starts a catch-up subscription

src/EventStore/src/Eventuous.EventStore/Subscriptions/SubscriptionBuilderExtensions.cs

Lines changed: 0 additions & 60 deletions
This file was deleted.
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright (C) Eventuous HQ OÜ.All rights reserved
2+
// Licensed under the Apache License, Version 2.0.
3+
4+
using Eventuous.EventStore.Producers;
5+
using Eventuous.EventStore.Subscriptions;
6+
using Eventuous.Producers;
7+
using Eventuous.Subscriptions.Context;
8+
using Eventuous.Subscriptions.Registrations;
9+
using Eventuous.TestHelpers.TUnit.Logging;
10+
using Eventuous.Tests.Subscriptions.Base;
11+
using Microsoft.Extensions.DependencyInjection;
12+
using Microsoft.Extensions.Hosting;
13+
using Testcontainers.EventStoreDb;
14+
15+
namespace Eventuous.Tests.EventStore.Subscriptions;
16+
17+
public class CustomDependenciesTests {
18+
readonly TestSerializer _serializer = new();
19+
readonly TestCheckpointStore _checkpointStore = new();
20+
IHostedService _service = null!;
21+
EventStoreDbContainer _container = null!;
22+
readonly StreamName _streamName = new($"test-{Guid.NewGuid():N}");
23+
IProducer _producer = null!;
24+
TestEventHandler _handler = null!;
25+
26+
[Before(Test)]
27+
public async Task Setup(CancellationToken cancellationToken) {
28+
_container = EsdbContainer.Create();
29+
var services = new ServiceCollection();
30+
await _container.StartAsync(cancellationToken);
31+
32+
services.AddLogging(b => b.AddFilter("Grpc", LogLevel.Error).AddConsole().AddTUnit(LogLevel.Debug));
33+
34+
services.AddEventStoreClient(_container.GetConnectionString());
35+
services.AddProducer<EventStoreProducer>();
36+
37+
// Add NoOp store globally to make sure it's not picked up
38+
services.AddCheckpointStore<NoOpCheckpointStore>();
39+
40+
_handler = new();
41+
var typeMapper = new TypeMapper();
42+
typeMapper.AddType<TestEvent>();
43+
44+
services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
45+
"test-custom",
46+
b => b
47+
.Configure(cfg => cfg.StreamName = _streamName)
48+
.UseCheckpointStore<TestCheckpointStore>(_ => _checkpointStore)
49+
.UseSerializer<TestSerializer>(_ => _serializer)
50+
.UseTypeMapper(typeMapper)
51+
.AddEventHandler(_handler)
52+
);
53+
54+
var provider = services.BuildServiceProvider();
55+
56+
_producer = provider.GetRequiredService<IProducer>();
57+
_service = provider.GetRequiredService<IHostedService>();
58+
59+
await _service.StartAsync(cancellationToken);
60+
}
61+
62+
[After(Test)]
63+
public async Task Shutdown(CancellationToken cancellationToken) {
64+
await _service.StopAsync(cancellationToken);
65+
await _container.StopAsync(cancellationToken);
66+
}
67+
68+
[Test]
69+
public async Task ShouldUseCustomDependencies(CancellationToken cancellationToken) {
70+
var message = TestEvent.Create();
71+
await _producer.Produce(_streamName, message, new(), cancellationToken: cancellationToken);
72+
73+
while (_handler.Message == null) {
74+
await Task.Delay(100, cancellationToken);
75+
}
76+
77+
await Assert.That(_handler.Message).IsTypeOf<TestEvent>();
78+
await Assert.That(_handler.Message).IsEqualTo(message with {Number = message.Number + 1});
79+
}
80+
81+
class TestEventHandler : IEventHandler {
82+
public string DiagnosticName => nameof(TestEventHandler);
83+
84+
public object? Message { get; private set; }
85+
86+
public ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context) {
87+
Message = context.Message;
88+
89+
return ValueTask.FromResult(EventHandlingStatus.Handled);
90+
}
91+
}
92+
93+
class TestCheckpointStore : ICheckpointStore {
94+
public bool ReceivedGetCheckpoint { get; private set; }
95+
96+
public ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) {
97+
ReceivedGetCheckpoint = true;
98+
99+
return ValueTask.FromResult(new Checkpoint(checkpointId, null));
100+
}
101+
102+
public ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, bool force, CancellationToken cancellationToken) {
103+
return ValueTask.FromResult(checkpoint);
104+
}
105+
}
106+
107+
class TestSerializer : IEventSerializer {
108+
public DeserializationResult DeserializeEvent(ReadOnlySpan<byte> data, string eventType, string contentType) {
109+
var result = DefaultEventSerializer.Instance.DeserializeEvent(data, eventType, contentType);
110+
111+
if (result is not DeserializationResult.SuccessfullyDeserialized { Payload: var evt }) {
112+
return result;
113+
}
114+
115+
return evt is not TestEvent testEvent
116+
? result
117+
: new DeserializationResult.SuccessfullyDeserialized(testEvent with { Number = testEvent.Number + 1 });
118+
}
119+
120+
public SerializationResult SerializeEvent(object evt) {
121+
throw new NotImplementedException();
122+
}
123+
}
124+
}

src/EventStore/test/Eventuous.Tests.EventStore/Subscriptions/SubscriptionIgnoredMessagesTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using Eventuous.EventStore.Producers;
22
using Eventuous.EventStore.Subscriptions;
33
using Eventuous.Producers;
4+
using Eventuous.Subscriptions.Registrations;
45
using Eventuous.Tests.Subscriptions.Base;
56
using Microsoft.Extensions.DependencyInjection;
67
// ReSharper disable MethodHasAsyncOverload

0 commit comments

Comments
 (0)