Skip to content

Commit c268df0

Browse files
committed
Ability to do live aggregations without an identifier on the type. Closes GH-3920
Upgrading JasperFx & JasperFx.Events
1 parent 8f4a47a commit c268df0

18 files changed

+282
-38
lines changed
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using JasperFx.Events;
4+
using Marten.Testing.Harness;
5+
using Shouldly;
6+
using Xunit;
7+
8+
namespace EventSourcingTests.Aggregation;
9+
10+
public class live_aggregation_without_an_aggregate_identifier : OneOffConfigurationsContext
11+
{
12+
[Fact]
13+
public async Task live_aggregation_with_guid_identifiers()
14+
{
15+
StoreOptions(opts =>
16+
{
17+
opts.Events.StreamIdentity = StreamIdentity.AsGuid;
18+
});
19+
20+
var streamId = Guid.NewGuid();
21+
theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent());
22+
await theSession.SaveChangesAsync();
23+
24+
var version1 = await theSession.Events.AggregateStreamAsync<CountOfLetters>(streamId);
25+
26+
var version2 = await theSession.Events.FetchLatest<CountOfLetters>(streamId);
27+
28+
var version3 = await theSession.Events.FetchForWriting<CountOfLetters>(streamId);
29+
30+
var version4 = await theSession.Events.AggregateStreamToLastKnownAsync<CountOfLetters>(streamId);
31+
32+
version1.ACount.ShouldBe(1);
33+
version1.BCount.ShouldBe(2);
34+
version1.CCount.ShouldBe(1);
35+
version1.DCount.ShouldBe(0);
36+
37+
version2.ACount.ShouldBe(1);
38+
version2.BCount.ShouldBe(2);
39+
version2.CCount.ShouldBe(1);
40+
version2.DCount.ShouldBe(0);
41+
42+
version3.Aggregate.ACount.ShouldBe(1);
43+
version3.Aggregate.BCount.ShouldBe(2);
44+
version3.Aggregate.CCount.ShouldBe(1);
45+
version3.Aggregate.DCount.ShouldBe(0);
46+
47+
version4.ACount.ShouldBe(1);
48+
version4.BCount.ShouldBe(2);
49+
version4.CCount.ShouldBe(1);
50+
version4.DCount.ShouldBe(0);
51+
}
52+
53+
[Fact]
54+
public async Task live_aggregation_with_string_identifiers()
55+
{
56+
StoreOptions(opts =>
57+
{
58+
opts.Events.StreamIdentity = StreamIdentity.AsString;
59+
});
60+
61+
var streamId = Guid.NewGuid().ToString();
62+
theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent());
63+
await theSession.SaveChangesAsync();
64+
65+
var version1 = await theSession.Events.AggregateStreamAsync<CountOfLetters>(streamId);
66+
67+
var version2 = await theSession.Events.FetchLatest<CountOfLetters>(streamId);
68+
69+
var version3 = await theSession.Events.FetchForWriting<CountOfLetters>(streamId);
70+
71+
var version4 = await theSession.Events.AggregateStreamToLastKnownAsync<CountOfLetters>(streamId);
72+
73+
version1.ACount.ShouldBe(1);
74+
version1.BCount.ShouldBe(2);
75+
version1.CCount.ShouldBe(1);
76+
version1.DCount.ShouldBe(0);
77+
78+
version2.ACount.ShouldBe(1);
79+
version2.BCount.ShouldBe(2);
80+
version2.CCount.ShouldBe(1);
81+
version2.DCount.ShouldBe(0);
82+
83+
version3.Aggregate.ACount.ShouldBe(1);
84+
version3.Aggregate.BCount.ShouldBe(2);
85+
version3.Aggregate.CCount.ShouldBe(1);
86+
version3.Aggregate.DCount.ShouldBe(0);
87+
88+
version4.ACount.ShouldBe(1);
89+
version4.BCount.ShouldBe(2);
90+
version4.CCount.ShouldBe(1);
91+
version4.DCount.ShouldBe(0);
92+
}
93+
}
94+
95+
96+
public class CountOfLetters
97+
{
98+
public int ACount { get; set; }
99+
public int BCount { get; set; }
100+
public int CCount { get; set; }
101+
public int DCount { get; set; }
102+
public int ECount { get; set; }
103+
104+
public void Apply(AEvent _)
105+
{
106+
ACount++;
107+
}
108+
109+
public void Apply(BEvent _)
110+
{
111+
BCount++;
112+
}
113+
114+
public void Apply(CEvent _)
115+
{
116+
CCount++;
117+
}
118+
119+
public void Apply(DEvent _)
120+
{
121+
DCount++;
122+
}
123+
124+
public void Apply(EEvent _)
125+
{
126+
ECount++;
127+
}
128+
129+
}
130+

