Skip to content

Commit 35b0d9d

Browse files
committed
Ability to use multiple streams at once w/ the aggregate handler workflow. Closes GH-1638
1 parent 9c4addf commit 35b0d9d

File tree

4 files changed

+156
-9
lines changed

4 files changed

+156
-9
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
using Marten;
2+
using Shouldly;
3+
using WolverineWebApi.Accounts;
4+
5+
namespace Wolverine.Http.Tests.Marten;
6+
7+
public class working_against_multiple_streams : IntegrationContext
8+
{
9+
public working_against_multiple_streams(AppFixture fixture) : base(fixture)
10+
{
11+
}
12+
13+
private async Task<Guid> createAccount(double amount)
14+
{
15+
var created = new AccountCreated(amount);
16+
using var session = Host.DocumentStore().LightweightSession();
17+
var id = session.Events.StartStream<Account>(created).Id;
18+
await session.SaveChangesAsync();
19+
return id;
20+
}
21+
22+
private async Task<double> fetchAmount(Guid id)
23+
{
24+
using var session = Host.DocumentStore().LightweightSession();
25+
var account = await session.Events.FetchLatest<Account>(id);
26+
return account.Amount;
27+
}
28+
29+
[Fact]
30+
public async Task happy_path_found_both_accounts_append_to_both()
31+
{
32+
var from = await createAccount(1000);
33+
var to = await createAccount(100);
34+
35+
await Scenario(x =>
36+
{
37+
x.Post.Json(new TransferMoney(from, to, 150)).ToUrl("/accounts/transfer");
38+
x.StatusCodeShouldBe(204);
39+
});
40+
41+
(await fetchAmount(from)).ShouldBe(850);
42+
(await fetchAmount(to)).ShouldBe(250);
43+
}
44+
45+
[Fact]
46+
public async Task reject_when_the_from_does_not_have_enough_funds()
47+
{
48+
var from = await createAccount(1000);
49+
var to = await createAccount(100);
50+
51+
await Scenario(x =>
52+
{
53+
x.Post.Json(new TransferMoney(from, to, 2000)).ToUrl("/accounts/transfer");
54+
x.StatusCodeShouldBe(204);
55+
});
56+
57+
(await fetchAmount(from)).ShouldBe(1000);
58+
(await fetchAmount(to)).ShouldBe(100);
59+
}
60+
61+
[Fact]
62+
public async Task return_404_when_first_account_does_not_exist()
63+
{
64+
//var from = await createAccount(1000);
65+
var to = await createAccount(100);
66+
67+
await Scenario(x =>
68+
{
69+
x.Post.Json(new TransferMoney(Guid.NewGuid(), to, 2000)).ToUrl("/accounts/transfer");
70+
x.StatusCodeShouldBe(404);
71+
});
72+
}
73+
74+
[Fact]
75+
public async Task return_404_when_second_account_does_not_exist()
76+
{
77+
var from = await createAccount(1000);
78+
//var to = await createAccount(100);
79+
80+
await Scenario(x =>
81+
{
82+
x.Post.Json(new TransferMoney(from, Guid.NewGuid(), 2000)).ToUrl("/accounts/transfer");
83+
x.StatusCodeShouldBe(404);
84+
});
85+
}
86+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using JasperFx.Events;
2+
using Marten.Events;
3+
using Wolverine.Http;
4+
using Wolverine.Http.Marten;
5+
6+
namespace WolverineWebApi.Accounts;
7+
8+
public record AccountCreated(double InitialAmount);
9+
public record Debited(double Amount);
10+
public record Withdrawn(double Amount);
11+
12+
public class Account
13+
{
14+
public Guid Id { get; set; }
15+
public double Amount { get; set; }
16+
17+
public static Account Create(IEvent<AccountCreated> e)
18+
=> new Account { Id = e.StreamId, Amount = e.Data.InitialAmount};
19+
20+
public void Apply(Debited e) => Amount += e.Amount;
21+
public void Apply(Withdrawn e) => Amount -= e.Amount;
22+
}
23+
24+
public record TransferMoney(Guid FromId, Guid ToId, double Amount);
25+
26+
public static class TransferMoneyEndpoint
27+
{
28+
[WolverinePost("/accounts/transfer")]
29+
public static void Post(
30+
TransferMoney command,
31+
32+
[Aggregate(nameof(TransferMoney.FromId))] IEventStream<Account> fromAccount,
33+
34+
[Aggregate(nameof(TransferMoney.ToId))] IEventStream<Account> toAccount)
35+
{
36+
// Would already 404 if either referenced account does not exist
37+
if (fromAccount.Aggregate.Amount >= command.Amount)
38+
{
39+
fromAccount.AppendOne(new Withdrawn(command.Amount));
40+
toAccount.AppendOne(new Debited(command.Amount));
41+
}
42+
}
43+
}

src/Persistence/Wolverine.Marten/AggregateHandling.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Diagnostics;
22
using System.Reflection;
3+
using ImTools;
34
using JasperFx.CodeGeneration.Frames;
45
using JasperFx.CodeGeneration.Model;
56
using JasperFx.Core;
@@ -29,6 +30,7 @@ internal record AggregateHandling(IDataRequirement Requirement)
2930

3031
public ConcurrencyStyle LoadStyle { get; init; }
3132
public Variable? Version { get; init; }
33+
public ParameterInfo? Parameter { get; set; }
3234

3335
public Variable Apply(IChain chain, IServiceContainer container)
3436
{
@@ -40,6 +42,10 @@ public Variable Apply(IChain chain, IServiceContainer container)
4042
var firstCall = chain.HandlerCalls().First();
4143

4244
var eventStream = loader.ReturnVariable!;
45+
if (Parameter != null)
46+
{
47+
eventStream.OverrideName("stream_" + Parameter.Name);
48+
}
4349

4450
if (AggregateType == firstCall.HandlerType)
4551
{
@@ -52,6 +58,11 @@ public Variable Apply(IChain chain, IServiceContainer container)
5258
ValidateMethodSignatureForEmittedEvents(chain, firstCall, chain);
5359
var aggregate = RelayAggregateToHandlerMethod(eventStream, chain, firstCall, AggregateType);
5460

61+
if (Parameter != null && Parameter.ParameterType.Closes(typeof(IEventStream<>)))
62+
{
63+
return eventStream;
64+
}
65+
5566
return aggregate;
5667
}
5768

@@ -203,7 +214,14 @@ internal Variable RelayAggregateToHandlerMethod(Variable eventStream, IChain cha
203214
}
204215
else
205216
{
206-
firstCall.TrySetArgument(aggregateVariable);
217+
if (!firstCall.TrySetArgument(aggregateVariable))
218+
{
219+
if (Parameter != null && Parameter.ParameterType.Closes(typeof(IEventStream<>)))
220+
{
221+
var index = firstCall.Method.GetParameters().IndexOf(x => x.Name == Parameter.Name);
222+
firstCall.Arguments[index] = eventStream;
223+
}
224+
};
207225
}
208226

209227
foreach (var methodCall in chain.Middleware.OfType<MethodCall>())

src/Persistence/Wolverine.Marten/WriteAggregateAttribute.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using JasperFx.Core;
55
using JasperFx.Core.Reflection;
66
using Marten;
7+
using Marten.Events;
78
using Wolverine.Attributes;
89
using Wolverine.Configuration;
910
using Wolverine.Persistence;
@@ -41,19 +42,17 @@ public WriteAggregateAttribute(string? routeOrParameterName)
4142

4243
public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceContainer container, GenerationRules rules)
4344
{
44-
// TODO -- this goes away soon-ish
45-
if (chain.HandlerCalls().First().Method.GetParameters().Count(x => x.HasAttribute<WriteAggregateAttribute>()) > 1)
46-
{
47-
throw new InvalidOperationException(
48-
"It is only possible (today) to use a single [Aggregate] or [WriteAggregate] attribute on an HTTP handler method. Maybe use [ReadAggregate] if all you need is the projected data");
49-
}
50-
5145
var aggregateType = parameter.ParameterType;
5246
if (aggregateType.IsNullable())
5347
{
5448
aggregateType = aggregateType.GetInnerTypeFromNullable();
5549
}
5650

51+
if (aggregateType.Closes(typeof(IEventStream<>)))
52+
{
53+
aggregateType = aggregateType.GetGenericArguments()[0];
54+
}
55+
5756
var store = container.GetInstance<IDocumentStore>();
5857
var idType = store.Options.FindOrResolveDocumentType(aggregateType).IdType;
5958

@@ -75,7 +74,8 @@ public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceC
7574
AggregateType = aggregateType,
7675
AggregateId = identity,
7776
LoadStyle = LoadStyle,
78-
Version = version
77+
Version = version,
78+
Parameter = parameter
7979
};
8080

8181
return handling.Apply(chain, container);

0 commit comments

Comments
 (0)