6
6
using System . Linq ;
7
7
using System . Threading ;
8
8
using System . Threading . Tasks ;
9
+ using ImTools ;
9
10
using JasperFx ;
10
11
using JasperFx . Descriptors ;
11
12
using Marten . Events . Operations ;
17
18
namespace Marten . Internal . Sessions ;
18
19
19
20
internal class EventTracingConnectionLifetime :
20
- IConnectionLifetime
21
+ IConnectionLifetime , ITransactionStarter
21
22
{
22
23
private const string MartenCommandExecutionStarted = "marten.command.execution.started" ;
23
24
private const string MartenBatchExecutionStarted = "marten.batch.execution.started" ;
24
25
private const string MartenBatchPagesExecutionStarted = "marten.batch.pages.execution.started" ;
25
- private readonly IConnectionLifetime _innerConnectionLifetime ;
26
26
private readonly OpenTelemetryOptions _telemetryOptions ;
27
27
private readonly Activity ? _databaseActivity ;
28
+ private readonly string _tenantId ;
28
29
29
30
public EventTracingConnectionLifetime ( IConnectionLifetime innerConnectionLifetime , string tenantId ,
30
31
OpenTelemetryOptions telemetryOptions )
@@ -38,24 +39,36 @@ public EventTracingConnectionLifetime(IConnectionLifetime innerConnectionLifetim
38
39
39
40
Logger = innerConnectionLifetime . Logger ;
40
41
CommandTimeout = innerConnectionLifetime . CommandTimeout ;
41
- _innerConnectionLifetime = innerConnectionLifetime ;
42
+ InnerConnectionLifetime = innerConnectionLifetime ;
42
43
_telemetryOptions = telemetryOptions ;
43
44
44
45
var currentActivity = Activity . Current ?? null ;
45
- var tags = new ActivityTagsCollection ( new [ ] { new KeyValuePair < string , object ? > ( OtelConstants . TenantId , tenantId ) } ) ;
46
+ var tags = new ActivityTagsCollection ( [ new KeyValuePair < string , object ? > ( OtelConstants . TenantId , tenantId ) ] ) ;
46
47
_databaseActivity = MartenTracing . StartConnectionActivity ( currentActivity , tags ) ;
48
+
49
+ _tenantId = tenantId ;
47
50
}
48
51
52
+ public EventTracingConnectionLifetime ( OpenTelemetryOptions telemetryOptions , Activity ? databaseActivity , IConnectionLifetime innerConnectionLifetime , IMartenSessionLogger logger )
53
+ {
54
+ _telemetryOptions = telemetryOptions ;
55
+ _databaseActivity = databaseActivity ;
56
+ InnerConnectionLifetime = innerConnectionLifetime ;
57
+ Logger = logger ;
58
+ }
59
+
60
+ public IConnectionLifetime InnerConnectionLifetime { get ; }
61
+
49
62
public ValueTask DisposeAsync ( )
50
63
{
51
64
_databaseActivity ? . Stop ( ) ;
52
- return _innerConnectionLifetime . DisposeAsync ( ) ;
65
+ return InnerConnectionLifetime . DisposeAsync ( ) ;
53
66
}
54
67
55
68
public void Dispose ( )
56
69
{
57
70
_databaseActivity ? . Stop ( ) ;
58
- _innerConnectionLifetime . Dispose ( ) ;
71
+ InnerConnectionLifetime . Dispose ( ) ;
59
72
}
60
73
61
74
public IMartenSessionLogger Logger { get ; set ; }
@@ -66,7 +79,7 @@ public int Execute(NpgsqlCommand cmd)
66
79
67
80
try
68
81
{
69
- return _innerConnectionLifetime . Execute ( cmd ) ;
82
+ return InnerConnectionLifetime . Execute ( cmd ) ;
70
83
}
71
84
catch ( Exception e )
72
85
{
@@ -82,7 +95,7 @@ public int Execute(NpgsqlCommand cmd)
82
95
83
96
try
84
97
{
85
- return await _innerConnectionLifetime . ExecuteAsync ( command , token ) . ConfigureAwait ( false ) ;
98
+ return await InnerConnectionLifetime . ExecuteAsync ( command , token ) . ConfigureAwait ( false ) ;
86
99
}
87
100
catch ( Exception e )
88
101
{
@@ -98,7 +111,7 @@ public DbDataReader ExecuteReader(NpgsqlCommand command)
98
111
99
112
try
100
113
{
101
- return _innerConnectionLifetime . ExecuteReader ( command ) ;
114
+ return InnerConnectionLifetime . ExecuteReader ( command ) ;
102
115
}
103
116
catch ( Exception e )
104
117
{
@@ -114,7 +127,7 @@ public async Task<DbDataReader> ExecuteReaderAsync(NpgsqlCommand command, Cancel
114
127
115
128
try
116
129
{
117
- return await _innerConnectionLifetime . ExecuteReaderAsync ( command , token ) . ConfigureAwait ( false ) ;
130
+ return await InnerConnectionLifetime . ExecuteReaderAsync ( command , token ) . ConfigureAwait ( false ) ;
118
131
}
119
132
catch ( Exception e )
120
133
{
@@ -130,7 +143,7 @@ public DbDataReader ExecuteReader(NpgsqlBatch batch)
130
143
131
144
try
132
145
{
133
- return _innerConnectionLifetime . ExecuteReader ( batch ) ;
146
+ return InnerConnectionLifetime . ExecuteReader ( batch ) ;
134
147
}
135
148
catch ( Exception e )
136
149
{
@@ -146,7 +159,7 @@ public async Task<DbDataReader> ExecuteReaderAsync(NpgsqlBatch batch, Cancellati
146
159
147
160
try
148
161
{
149
- return await _innerConnectionLifetime . ExecuteReaderAsync ( batch , token ) . ConfigureAwait ( false ) ;
162
+ return await InnerConnectionLifetime . ExecuteReaderAsync ( batch , token ) . ConfigureAwait ( false ) ;
150
163
}
151
164
catch ( Exception e )
152
165
{
@@ -162,7 +175,7 @@ public void ExecuteBatchPages(IReadOnlyList<OperationPage> pages, List<Exception
162
175
163
176
try
164
177
{
165
- _innerConnectionLifetime . ExecuteBatchPages ( pages , exceptions ) ;
178
+ InnerConnectionLifetime . ExecuteBatchPages ( pages , exceptions ) ;
166
179
writeVerboseEvents ( pages ) ;
167
180
}
168
181
catch ( AggregateException e )
@@ -185,7 +198,7 @@ public async Task ExecuteBatchPagesAsync(IReadOnlyList<OperationPage> pages, Lis
185
198
186
199
try
187
200
{
188
- await _innerConnectionLifetime . ExecuteBatchPagesAsync ( pages , exceptions , token ) . ConfigureAwait ( false ) ;
201
+ await InnerConnectionLifetime . ExecuteBatchPagesAsync ( pages , exceptions , token ) . ConfigureAwait ( false ) ;
189
202
190
203
writeVerboseEvents ( pages ) ;
191
204
}
@@ -223,4 +236,20 @@ private void writeVerboseEvents(IReadOnlyList<OperationPage> pages)
223
236
}
224
237
}
225
238
}
239
+
240
+ public IAlwaysConnectedLifetime Start ( )
241
+ {
242
+ if ( InnerConnectionLifetime is ITransactionStarter starter ) return starter . Start ( ) ;
243
+
244
+ throw new InvalidOperationException (
245
+ $ "The inner connection lifetime { InnerConnectionLifetime } does not implement { nameof ( ITransactionStarter ) } ") ;
246
+ }
247
+
248
+ public Task < IAlwaysConnectedLifetime > StartAsync ( CancellationToken token )
249
+ {
250
+ if ( InnerConnectionLifetime is ITransactionStarter starter ) return starter . StartAsync ( token ) ;
251
+
252
+ throw new InvalidOperationException (
253
+ $ "The inner connection lifetime { InnerConnectionLifetime } does not implement { nameof ( ITransactionStarter ) } ") ;
254
+ }
226
255
}
0 commit comments