Skip to content

Commit 691c908

Browse files
committed
Disallowing the usage of "sticky" connections in projections or subscription usage. Closes GH-3939
1 parent c268df0 commit 691c908

File tree

3 files changed

+55
-40
lines changed

3 files changed

+55
-40
lines changed

src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Marten.Internal.Sessions;
66
using Marten.Internal.Storage;
77
using Marten.Services;
8+
using Npgsql;
89

910
namespace Marten.Events.Daemon.Internals;
1011

@@ -23,6 +24,15 @@ public ProjectionDocumentSession(DocumentStore store,
2324
Mode = mode;
2425
}
2526

27+
public override NpgsqlConnection Connection
28+
{
29+
get
30+
{
31+
throw new NotSupportedException(
32+
"It is not supported to use \"sticky\" connections inside of projections or subscriptions");
33+
}
34+
}
35+
2636
internal override DocumentTracking TrackingMode => SessionOptions.Tracking;
2737

2838
protected internal override IDocumentStorage<T> selectStorage<T>(DocumentProvider<T> provider) =>

src/Marten/Internal/Sessions/DocumentSessionBase.cs

Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
1-
#nullable enable
21
using System;
32
using System.Collections.Generic;
43
using System.Diagnostics.CodeAnalysis;
54
using System.Linq;
6-
using System.Threading.Tasks;
75
using JasperFx.Core;
86
using JasperFx.Core.Reflection;
97
using Marten.Events;
10-
using Marten.Events.Aggregation;
11-
using Marten.Events.Daemon.Internals;
128
using Marten.Exceptions;
139
using Marten.Internal.Operations;
1410
using Marten.Internal.Storage;
@@ -54,7 +50,6 @@ public void EjectAllPendingChanges()
5450
ChangeTrackers.Clear();
5551
}
5652

57-
5853
public void Store<T>(IEnumerable<T> entities) where T : notnull
5954
{
6055
Store(entities?.ToArray()!);
@@ -94,6 +89,7 @@ public void UpdateRevision<T>(T entity, int revision) where T : notnull
9489
{
9590
r.Revision = revision;
9691
}
92+
9793
_workTracker.Add(op);
9894
}
9995

@@ -109,6 +105,7 @@ public void TryUpdateRevision<T>(T entity, int revision) where T : notnull
109105
r.Revision = revision;
110106
r.IgnoreConcurrencyViolation = true;
111107
}
108+
112109
_workTracker.Add(op);
113110
}
114111

@@ -141,7 +138,11 @@ public void Insert<T>(params T[] entities) where T : notnull
141138
{
142139
storage.Store(this, entity);
143140
var op = storage.Insert(entity, this, TenantId);
144-
if (op is IRevisionedOperation r) r.Revision = 1;
141+
if (op is IRevisionedOperation r)
142+
{
143+
r.Revision = 1;
144+
}
145+
145146
_workTracker.Add(op);
146147
}
147148
}
@@ -206,15 +207,17 @@ public void InsertObjects(IEnumerable<object> documents)
206207

207208
public void QueueSqlCommand(string sql, params object[] parameterValues)
208209
{
209-
QueueSqlCommand(DefaultParameterPlaceholder, sql, parameterValues: parameterValues);
210+
QueueSqlCommand(DefaultParameterPlaceholder, sql, parameterValues);
210211
}
211212

