Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Net;
using DotNext.Collections.Generic;
using Kurrent.Surge;
using KurrentDB.Common.Utils;
using KurrentDB.Core;
using KurrentDB.Core.Bus;
using KurrentDB.Core.Certificates;
Expand Down Expand Up @@ -92,7 +93,7 @@ static ClusterVNodeOptions GetClusterVNodeOptions(Dictionary<string, string?> se
class NodeReadinessProbe : IHandle<SystemMessage.SystemReady> {
static readonly Serilog.ILogger Log = Serilog.Log.Logger.ForContext<NodeReadinessProbe>();

TaskCompletionSource Ready { get; } = new();
TaskCompletionSource Ready { get; } = TaskCompletionSourceFactory.CreateDefault();

void IHandle<SystemMessage.SystemReady>.Handle(SystemMessage.SystemReady message) {
if (!Ready.Task.IsCompleted)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using KurrentDB.Common.Utils;
using static System.Threading.Tasks.TaskCreationOptions;

namespace KurrentDB.Connectors.Tests.Infrastructure.Http;
Expand All @@ -10,7 +11,7 @@ protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage reques
send(request, cancellationToken);

public static TestHttpMessageHandler Create(Func<HttpRequestMessage, Task<HttpResponseMessage>> send) {
var tcs = new TaskCompletionSource<HttpResponseMessage>(RunContinuationsAsynchronously);
var tcs = TaskCompletionSourceFactory.CreateDefault<HttpResponseMessage>(RunContinuationsAsynchronously);
return new TestHttpMessageHandler(async (req, ct) => {
await using var registration = ct.Register(() => tcs.TrySetCanceled());
var result = await Task.WhenAny(send(req), tcs.Task);
Expand Down
3 changes: 2 additions & 1 deletion src/Connectors/KurrentDB.Connectors.Tests/MessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using KurrentDB.Common.Utils;
using KurrentDB.Core.Bus;
using KurrentDB.Core.Messaging;

Expand All @@ -19,7 +20,7 @@ public void Subscribe<T>(HandleMessage<T> handler) where T : Message =>
});

public Task SubscribeAndWait<T>(HandleMessageAsync<T> handler, CancellationToken timeoutToken = default) where T : Message {
var completion = new TaskCompletionSource();
var completion = TaskCompletionSourceFactory.CreateDefault();

timeoutToken.Register(() => completion.SetCanceled(timeoutToken));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

using System.Net;
using Kurrent.Surge;
using KurrentDB.Common.Utils;
using KurrentDB.Connectors.Infrastructure.System.Node;
using KurrentDB.Connectors.Infrastructure.System.Node.NodeSystemInfo;
using KurrentDB.Core.Cluster;
Expand Down Expand Up @@ -70,7 +71,7 @@ public Task waits_for_leadership_again_when_leadership_revoked() => Fixture.Test

MessageBus.Publish(new SystemMessage.BecomeFollower(Guid.NewGuid(), FakeMemberInfo));

TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = new();
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = TaskCompletionSourceFactory.CreateDefault<SystemMessage.ComponentTerminated>();
MessageBus.Subscribe<SystemMessage.ComponentTerminated>((message, _) => {
if (message.ComponentName == serviceName)
componentTerminated.SetResult(message);
Expand Down Expand Up @@ -123,7 +124,7 @@ public Task waits_for_leadership_again_when_leadership_revoked_multiple_times()
await sut.WaitUntilExecuted();
await Task.Delay(1000, cancellator.Token);

TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = new();
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = TaskCompletionSourceFactory.CreateDefault<SystemMessage.ComponentTerminated>();
MessageBus.Subscribe<SystemMessage.ComponentTerminated>((message, _) => {
if (message.ComponentName == serviceName)
componentTerminated.SetResult(message);
Expand Down Expand Up @@ -178,7 +179,7 @@ public Task stops_gracefully_when_leadership_assigned_and_service_is_stopped() =
// Act
MessageBus.Publish(new SystemMessage.BecomeLeader(Guid.NewGuid()));

TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = new();
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = TaskCompletionSourceFactory.CreateDefault<SystemMessage.ComponentTerminated>();
MessageBus.Subscribe<SystemMessage.ComponentTerminated>((message, _) => {
if (message.ComponentName == serviceName)
componentTerminated.SetResult(message);
Expand Down Expand Up @@ -234,7 +235,7 @@ public Task stops_gracefully_when_leadership_assigned_and_stopping_token_is_canc

await sut.WaitUntilExecuting();

TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = new();
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = TaskCompletionSourceFactory.CreateDefault<SystemMessage.ComponentTerminated>();
MessageBus.Subscribe<SystemMessage.ComponentTerminated>((message, _) => {
if (message.ComponentName == serviceName)
componentTerminated.SetResult(message);
Expand Down Expand Up @@ -272,7 +273,7 @@ public Task stops_gracefully_when_leadership_revoked_and_waiting_for_leadership(

await sut.WaitUntilExecuted();

TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = new();
TaskCompletionSource<SystemMessage.ComponentTerminated> componentTerminated = TaskCompletionSourceFactory.CreateDefault<SystemMessage.ComponentTerminated>();
MessageBus.Subscribe<SystemMessage.ComponentTerminated>((message, _) => {
if (message.ComponentName == serviceName)
componentTerminated.SetResult(message);
Expand All @@ -287,8 +288,8 @@ public Task stops_gracefully_when_leadership_revoked_and_waiting_for_leadership(

class TestLeadershipAwareService(string serviceName, MessageBus bus, NodeSystemInfo nodeSystemInfo, ILoggerFactory loggerFactory)
: LeaderNodeBackgroundService(bus, bus, _ => new ValueTask<NodeSystemInfo>(nodeSystemInfo), loggerFactory, serviceName) {
volatile TaskCompletionSource<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> _executingCompletionSource = new();
volatile TaskCompletionSource<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> _executedCompletionSource = new();
volatile TaskCompletionSource<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> _executingCompletionSource = TaskCompletionSourceFactory.CreateDefault<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)>();
volatile TaskCompletionSource<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> _executedCompletionSource = TaskCompletionSourceFactory.CreateDefault<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)>();

public TimeSpan ExecuteDelay { get; set; } = TimeSpan.FromMinutes(10);

Expand All @@ -300,13 +301,13 @@ protected override async Task Execute(NodeSystemInfo nodeInfo, CancellationToken

public async Task<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> WaitUntilExecuting() {
var result = await _executingCompletionSource.Task;
_executingCompletionSource = new();
_executingCompletionSource = TaskCompletionSourceFactory.CreateDefault<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)>();
return result;
}

public async Task<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)> WaitUntilExecuted() {
var result = await _executedCompletionSource.Task;
_executedCompletionSource = new();
_executedCompletionSource = TaskCompletionSourceFactory.CreateDefault<(NodeSystemInfo NodeInfo, CancellationToken StoppingToken)>();
return result;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using KurrentDB.Common.Utils;
using KurrentDB.Core.Bus;
using KurrentDB.Core.Messages;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -85,7 +86,7 @@ public virtual async Task StopAsync(CancellationToken cancellationToken) {
}
finally {
// Wait until the task completes or the stop token triggers
var completion = new TaskCompletionSource();
var completion = TaskCompletionSourceFactory.CreateDefault();

await using var registration = cancellationToken
.Register(tcs => ((TaskCompletionSource)tcs!).SetCanceled(CancellationToken.None), completion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

// ReSharper disable CheckNamespace

using KurrentDB.Common.Utils;
using KurrentDB.Core.Bus;
using KurrentDB.Core.Data;
using KurrentDB.Core.Messages;
Expand All @@ -19,7 +20,7 @@ public static class PublisherManagementExtensions {
public static Task<(Position Position, StreamRevision Revision)> DeleteStream(this IPublisher publisher, string stream, long expectedRevision = -2, bool hardDelete = false, CancellationToken cancellationToken = default) {
cancellationToken.ThrowIfCancellationRequested();

var operation = new TaskCompletionSource<(Position Position, StreamRevision StreamRevision)>(TaskCreationOptions.RunContinuationsAsynchronously);
var operation = TaskCompletionSourceFactory.CreateDefault<(Position Position, StreamRevision StreamRevision)>(TaskCreationOptions.RunContinuationsAsynchronously);

var cid = Guid.NewGuid();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

// ReSharper disable CheckNamespace

using KurrentDB.Common.Utils;
using KurrentDB.Core.Bus;
using KurrentDB.Core.Data;
using KurrentDB.Core.Messages;
Expand Down Expand Up @@ -87,7 +88,7 @@ public static async Task<WriteEventsResult> WriteEvents(
this IPublisher publisher, string stream, Event[] events, long expectedRevision = ExpectedVersion.Any,
CancellationToken cancellationToken = default
) {
var operation = new TaskCompletionSource<WriteEventsResult>(TaskCreationOptions.RunContinuationsAsynchronously);
var operation = TaskCompletionSourceFactory.CreateDefault<WriteEventsResult>(TaskCreationOptions.RunContinuationsAsynchronously);

await publisher.WriteEvents(
stream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using Kurrent.Surge;
using KurrentDB.Common.Utils;
using KurrentDB.Connectors.Infrastructure.System.Node.NodeSystemInfo;
using KurrentDB.Core.Bus;
using KurrentDB.Core.Messages;
Expand All @@ -15,7 +16,7 @@ public interface ISystemReadinessProbe {
[UsedImplicitly]
public class SystemReadinessProbe : IHandle<SystemMessage.BecomeLeader>, IHandle<SystemMessage.BecomeFollower>, IHandle<SystemMessage.BecomeReadOnlyReplica> {
public SystemReadinessProbe(ISubscriber subscriber, GetNodeSystemInfo getNodeSystemInfo) {
CompletionSource = new();
CompletionSource = TaskCompletionSourceFactory.CreateDefault();

Subscriber = subscriber.With(x => {
x.Subscribe<SystemMessage.BecomeLeader>(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Kurrent.Surge.Connectors.Sinks;
using KurrentDB.Core.Services.Transport.Enumerators;
using static System.StringComparison;
using KurrentDB.Common.Utils;

namespace KurrentDB.Connectors.Planes.Management.Data;

Expand Down Expand Up @@ -101,7 +102,7 @@ public ConnectorsStateProjection(ISnapshotProjectionsStore store, string snapsho
});
}

TaskCompletionSource<(LogPosition Position, DateTimeOffset Timestamp)> HasCaughtUpTaskCompletionSource { get; } = new();
TaskCompletionSource<(LogPosition Position, DateTimeOffset Timestamp)> HasCaughtUpTaskCompletionSource { get; } = TaskCompletionSourceFactory.CreateDefault<(LogPosition Position, DateTimeOffset Timestamp)>();

public Task<(LogPosition Position, DateTimeOffset Timestamp)> WaitUntilCaughtUp => HasCaughtUpTaskCompletionSource.Task;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using KurrentDB.Common.Utils;
using KurrentDB.Connectors.Infrastructure.System.Node;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand All @@ -9,7 +10,7 @@
namespace KurrentDB.Connectors.Planes.Management;

internal class SystemStartupManager(IServiceProvider serviceProvider) : BackgroundService, IStartupWorkCompletionMonitor {
private readonly TaskCompletionSource _completed = new();
private readonly TaskCompletionSource _completed = TaskCompletionSourceFactory.CreateDefault();

protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
var workers = serviceProvider.GetServices<SystemStartupTaskWorker>().ToList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

<ItemGroup>
<ProjectReference Include="..\KurrentDB.Auth.Ldaps\KurrentDB.Auth.Ldaps.csproj" />
<ProjectReference Include="..\KurrentDB.Common\KurrentDB.Common.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
13 changes: 7 additions & 6 deletions src/KurrentDB.Auth.Ldaps.Tests/LdapsPluginTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.Threading.Tasks;
using EventStore.Plugins.Licensing;
using KurrentDB.Common.Utils;
using KurrentDB.Plugins.TestHelpers;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
Expand Down Expand Up @@ -38,7 +39,7 @@ private async Task<LdapsFixture> CreateAndStartFixture() {
// There is no health check in this container
// Simply try a few requests until one succeeds
for (var i = 0; i < 5; i++) {
var completionSource = new TaskCompletionSource<TestAuthenticationResponse>();
var completionSource = TaskCompletionSourceFactory.CreateDefault<TestAuthenticationResponse>();
var request = CreateAuthenticationRequest("professor", "professor", completionSource);
var task = completionSource.Task;
sut.Authenticate(request);
Expand All @@ -61,7 +62,7 @@ public async Task authenticate_admin_user_returns_admin_role() {

using var fixture = await CreateAndStartFixture();

var completionSource = new TaskCompletionSource<TestAuthenticationResponse>();
var completionSource = TaskCompletionSourceFactory.CreateDefault<TestAuthenticationResponse>();
var request = CreateAuthenticationRequest("professor", "professor", completionSource);
sut.Authenticate(request);

Expand All @@ -84,7 +85,7 @@ public async Task authenticate_with_incorrect_password_returns_unauthorized() {

using var fixture = await CreateAndStartFixture();

var completionSource = new TaskCompletionSource<TestAuthenticationResponse>();
var completionSource = TaskCompletionSourceFactory.CreateDefault<TestAuthenticationResponse>();
var request = CreateAuthenticationRequest("professor", "wrong", completionSource);
sut.Authenticate(request);

Expand All @@ -104,7 +105,7 @@ public async Task authenticate_with_non_existent_user_returns_unauthorized() {

using var fixture = await CreateAndStartFixture();

var completionSource = new TaskCompletionSource<TestAuthenticationResponse>();
var completionSource = TaskCompletionSourceFactory.CreateDefault<TestAuthenticationResponse>();
var request = CreateAuthenticationRequest("wrong", "wrong", completionSource);
sut.Authenticate(request);

Expand All @@ -124,7 +125,7 @@ public async Task authenticate_user_with_custom_role_returns_expected_roles() {

using var fixture = await CreateAndStartFixture();

var completionSource = new TaskCompletionSource<TestAuthenticationResponse>();
var completionSource = TaskCompletionSourceFactory.CreateDefault<TestAuthenticationResponse>();
var request = CreateAuthenticationRequest("fry", "fry", completionSource);
sut.Authenticate(request);

Expand All @@ -147,7 +148,7 @@ public async Task authenticate_user_with_no_roles_returns_no_roles() {

using var fixture = await CreateAndStartFixture();

var completionSource = new TaskCompletionSource<TestAuthenticationResponse>();
var completionSource = TaskCompletionSourceFactory.CreateDefault<TestAuthenticationResponse>();
var request = CreateAuthenticationRequest("amy", "amy", completionSource);
sut.Authenticate(request);

Expand Down
3 changes: 2 additions & 1 deletion src/KurrentDB.Auth.OAuth.Tests/IdpFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Ductus.FluentDocker.Model.Builders;
using Ductus.FluentDocker.Services;
using IdentityModel.Client;
using KurrentDB.Common.Utils;
using Polly;
using Xunit.Abstractions;

Expand Down Expand Up @@ -51,7 +52,7 @@ public IdpFixture(ITestOutputHelper output) {
.Build();

_identityServer.StopOnDispose = true;
_discoveryDocumentSource = new TaskCompletionSource<DiscoveryDocumentResponse>();
_discoveryDocumentSource = TaskCompletionSourceFactory.CreateDefault<DiscoveryDocumentResponse>();

HttpClient = new HttpClient(new SocketsHttpHandler {
SslOptions = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\KurrentDB.Auth.OAuth\KurrentDB.Auth.OAuth.csproj" />
<ProjectReference Include="..\KurrentDB.Common\KurrentDB.Common.csproj" />
</ItemGroup>
<ItemGroup>
<Content Include="conf\idsrv4.conf.json">
Expand Down
Loading
Loading