Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
Expand Down Expand Up @@ -96,45 +97,46 @@ public void Join(ActorSystem from, ActorSystem to, IActorRef probe)
}

[Fact]
public void ClusterSingleton_that_is_leaving_must()
public async Task ClusterSingleton_that_is_leaving_must()
{
ClusterSingleton_that_is_leaving_must_join_cluster();
ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest();
await ClusterSingleton_that_is_leaving_must_join_cluster();
await ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest();
}

private void ClusterSingleton_that_is_leaving_must_join_cluster()
private async Task ClusterSingleton_that_is_leaving_must_join_cluster()
{
for (int i = 0; i < _systems.Length; i++)
for (var i = 0; i < _systems.Length; i++)
Join(_systems[i], _systems[0], _probes[i]);

// leader is most likely on system, lowest port
Join(Sys, _systems[0], TestActor);

_probes[0].ExpectMsg("started");
await _probes[0].ExpectMsgAsync("started");
}

private void ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest()
private async Task ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest()
{
var durations = new List<(TimeSpan stoppedDuration, TimeSpan startDuration)>();
var sw = new Stopwatch();
sw.Start();
for (var i = 0; i < _systems.Length; i++)
{
var leaveAddress = Cluster.Get(_systems[i]).SelfAddress;
CoordinatedShutdown.Get(_systems[i]).Run(CoordinatedShutdown.ClusterLeavingReason.Instance);
_probes[i].ExpectMsg("stopped", TimeSpan.FromSeconds(10));
await CoordinatedShutdown.Get(_systems[i]).Run(CoordinatedShutdown.ClusterLeavingReason.Instance);

await _probes[i].ExpectMsgAsync("stopped", TimeSpan.FromSeconds(10));
var stoppedDuration = sw.Elapsed;

if (i != _systems.Length - 1)
_probes[i + 1].ExpectMsg("started", TimeSpan.FromSeconds(30));
await _probes[i + 1].ExpectMsgAsync("started", TimeSpan.FromSeconds(30));
else
ExpectMsg("started", TimeSpan.FromSeconds(30));
await ExpectMsgAsync("started", TimeSpan.FromSeconds(30));

var startedDuration = sw.Elapsed;

Within(TimeSpan.FromSeconds(15), () =>
await WithinAsync(TimeSpan.FromSeconds(15), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(_systems[i]).IsTerminated.Should().BeTrue();
Cluster.Get(Sys).State.Members.Select(m => m.Address).Should().NotContain(leaveAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ await Assert.ThrowsAsync<TestJournalRejectionException>(async () =>
[Fact]
public async Task delay_must_call_next_interceptor_after_specified_delay()
{
var duration = TimeSpan.FromMilliseconds(100);
var duration = TimeSpan.FromMilliseconds(200);
var epsilon = TimeSpan.FromMilliseconds(50);
var probe = new InterceptorProbe();
var delay = new JournalInterceptors.Delay(duration, probe);

var startedAt = DateTime.Now;
await delay.InterceptAsync(null);

probe.WasCalled.Should().BeTrue();
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration);
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration - epsilon);
}

[Fact]
Expand Down
33 changes: 23 additions & 10 deletions src/core/Akka.Remote.Tests/Transport/AkkaProtocolSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void ProtocolStateActor_must_in_outbound_mode_delay_readiness_until_hands
}

[Fact]
public void ProtocolStateActor_must_handle_explicit_disassociate_messages()
public async Task ProtocolStateActor_must_handle_explicit_disassociate_messages()
{
var collaborators = GetCollaborators();
collaborators.Transport.AssociateBehavior.PushConstant(collaborators.Handle);
Expand All @@ -248,25 +248,38 @@ public void ProtocolStateActor_must_handle_explicit_disassociate_messages()
statusPromise, collaborators.Transport,
new AkkaProtocolSettings(config), codec, collaborators.FailureDetector));

AwaitCondition(() => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);
await AwaitConditionAsync(() => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);

reader.Tell(testAssociate(33), Self);

statusPromise.Task.Wait(TimeSpan.FromSeconds(3));
statusPromise.Task.Result.Match()
.With<AkkaProtocolHandle>(h =>
{
var cts = new CancellationTokenSource();
using (cts)
{
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(3));
var completeTask = await Task.WhenAny(timeoutTask, statusPromise.Task);
cts.Cancel();
if (completeTask == timeoutTask)
throw new TimeoutException();
}

var result = statusPromise.Task.Result;
switch (result)
{
case AkkaProtocolHandle h:
Assert.Equal(_remoteAkkaAddress, h.RemoteAddress);
Assert.Equal(_localAkkaAddress, h.LocalAddress);
})
.Default(msg => Assert.True(false, "Did not receive expected AkkaProtocolHandle from handshake"));
var wrappedHandle = statusPromise.Task.Result.AsInstanceOf<AkkaProtocolHandle>();
break;
default:
Assert.True(false, "Did not receive expected AkkaProtocolHandle from handshake");
break;
}

var wrappedHandle = (AkkaProtocolHandle) result;
wrappedHandle.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

reader.Tell(testDisassociate(DisassociateInfo.Unknown), Self);

ExpectMsgPf<Disassociated>("expected Disassociated(DisassociateInfo.Unknown", o =>
await ExpectMsgOfAsync("expected Disassociated(DisassociateInfo.Unknown", o =>
{
var disassociated = o.AsInstanceOf<Disassociated>();

Expand Down
137 changes: 121 additions & 16 deletions src/core/Akka.Tests.Shared.Internals/AkkaSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,34 +115,139 @@ private static string GetCallerName()

public static Config AkkaSpecConfig { get { return _akkaSpecConfig; } }

protected T ExpectMsgPf<T>(TimeSpan? timeout, string hint, Func<object, T> function)
=> ExpectMsgPf(timeout, hint, this, function);
protected T ExpectMsgOf<T>(
TimeSpan? timeout,
string hint,
Func<object, T> function,
CancellationToken cancellationToken = default)
=> ExpectMsgOfAsync(timeout, hint, this, function, cancellationToken)
.ConfigureAwait(false).GetAwaiter().GetResult();

protected async Task<T> ExpectMsgOfAsync<T>(
TimeSpan? timeout,
string hint,
Func<object, T> function,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(timeout, hint, this, function, cancellationToken)
.ConfigureAwait(false);

protected async Task<T> ExpectMsgOfAsync<T>(
TimeSpan? timeout,
string hint,
Func<object, Task<T>> function,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(timeout, hint, this, function, cancellationToken)
.ConfigureAwait(false);

protected T ExpectMsgPf<T>(TimeSpan? timeout, string hint, TestKitBase probe, Func<object, T> function)
protected T ExpectMsgOf<T>(
TimeSpan? timeout,
string hint,
TestKitBase probe,
Func<object, T> function,
CancellationToken cancellationToken = default)
=> ExpectMsgOfAsync(timeout, hint, probe, function, cancellationToken)
.ConfigureAwait(false).GetAwaiter().GetResult();

protected async Task<T> ExpectMsgOfAsync<T>(
TimeSpan? timeout,
string hint,
TestKitBase probe,
Func<object, T> function,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(timeout, hint, probe, o => Task.FromResult(function(o)), cancellationToken)
.ConfigureAwait(false);

protected async Task<T> ExpectMsgOfAsync<T>(
TimeSpan? timeout,
string hint,
TestKitBase probe,
Func<object, Task<T>> function,
CancellationToken cancellationToken = default)
{
MessageEnvelope envelope;
var success = probe.TryReceiveOne(out envelope, timeout);
var (success, envelope) = await probe.TryReceiveOneAsync(timeout, cancellationToken)
.ConfigureAwait(false);

if(!success)
Assertions.Fail(string.Format("expected message of type {0} but timed out after {1}", typeof(T), GetTimeoutOrDefault(timeout)));
Assertions.Fail($"expected message of type {typeof(T)} but timed out after {GetTimeoutOrDefault(timeout)}");

var message = envelope.Message;
Assertions.AssertTrue(message != null, string.Format("expected {0} but got null message", hint));
Assertions.AssertTrue(message != null, $"expected {hint} but got null message");
//TODO: Check next line.
Assertions.AssertTrue(function.GetMethodInfo().GetParameters().Any(x => x.ParameterType.IsInstanceOfType(message)), string.Format("expected {0} but got {1} instead", hint, message));
return function.Invoke(message);
Assertions.AssertTrue(
function.GetMethodInfo().GetParameters().Any(x => x.ParameterType.IsInstanceOfType(message)),
$"expected {hint} but got {message} instead");

return await function(message).ConfigureAwait(false);
}

protected T ExpectMsgPf<T>(string hint, Func<object, T> pf)
=> ExpectMsgPf(hint, this, pf);

protected T ExpectMsgPf<T>(string hint, TestKitBase probe, Func<object, T> pf)
protected T ExpectMsgOf<T>(
string hint,
TestKitBase probe,
Func<object, T> pf,
CancellationToken cancellationToken = default)
=> ExpectMsgOfAsync(hint, probe, pf, cancellationToken)
.ConfigureAwait(false).GetAwaiter().GetResult();

protected async Task<T> ExpectMsgOfAsync<T>(
string hint,
TestKitBase probe,
Func<object, T> pf,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(hint, probe, o => Task.FromResult(pf(o)), cancellationToken)
.ConfigureAwait(false);

protected async Task<T> ExpectMsgOfAsync<T>(
string hint,
TestKitBase probe,
Func<object, Task<T>> pf,
CancellationToken cancellationToken = default)
{
var t = probe.ExpectMsg<T>();
var t = await probe.ExpectMsgAsync<T>(cancellationToken: cancellationToken)
.ConfigureAwait(false);

//TODO: Check if this really is needed:
Assertions.AssertTrue(pf.GetMethodInfo().GetParameters().Any(x => x.ParameterType.IsInstanceOfType(t)), string.Format("expected {0} but got {1} instead", hint, t));
return pf.Invoke(t);
Assertions.AssertTrue(pf.GetMethodInfo().GetParameters().Any(x => x.ParameterType.IsInstanceOfType(t)),
$"expected {hint} but got {t} instead");
return await pf(t);
}

protected T ExpectMsgOf<T>(
string hint,
Func<object, T> pf,
CancellationToken cancellationToken = default)
=> ExpectMsgOfAsync(hint, this, pf, cancellationToken)
.ConfigureAwait(false).GetAwaiter().GetResult();

protected async Task<T> ExpectMsgOfAsync<T>(
string hint,
Func<object, T> pf,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(hint, this, pf, cancellationToken)
.ConfigureAwait(false);

protected async Task<T> ExpectMsgOfAsync<T>(
string hint,
Func<object, Task<T>> pf,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(hint, this, pf, cancellationToken)
.ConfigureAwait(false);

[Obsolete("Method name typo, please use ExpectMsgOf instead")]
protected T ExpectMsgPf<T>(TimeSpan? timeout, string hint, Func<object, T> function)
=> ExpectMsgOf(timeout, hint, this, function);

[Obsolete("Method name typo, please use ExpectMsgOf instead")]
protected T ExpectMsgPf<T>(TimeSpan? timeout, string hint, TestKitBase probe, Func<object, T> function)
=> ExpectMsgOf(timeout, hint, probe, function);

[Obsolete("Method name typo, please use ExpectMsgOf instead")]
protected T ExpectMsgPf<T>(string hint, Func<object, T> pf)
=> ExpectMsgOf(hint, this, pf);

[Obsolete("Method name typo, please use ExpectMsgOf instead")]
protected T ExpectMsgPf<T>(string hint, TestKitBase probe, Func<object, T> pf)
=> ExpectMsgOf(hint, probe, pf);

/// <summary>
/// Intercept and return an exception that's expected to be thrown by the passed function value. The thrown
/// exception must be an instance of the type specified by the type parameter of this method. This method
Expand Down
21 changes: 15 additions & 6 deletions src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -463,15 +463,17 @@ public AsyncPipeToDelayActor()
{
ReceiveAsync<string>(async msg =>
{
Task.Run(() =>
{
Thread.Sleep(10);
return msg;
}).PipeTo(Sender, Self); //LogicalContext is lost?!?
Delayed(msg).PipeTo(Sender, Self);

Thread.Sleep(3000);
await Task.Delay(3000);
});
}

private async Task<string> Delayed(string msg)
{
await Task.Delay(10);
return msg;
}
}

public class AsyncReentrantActor : ReceiveActor
Expand All @@ -491,6 +493,13 @@ public AsyncReentrantActor()
Thread.Sleep(3000);
});
}

private async Task<string> Delayed(string msg)
{
// Sleep to make sure the task is not completed when ContinueWith is called
await Task.Delay(100);
return msg;
}
}

[Fact]
Expand Down