Skip to content

Commit 3639414

Browse files
committed
Inline side effects, ameliorations for concurrent access to messaging in side effect processing
1 parent 10584fb commit 3639414

12 files changed

+59
-31
lines changed

src/Marten/Events/Aggregation/AggregationRuntime.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ public AggregationRuntime(IDocumentStore store, IAggregateProjection projection,
8484
public IEventSlicer<TDoc, TId> Slicer { get; }
8585
public IDocumentStorage<TDoc, TId> Storage { get; }
8686

87+
private bool shouldProcessSideEffects(DocumentSessionBase session, ProjectionLifecycle lifecycle)
88+
{
89+
if (session is ProjectionDocumentSession { Mode: ShardExecutionMode.Continuous }) return true;
90+
91+
return lifecycle == ProjectionLifecycle.Inline && session.Options.Events.EnableSideEffectsOnInlineProjections;
92+
}
93+
8794
public async ValueTask ApplyChangesAsync(DocumentSessionBase session,
8895
EventSlice<TDoc, TId> slice, CancellationToken cancellation,
8996
ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline)
@@ -94,7 +101,7 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session,
94101
{
95102
var operation = Storage.DeleteForId(slice.Id, slice.Tenant.TenantId);
96103

97-
if (session is ProjectionDocumentSession { Mode: ShardExecutionMode.Continuous })
104+
if (shouldProcessSideEffects(session, lifecycle))
98105
{
99106
await processPossibleSideEffects(session, slice).ConfigureAwait(false);
100107
}
@@ -148,7 +155,7 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session,
148155

149156
maybeArchiveStream(session, slice);
150157

151-
if (session is ProjectionDocumentSession { Mode: ShardExecutionMode.Continuous })
158+
if (shouldProcessSideEffects(session, lifecycle))
152159
{
153160
// Need to set the aggregate in case it didn't exist upfront
154161
slice.Aggregate = aggregate;

src/Marten/Events/Aggregation/Rebuilds/AggregatePageHandler.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ public async Task ProcessPageAsync(IDaemonRuntime runtime, ShardName shardName,
153153
ShouldApplyListeners = false
154154
};
155155

156+
await batch.InitializeMessageBatch().ConfigureAwait(false);
157+
156158
// Gotta use the current tenant if using conjoined tenancy
157159
var sessionOptions = SessionOptions.ForDatabase(_session.TenantId, _session.Database);
158160

src/Marten/Events/Aggregation/SideEffectCode.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ public interface IMessageSink
1010
ValueTask PublishAsync<T>(T message);
1111
}
1212

13-
public interface ITenantedMessageSink
13+
public interface ITenantedMessageSink : IMessageSink
1414
{
1515
ValueTask PublishAsync<T>(T message, string tenantId);
1616
}

src/Marten/Events/Daemon/Internals/GroupedProjectionExecution.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ private async Task<ProjectionUpdateBatch> buildBatchAsync(EventRangeGroup group,
220220
ShouldApplyListeners = group.Agent.Mode == ShardExecutionMode.Continuous && group.Range.Events.Any()
221221
};
222222

223+
await batch.InitializeMessageBatch().ConfigureAwait(false);
224+
223225
// Mark the progression
224226
batch.Queue.Post(group.Range.BuildProgressionOperation(_store.Events));
225227

src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class ProjectionUpdateBatch: IUpdateBatch, IAsyncDisposable, IDisposable,
2525
private readonly DaemonSettings _settings;
2626
private readonly CancellationToken _token;
2727
private OperationPage? _current;
28-
private DocumentSessionBase? _session;
28+
private DocumentSessionBase _session;
2929

3030
private IMartenSession Session
3131
{
@@ -41,6 +41,7 @@ private IMartenSession Session
4141
internal ProjectionUpdateBatch(DaemonSettings settings,
4242
DocumentSessionBase? session, ShardExecutionMode mode, CancellationToken token)
4343
{
44+
4445
_settings = settings;
4546
_session = session ?? throw new ArgumentNullException(nameof(session));
4647
_token = token;
@@ -54,6 +55,12 @@ internal ProjectionUpdateBatch(DaemonSettings settings,
5455
startNewPage(session);
5556
}
5657

58+
public async Task InitializeMessageBatch()
59+
{
60+
_batch = await _session!.Options.Events.MessageOutbox.CreateBatch(_session).ConfigureAwait(false);
61+
Listeners.Add(_batch);
62+
}
63+
5764
public async Task WaitForCompletion()
5865
{
5966
Queue.Complete();
@@ -335,25 +342,8 @@ protected void Dispose(bool disposing)
335342
}
336343

337344
private IMessageBatch? _batch;
338-
private readonly SemaphoreSlim _semaphore = new(1, 1);
339-
public async ValueTask<IMessageBatch> CurrentMessageBatch(DocumentSessionBase session)
345+
public ValueTask<IMessageBatch> CurrentMessageBatch(DocumentSessionBase session)
340346
{
341-
if (_batch != null) return _batch;
342-
343-
await _semaphore.WaitAsync(_token).ConfigureAwait(false);
344-
345-
if (_batch != null) return _batch;
346-
347-
try
348-
{
349-
_batch = await _session.Options.Events.MessageOutbox.CreateBatch(session).ConfigureAwait(false);
350-
Listeners.Add(_batch);
351-
352-
return _batch;
353-
}
354-
finally
355-
{
356-
_semaphore.Release();
357-
}
347+
return new ValueTask<IMessageBatch>(_batch);
358348
}
359349
}

src/Marten/Events/EventGraph.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ internal EventGraph(StoreOptions options)
115115
/// </summary>
116116
public bool EnableUniqueIndexOnEventId { get; set; } = false;
117117

118+
/// <summary>
119+
/// Opt into having Marten process "side effects" on aggregation projections (SingleStreamProjection/MultiStreamProjection) while
120+
/// running in an Inline lifecycle. Default is false;
121+
/// </summary>
122+
public bool EnableSideEffectsOnInlineProjections { get; set; } = false;
123+
118124
/// <summary>
119125
/// Configure whether event streams are identified with Guid or strings
120126
/// </summary>

src/Marten/Events/IEventStoreOptions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ bool UseIdentityMapForInlineAggregates
106106
/// </summary>
107107
public bool UseMandatoryStreamTypeDeclaration { get; set; }
108108

109+
/// <summary>
110+
/// Opt into having Marten process "side effects" on aggregation projections (SingleStreamProjection/MultiStreamProjection) while
111+
/// running in an Inline lifecycle. Default is false;
112+
/// </summary>
113+
bool EnableSideEffectsOnInlineProjections { get; set; }
114+
109115
/// <summary>
110116
/// Register an event type with Marten. This isn't strictly necessary for normal usage,
111117
/// but can help Marten with asynchronous projections where Marten hasn't yet encountered

src/Marten/Events/IReadOnlyEventStoreOptions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,10 @@ public interface IReadOnlyEventStoreOptions
8383
/// but this will be true in 8.0
8484
/// </summary>
8585
bool UseMandatoryStreamTypeDeclaration { get; set; }
86+
87+
/// <summary>
88+
/// Opt into having Marten process "side effects" on aggregation projections (SingleStreamProjection/MultiStreamProjection) while
89+
/// running in an Inline lifecycle. Default is false;
90+
/// </summary>
91+
public bool EnableSideEffectsOnInlineProjections { get; set; }
8692
}

src/Marten/Events/Projections/ProjectionOptions.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -356,11 +356,6 @@ public void Add<TProjection>(
356356
)
357357
where TProjection : GeneratedProjection, new()
358358
{
359-
if (lifecycle == ProjectionLifecycle.Live)
360-
{
361-
throw new InvalidOperationException("The generic overload of Add does not support Live projections, please use the non-generic overload.");
362-
}
363-
364359
var projection = new TProjection { Lifecycle = lifecycle };
365360

366361
asyncConfiguration?.Invoke(projection.Options);

src/Marten/Internal/Sessions/DocumentSessionBase.SaveChanges.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,17 @@ public async Task SaveChangesAsync(CancellationToken token = default)
104104

105105
await ExecuteBatchAsync(batch, token).ConfigureAwait(false);
106106

107+
if (_messageBatch != null)
108+
{
109+
await _messageBatch.AfterCommitAsync(this, _workTracker, token).ConfigureAwait(false);
110+
}
111+
107112
resetDirtyChecking();
108113

109114
EjectPatchedTypes(_workTracker);
110115
Logger.RecordSavedChanges(this, _workTracker);
111116

112-
foreach (var listener in Listeners)
117+
foreach (var listener in Listeners.ToArray())
113118
{
114119
await listener.AfterCommitAsync(this, _workTracker, token).ConfigureAwait(false);
115120
}

0 commit comments

Comments
 (0)