Skip to content

Commit c155e29

Browse files
experiment: make all TCSs continue asynchronously.
1 parent f284010 commit c155e29

File tree

131 files changed

+703
-240
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

131 files changed

+703
-240
lines changed

src/Connectors/KurrentDB.Connectors.Tests/ClusterVNodeApp.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Net;
88
using DotNext.Collections.Generic;
99
using Kurrent.Surge;
10+
using KurrentDB.Common.Utils;
1011
using KurrentDB.Core;
1112
using KurrentDB.Core.Bus;
1213
using KurrentDB.Core.Certificates;
@@ -92,7 +93,7 @@ static ClusterVNodeOptions GetClusterVNodeOptions(Dictionary<string, string?> se
9293
class NodeReadinessProbe : IHandle<SystemMessage.SystemReady> {
9394
static readonly Serilog.ILogger Log = Serilog.Log.Logger.ForContext<NodeReadinessProbe>();
9495

95-
TaskCompletionSource Ready { get; } = new();
96+
TaskCompletionSource Ready { get; } = TaskCompletionSourceFactory.CreateDefault();
9697

9798
void IHandle<SystemMessage.SystemReady>.Handle(SystemMessage.SystemReady message) {
9899
if (!Ready.Task.IsCompleted)

src/Connectors/KurrentDB.Connectors.Tests/Infrastructure/Http/TestHttpMessageHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
22
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
33

4+
using KurrentDB.Common.Utils;
45
using static System.Threading.Tasks.TaskCreationOptions;
56

67
namespace KurrentDB.Connectors.Tests.Infrastructure.Http;
@@ -10,7 +11,7 @@ protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage reques
1011
send(request, cancellationToken);
1112

1213
public static TestHttpMessageHandler Create(Func<HttpRequestMessage, Task<HttpResponseMessage>> send) {
13-
var tcs = new TaskCompletionSource<HttpResponseMessage>(RunContinuationsAsynchronously);
14+
var tcs = TaskCompletionSourceFactory.CreateDefault<HttpResponseMessage>(RunContinuationsAsynchronously);
1415
return new TestHttpMessageHandler(async (req, ct) => {
1516
await using var registration = ct.Register(() => tcs.TrySetCanceled());
1617
var result = await Task.WhenAny(send(req), tcs.Task);

src/Connectors/KurrentDB.Connectors.Tests/MessageBus.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
22
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
33

4+
using KurrentDB.Common.Utils;
45
using KurrentDB.Core.Bus;
56
using KurrentDB.Core.Messaging;
67

@@ -19,7 +20,7 @@ public void Subscribe<T>(HandleMessage<T> handler) where T : Message =>
1920
});
2021

2122
public Task SubscribeAndWait<T>(HandleMessageAsync<T> handler, CancellationToken timeoutToken = default) where T : Message {
22-
var completion = new TaskCompletionSource();
23+
var completion = TaskCompletionSourceFactory.CreateDefault();
2324

2425
timeoutToken.Register(() => completion.SetCanceled(timeoutToken));
2526

src/Connectors/KurrentDB.Connectors.Tests/System/LeaderNodeBackgroundServiceTests.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
using System.Net;
88
using Kurrent.Surge;
9+
using KurrentDB.Common.Utils;
910
using KurrentDB.Connectors.Infrastructure.System.Node;
1011
using KurrentDB.Connectors.Infrastructure.System.Node.NodeSystemInfo;
1112
using KurrentDB.Core.Cluster;
@@ -70,7 +71,7 @@ public Task waits_for_leadership_again_when_leadership_revoked() => Fixture.Test
7071

7172
MessageBus.Publish(new SystemMessage.BecomeFollower(Guid.NewGuid(), FakeMemberInfo));
7273

73-
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = new();
74+
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = TaskCompletionSourceFactory.CreateDefault<SystemMessage.ComponentTerminated>();
7475
MessageBus.Subscribe<SystemMessage.ComponentTerminated>((message, _) => {
7576
if (message.ComponentName == serviceName)
7677
componentTerminated.SetResult(message);
@@ -123,7 +124,7 @@ public Task waits_for_leadership_again_when_leadership_revoked_multiple_times()
123124
await sut.WaitUntilExecuted();
124125
await Task.Delay(1000, cancellator.Token);
125126

126-
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = new();
127+
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = TaskCompletionSourceFactory.CreateDefault<SystemMessage.ComponentTerminated>();
127128
MessageBus.Subscribe<SystemMessage.ComponentTerminated>((message, _) => {
128129
if (message.ComponentName == serviceName)
129130
componentTerminated.SetResult(message);
@@ -178,7 +179,7 @@ public Task stops_gracefully_when_leadership_assigned_and_service_is_stopped() =
178179
// Act
179180
MessageBus.Publish(new SystemMessage.BecomeLeader(Guid.NewGuid()));
180181

181-
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = new();
182+
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = TaskCompletionSourceFactory.CreateDefault<SystemMessage.ComponentTerminated>();
182183
MessageBus.Subscribe<SystemMessage.ComponentTerminated>((message, _) => {
183184
if (message.ComponentName == serviceName)
184185
componentTerminated.SetResult(message);
@@ -234,7 +235,7 @@ public Task stops_gracefully_when_leadership_assigned_and_stopping_token_is_canc
234235

235236
await sut.WaitUntilExecuting();
236237

237-
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = new();
238+
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = TaskCompletionSourceFactory.CreateDefault<SystemMessage.ComponentTerminated>();
238239
MessageBus.Subscribe<SystemMessage.ComponentTerminated>((message, _) => {
239240
if (message.ComponentName == serviceName)
240241
componentTerminated.SetResult(message);
@@ -272,7 +273,7 @@ public Task stops_gracefully_when_leadership_revoked_and_waiting_for_leadership(
272273

273274
await sut.WaitUntilExecuted();
274275

275-
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = new();
276+
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = TaskCompletionSourceFactory.CreateDefault<SystemMessage.ComponentTerminated>();
276277
MessageBus.Subscribe<SystemMessage.ComponentTerminated>((message, _) => {
277278
if (message.ComponentName == serviceName)
278279
componentTerminated.SetResult(message);
@@ -287,8 +288,8 @@ public Task stops_gracefully_when_leadership_revoked_and_waiting_for_leadership(
287288

288289
class TestLeadershipAwareService(string serviceName, MessageBus bus, NodeSystemInfo nodeSystemInfo, ILoggerFactory loggerFactory)
289290
: LeaderNodeBackgroundService(bus, bus, _ => new ValueTask<NodeSystemInfo>(nodeSystemInfo), loggerFactory, serviceName) {
290-
volatile TaskCompletionSource<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> _executingCompletionSource = new();
291-
volatile TaskCompletionSource<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> _executedCompletionSource = new();
291+
volatile TaskCompletionSource<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> _executingCompletionSource = TaskCompletionSourceFactory.CreateDefault<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)>();
292+
volatile TaskCompletionSource<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> _executedCompletionSource = TaskCompletionSourceFactory.CreateDefault<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)>();
292293

293294
public TimeSpan ExecuteDelay { get; set; } = TimeSpan.FromMinutes(10);
294295

@@ -300,13 +301,13 @@ protected override async Task Execute(NodeSystemInfo nodeInfo, CancellationToken
300301

301302
public async Task<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> WaitUntilExecuting() {
302303
var result = await _executingCompletionSource.Task;
303-
_executingCompletionSource = new();
304+
_executingCompletionSource = TaskCompletionSourceFactory.CreateDefault<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)>();
304305
return result;
305306
}
306307

307308
public async Task<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> WaitUntilExecuted() {
308309
var result = await _executedCompletionSource.Task;
309-
_executedCompletionSource = new();
310+
_executedCompletionSource = TaskCompletionSourceFactory.CreateDefault<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)>();
310311
return result;
311312
}
312313
}

src/Connectors/KurrentDB.Connectors/Infrastructure/System/Node/NodeBackgroundService.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
22
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
33

4+
using KurrentDB.Common.Utils;
45
using KurrentDB.Core.Bus;
56
using KurrentDB.Core.Messages;
67
using Microsoft.Extensions.Hosting;
@@ -85,7 +86,7 @@ public virtual async Task StopAsync(CancellationToken cancellationToken) {
8586
}
8687
finally {
8788
// Wait until the task completes or the stop token triggers
88-
var completion = new TaskCompletionSource();
89+
var completion = TaskCompletionSourceFactory.CreateDefault();
8990

9091
await using var registration = cancellationToken
9192
.Register(tcs => ((TaskCompletionSource)tcs!).SetCanceled(CancellationToken.None), completion);

src/Connectors/KurrentDB.Connectors/Infrastructure/System/PublisherManagementExtensions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
// ReSharper disable CheckNamespace
55

6+
using KurrentDB.Common.Utils;
67
using KurrentDB.Core.Bus;
78
using KurrentDB.Core.Data;
89
using KurrentDB.Core.Messages;
@@ -19,7 +20,7 @@ public static class PublisherManagementExtensions {
1920
public static Task<(Position Position, StreamRevision Revision)> DeleteStream(this IPublisher publisher, string stream, long expectedRevision = -2, bool hardDelete = false, CancellationToken cancellationToken = default) {
2021
cancellationToken.ThrowIfCancellationRequested();
2122

22-
var operation = new TaskCompletionSource<(Position Position, StreamRevision StreamRevision)>(TaskCreationOptions.RunContinuationsAsynchronously);
23+
var operation = TaskCompletionSourceFactory.CreateDefault<(Position Position, StreamRevision StreamRevision)>(TaskCreationOptions.RunContinuationsAsynchronously);
2324

2425
var cid = Guid.NewGuid();
2526

src/Connectors/KurrentDB.Connectors/Infrastructure/System/PublisherWriteExtensions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
// ReSharper disable CheckNamespace
55

6+
using KurrentDB.Common.Utils;
67
using KurrentDB.Core.Bus;
78
using KurrentDB.Core.Data;
89
using KurrentDB.Core.Messages;
@@ -87,7 +88,7 @@ public static async Task<WriteEventsResult> WriteEvents(
8788
this IPublisher publisher, string stream, Event[] events, long expectedRevision = ExpectedVersion.Any,
8889
CancellationToken cancellationToken = default
8990
) {
90-
var operation = new TaskCompletionSource<WriteEventsResult>(TaskCreationOptions.RunContinuationsAsynchronously);
91+
var operation = TaskCompletionSourceFactory.CreateDefault<WriteEventsResult>(TaskCreationOptions.RunContinuationsAsynchronously);
9192

9293
await publisher.WriteEvents(
9394
stream,

src/Connectors/KurrentDB.Connectors/Infrastructure/System/SystemReadinessProbe.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
33

44
using Kurrent.Surge;
5+
using KurrentDB.Common.Utils;
56
using KurrentDB.Connectors.Infrastructure.System.Node.NodeSystemInfo;
67
using KurrentDB.Core.Bus;
78
using KurrentDB.Core.Messages;
@@ -15,7 +16,7 @@ public interface ISystemReadinessProbe {
1516
[UsedImplicitly]
1617
public class SystemReadinessProbe : IHandle<SystemMessage.BecomeLeader>, IHandle<SystemMessage.BecomeFollower>, IHandle<SystemMessage.BecomeReadOnlyReplica> {
1718
public SystemReadinessProbe(ISubscriber subscriber, GetNodeSystemInfo getNodeSystemInfo) {
18-
CompletionSource = new();
19+
CompletionSource = TaskCompletionSourceFactory.CreateDefault();
1920

2021
Subscriber = subscriber.With(x => {
2122
x.Subscribe<SystemMessage.BecomeLeader>(this);

src/Connectors/KurrentDB.Connectors/Planes/Management/Data/ConnectorsStateProjection.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using Kurrent.Surge.Connectors.Sinks;
1010
using KurrentDB.Core.Services.Transport.Enumerators;
1111
using static System.StringComparison;
12+
using KurrentDB.Common.Utils;
1213

1314
namespace KurrentDB.Connectors.Planes.Management.Data;
1415

@@ -101,7 +102,7 @@ public ConnectorsStateProjection(ISnapshotProjectionsStore store, string snapsho
101102
});
102103
}
103104

104-
TaskCompletionSource<(LogPosition Position, DateTimeOffset Timestamp)> HasCaughtUpTaskCompletionSource { get; } = new();
105+
TaskCompletionSource<(LogPosition Position, DateTimeOffset Timestamp)> HasCaughtUpTaskCompletionSource { get; } = TaskCompletionSourceFactory.CreateDefault<(LogPosition Position, DateTimeOffset Timestamp)>();
105106

106107
public Task<(LogPosition Position, DateTimeOffset Timestamp)> WaitUntilCaughtUp => HasCaughtUpTaskCompletionSource.Task;
107108
}

src/Connectors/KurrentDB.Connectors/Planes/Management/SystemStartupManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
22
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
33

4+
using KurrentDB.Common.Utils;
45
using KurrentDB.Connectors.Infrastructure.System.Node;
56
using Microsoft.Extensions.DependencyInjection;
67
using Microsoft.Extensions.Hosting;
@@ -9,7 +10,7 @@
910
namespace KurrentDB.Connectors.Planes.Management;
1011

1112
internal class SystemStartupManager(IServiceProvider serviceProvider) : BackgroundService, IStartupWorkCompletionMonitor {
12-
private readonly TaskCompletionSource _completed = new();
13+
private readonly TaskCompletionSource _completed = TaskCompletionSourceFactory.CreateDefault();
1314

1415
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
1516
var workers = serviceProvider.GetServices<SystemStartupTaskWorker>().ToList();

0 commit comments

Comments
 (0)