212213
public void QueueSqlCommand(char placeholder, string sql, params object[] parameterValues)
213214
{
214215
sql = sql.TrimEnd(';');
215216
if (sql.Contains(';'))
217+
{
216218
throw new ArgumentOutOfRangeException(nameof(sql),
217219
"You must specify one SQL command at a time because of Marten's usage of command batching. ';' cannot be used as a command separator here.");
220+
}
218221

219222
var operation = new ExecuteSqlStorageOperation(placeholder, sql, parameterValues);
220223
QueueOperation(operation);
@@ -335,7 +338,8 @@ private void store<T>(IEnumerable<T> entities) where T : notnull
335338
{
336339
var storage = StorageFor<T>();
337340

338-
if (Concurrency == ConcurrencyChecks.Disabled && (storage.UseOptimisticConcurrency || storage.UseNumericRevisions))
341+
if (Concurrency == ConcurrencyChecks.Disabled &&
342+
(storage.UseOptimisticConcurrency || storage.UseNumericRevisions))
339343
{
340344
foreach (var entity in entities)
341345
{
@@ -385,36 +389,6 @@ public void EjectPatchedTypes(IUnitOfWork changes)
385389
foreach (var type in patchedTypes) EjectAllOfType(type);
386390
}
387391

388-
internal interface IObjectHandler
389-
{
390-
void Execute(IDocumentSession session, IEnumerable<object> objects);
391-
}
392-
393-
internal class StoreHandler<T>: IObjectHandler where T : notnull
394-
{
395-
public void Execute(IDocumentSession session, IEnumerable<object> objects)
396-
{
397-
// Delegate to the Store<T>() method
398-
session.Store(objects.OfType<T>().ToArray());
399-
}
400-
}
401-
402-
internal class InsertHandler<T>: IObjectHandler where T : notnull
403-
{
404-
public void Execute(IDocumentSession session, IEnumerable<object> objects)
405-
{
406-
session.Insert(objects.OfType<T>().ToArray());
407-
}
408-
}
409-
410-
internal class DeleteHandler<T>: IObjectHandler where T : notnull
411-
{
412-
public void Execute(IDocumentSession session, IEnumerable<object> objects)
413-
{
414-
foreach (var document in objects.OfType<T>()) session.Delete(document);
415-
}
416-
}
417-
418392
internal void StoreDocumentInItemMap<TDoc, TId>(TId id, TDoc document) where TDoc : class where TId : notnull
419393
{
420394
if (ItemMap.ContainsKey(typeof(TDoc)))
@@ -429,7 +403,8 @@ internal void StoreDocumentInItemMap<TDoc, TId>(TId id, TDoc document) where TDo
429403
}
430404
}
431405

432-
internal bool TryGetAggregateFromIdentityMap<TDoc, TId>(TId id, [NotNullWhen(true)]out TDoc? document) where TDoc: notnull where TId : notnull
406+
internal bool TryGetAggregateFromIdentityMap<TDoc, TId>(TId id, [NotNullWhen(true)] out TDoc? document)
407+
where TDoc : notnull where TId : notnull
433408
{
434409
if (Options.EventGraph.UseIdentityMapForAggregates)
435410
{
@@ -449,4 +424,34 @@ internal bool TryGetAggregateFromIdentityMap<TDoc, TId>(TId id, [NotNullWhen(tru
449424
document = default;
450425
return false;
451426
}
427+
428+
internal interface IObjectHandler
429+
{
430+
void Execute(IDocumentSession session, IEnumerable<object> objects);
431+
}
432+
433+
internal class StoreHandler<T>: IObjectHandler where T : notnull
434+
{
435+
public void Execute(IDocumentSession session, IEnumerable<object> objects)
436+
{
437+
// Delegate to the Store<T>() method
438+
session.Store(objects.OfType<T>().ToArray());
439+
}
440+
}
441+
442+
internal class InsertHandler<T>: IObjectHandler where T : notnull
443+
{
444+
public void Execute(IDocumentSession session, IEnumerable<object> objects)
445+
{
446+
session.Insert(objects.OfType<T>().ToArray());
447+
}
448+
}
449+
450+
internal class DeleteHandler<T>: IObjectHandler where T : notnull
451+
{
452+
public void Execute(IDocumentSession session, IEnumerable<object> objects)
453+
{
454+
foreach (var document in objects.OfType<T>()) session.Delete(document);
455+
}
456+
}
452457
}

src/Marten/Internal/Sessions/QuerySession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ internal QuerySession(
9090

9191
public ConcurrencyChecks Concurrency { get; protected set; } = ConcurrencyChecks.Enabled;
9292

93-
public NpgsqlConnection Connection
93+
public virtual NpgsqlConnection Connection
9494
{
9595
get
9696
{

0 commit comments

Comments
 (0)