src/Marten/Events/EventGraph.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,21 @@ internal EventGraph(StoreOptions options)
9494
public IAggregatorSource<IQuerySession>? Build<TDoc>()
9595
{
9696
var idType = Options.Storage.MappingFor(typeof(TDoc)).IdType;
97+
98+
// For the quite legitimate case of doing a live aggregation when
99+
// there is no Id member
100+
if (idType == null)
101+
{
102+
if (StreamIdentity == StreamIdentity.AsGuid)
103+
{
104+
idType = typeof(Guid);
105+
}
106+
else
107+
{
108+
idType = typeof(string);
109+
}
110+
}
111+
97112
return typeof(SingleStreamProjection<,>)
98113
.CloseAndBuildAs<IAggregatorSource<IQuerySession>>(typeof(TDoc), idType);
99114
}

src/Marten/Events/EventStore.FetchForWriting.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using JasperFx.Core;
99
using JasperFx.Core.Reflection;
1010
using JasperFx.Events;
11+
using JasperFx.Events.Aggregation;
1112
using JasperFx.Events.Projections;
1213
using Marten.Internal;
1314
using Marten.Internal.Sessions;
@@ -195,24 +196,24 @@ internal IAggregateFetchPlan<TDoc, TId> FindFetchPlan<TDoc, TId>() where TDoc :
195196
return (IAggregateFetchPlan<TDoc, TId>)stored;
196197
}
197198

198-
var storage = _store.Options.ResolveCorrectedDocumentStorage<TDoc, TId>(DocumentTracking.IdentityOnly);
199-
200-
var plan = determineFetchPlan(storage, _session.Options);
199+
var plan = determineFetchPlan<TDoc, TId>(_session.Options);
201200

202201
_fetchStrategies = _fetchStrategies.AddOrUpdate(typeof(TDoc), plan);
203202

204203
return plan;
205204
}
206205

207-
private IAggregateFetchPlan<TDoc, TId> determineFetchPlan<TDoc, TId>(IDocumentStorage<TDoc, TId> storage,
208-
StoreOptions options) where TDoc : class where TId : notnull
206+
private IAggregateFetchPlan<TDoc, TId> determineFetchPlan<TDoc, TId>(StoreOptions options) where TDoc : class where TId : notnull
209207
{
210208
foreach (var planner in options.Projections.allPlanners())
211209
{
212-
if (planner.TryMatch(storage, (IEventIdentityStrategy<TId>)this, options, out var plan)) return plan;
210+
if (planner.TryMatch<TDoc, TId>((IEventIdentityStrategy<TId>)this, options, out var plan))
211+
{
212+
return plan;
213+
}
213214
}
214215

215-
throw new ArgumentOutOfRangeException(nameof(storage),
216+
throw new InvalidOperationException(
216217
$"Unable to determine a fetch plan for aggregate {typeof(TDoc).FullNameInCode()}. Is there a valid single stream aggregation projection for this type?");
217218
}
218219
}

src/Marten/Events/Fetching/AsyncFetchPlanner.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Diagnostics.CodeAnalysis;
33
using JasperFx.Core.Reflection;
4+
using JasperFx.Events.Aggregation;
45
using JasperFx.Events.Daemon;
56
using JasperFx.Events.Projections;
67
using Marten.Events.Projections;
@@ -11,8 +12,9 @@ namespace Marten.Events.Fetching;
1112

1213
internal class AsyncFetchPlanner: IFetchPlanner
1314
{
14-
public bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdentityStrategy<TId> identity, StoreOptions options,
15-
[NotNullWhen(true)]out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull
15+
public bool TryMatch<TDoc, TId>(IEventIdentityStrategy<TId> identity,
16+
StoreOptions options,
17+
[NotNullWhen(true)] out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull
1618
{
1719
if (options.Projections.TryFindAggregate(typeof(TDoc), out var projection))
1820
{
@@ -22,8 +24,6 @@ public bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdent
2224
$"The aggregate type {typeof(TDoc).FullNameInCode()} is the subject of a multi-stream projection and cannot be used with FetchForWriting");
2325
}
2426

25-
26-
2727
if (projection.Scope == AggregationScope.MultiStream)
2828
{
2929
throw new InvalidOperationException(
@@ -32,10 +32,9 @@ public bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdent
3232

3333
if (projection.Lifecycle == ProjectionLifecycle.Async)
3434
{
35-
var mapping = options.Storage.FindMapping(typeof(TDoc)) as DocumentMapping;
36-
if (mapping != null && mapping.Metadata.Revision.Enabled)
35+
if (options.Storage.FindMapping(typeof(TDoc)) is DocumentMapping { Metadata.Revision.Enabled: true })
3736
{
38-
plan = new FetchAsyncPlan<TDoc, TId>(options.EventGraph, identity, storage);
37+
plan = new FetchAsyncPlan<TDoc, TId>(options.EventGraph, identity, options.ResolveCorrectedDocumentStorage<TDoc, TId>(DocumentTracking.IdentityOnly));
3938
return true;
4039
}
4140
}

src/Marten/Events/Fetching/FetchInlinedPlan.ExpectedVersion.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Marten.Exceptions;
88
using Marten.Internal;
99
using Marten.Internal.Sessions;
10+
using Marten.Internal.Storage;
1011
using Marten.Linq.QueryHandlers;
1112
using Npgsql;
1213
using Weasel.Postgresql;
@@ -29,7 +30,7 @@ public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase sessio
2930

3031
builder.StartNewCommand();
3132

32-
var handler = new LoadByIdHandler<TDoc, TId>(storage, id);
33+
var handler = new LoadByIdHandler<TDoc, TId>((IDocumentStorage<TDoc, TId>)storage, id);
3334
handler.ConfigureCommand(builder, session);
3435

3536
await using var reader =
@@ -91,7 +92,7 @@ public IQueryHandler<IEventStream<TDoc>> BuildQueryHandler(QuerySession session,
9192
{
9293
session.AssertIsDocumentSession();
9394
var storage = findDocumentStorage(session);
94-
var handler = new LoadByIdHandler<TDoc, TId>(storage, id);
95+
var handler = new LoadByIdHandler<TDoc, TId>((IDocumentStorage<TDoc, TId>)storage, id);
9596
return new WithStartingVersionHandler(this, id, handler, expectedStartingVersion);
9697
}
9798

src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Threading;
22
using System.Threading.Tasks;
33
using Marten.Internal.Sessions;
4+
using Marten.Internal.Storage;
45
using Marten.Linq.QueryHandlers;
56
using Weasel.Postgresql;
67

@@ -23,7 +24,7 @@ internal partial class FetchInlinedPlan<TDoc, TId>
2324
var builder = new BatchBuilder { TenantId = session.TenantId };
2425
builder.Append(";");
2526

26-
var handler = new LoadByIdHandler<TDoc, TId>(storage, id);
27+
var handler = new LoadByIdHandler<TDoc, TId>((IDocumentStorage<TDoc, TId>)storage, id);
2728
handler.ConfigureCommand(builder, session);
2829

2930
await using var reader =

src/Marten/Events/Fetching/FetchInlinedPlan.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Threading.Tasks;
77
using JasperFx;
88
using JasperFx.Core.Reflection;
9+
using JasperFx.Events.Aggregation;
910
using JasperFx.Events.Projections;
1011
using Marten.Exceptions;
1112
using Marten.Internal;

src/Marten/Events/Fetching/FetchLivePlan.cs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Threading.Tasks;
44
using JasperFx;
55
using JasperFx.Core.Reflection;
6+
using JasperFx.Events;
67
using JasperFx.Events.Aggregation;
78
using JasperFx.Events.Projections;
89
using Marten.Exceptions;
@@ -17,11 +18,11 @@ namespace Marten.Events.Fetching;
1718
internal partial class FetchLivePlan<TDoc, TId>: IAggregateFetchPlan<TDoc, TId> where TDoc : class where TId : notnull
1819
{
1920
private readonly IAggregator<TDoc, TId, IQuerySession> _aggregator;
20-
private readonly IDocumentStorage<TDoc, TId> _documentStorage;
21+
private readonly IIdentitySetter<TDoc, TId> _documentStorage;
2122
private readonly IEventIdentityStrategy<TId> _identityStrategy;
2223

2324
public FetchLivePlan(EventGraph events, IEventIdentityStrategy<TId> identityStrategy,
24-
IDocumentStorage<TDoc, TId> documentStorage)
25+
IIdentitySetter<TDoc, TId> documentStorage)
2526
{
2627
IsGlobal = events.GlobalAggregates.Contains(typeof(TDoc));
2728

@@ -30,8 +31,20 @@ public FetchLivePlan(EventGraph events, IEventIdentityStrategy<TId> identityStra
3031

3132
var raw = events.Options.Projections.AggregatorFor<TDoc>();
3233

33-
_aggregator = raw as IAggregator<TDoc, TId, IQuerySession>
34-
?? typeof(IdentityForwardingAggregator<,,,>).CloseAndBuildAs<IAggregator<TDoc, TId, IQuerySession>>(raw, _documentStorage, typeof(TDoc), _documentStorage.IdType, typeof(TId), typeof(IQuerySession));
34+
// yeah, I know, this is kind of gross
35+
if (documentStorage is NulloIdentitySetter<TDoc, TId>)
36+
{
37+
_aggregator = (IAggregator<TDoc, TId, IQuerySession>?)raw;
38+
}
39+
else
40+
{
41+
var simpleType = events.StreamIdentity == StreamIdentity.AsGuid ? typeof(Guid) : typeof(string);
42+
var idType = documentStorage is IDocumentStorage<TDoc, TId> s ? s.IdType : typeof(TId);
43+
44+
// The goofy identity forwarding thing is to deal with custom value types. Of course.
45+
_aggregator = raw as IAggregator<TDoc, TId, IQuerySession>
46+
?? typeof(IdentityForwardingAggregator<,,,>).CloseAndBuildAs<IAggregator<TDoc, TId, IQuerySession>>(raw, _documentStorage, typeof(TDoc), idType, simpleType, typeof(IQuerySession));
47+
}
3548
}
3649

3750
public bool IsGlobal { get; }
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
using System.Diagnostics.CodeAnalysis;
2+
using JasperFx.Events.Aggregation;
23
using Marten.Internal.Storage;
34

45
namespace Marten.Events.Fetching;
56

67
public interface IFetchPlanner
78
{
8-
bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdentityStrategy<TId> identity,
9-
StoreOptions options, [NotNullWhen(true)]out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull;
9+
bool TryMatch<TDoc, TId>(IEventIdentityStrategy<TId> identity,
10+
StoreOptions options, [NotNullWhen(true)] out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull;
1011
}

src/Marten/Events/Fetching/InlineFetchPlanner.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
using System.Diagnostics.CodeAnalysis;
2+
using JasperFx.Events.Aggregation;
23
using JasperFx.Events.Projections;
34
using Marten.Internal.Storage;
45

56
namespace Marten.Events.Fetching;
67

78
internal class InlineFetchPlanner : IFetchPlanner
89
{
9-
public bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdentityStrategy<TId> identity, StoreOptions options,
10-
[NotNullWhen(true)]out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull
10+
public bool TryMatch<TDoc, TId>(IEventIdentityStrategy<TId> identity,
11+
StoreOptions options,
12+
[NotNullWhen(true)] out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull
1113
{
1214
if (options.Projections.TryFindAggregate(typeof(TDoc), out var projection))
1315
{

0 commit comments

Comments
 (0)