Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 56 additions & 35 deletions src/core/Akka.TestKit/TestKitBase_Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public async Task<ArrayList> WaitForRadioSilenceAsync(TimeSpan? max = null, uint
}
});
}

/// <summary>
/// Receive one message from the internal queue of the TestActor.
/// This method blocks the specified duration or until a message
Expand Down Expand Up @@ -203,45 +202,56 @@ public bool TryReceiveOne(out MessageEnvelope envelope, TimeSpan? max, Cancellat

private bool InternalTryReceiveOne(out MessageEnvelope envelope, TimeSpan? max, CancellationToken cancellationToken, bool shouldLog)
{
bool didTake;
var task = InternalTryReceiveOneAsync(max, cancellationToken, shouldLog).AsTask();
task.Wait();
var received = task.Result;
envelope = received.envelope;
return received.success;
}

private async ValueTask<(bool success, MessageEnvelope envelope)> InternalTryReceiveOneAsync(TimeSpan? max, CancellationToken cancellationToken, bool shouldLog)
{
(bool didTake, MessageEnvelope env) take;
var maxDuration = GetTimeoutOrDefault(max);
var start = Now;
var start = Now;
if (maxDuration.IsZero())
{
ConditionalLog(shouldLog, "Trying to receive message from TestActor queue. Will not wait.");
didTake = _testState.Queue.TryTake(out envelope);
var didTake = _testState.Queue.TryTake(out var item, cancellationToken);
take = (didTake, item);
}
else if (maxDuration.IsPositiveFinite())
{
ConditionalLog(shouldLog, "Trying to receive message from TestActor queue within {0}", maxDuration);
didTake = _testState.Queue.TryTake(out envelope, (int)maxDuration.TotalMilliseconds, cancellationToken);
take = await _testState.Queue.TryTakeAsync((int)maxDuration.TotalMilliseconds, cancellationToken)
.ConfigureAwait(false);
}
else if (maxDuration == Timeout.InfiniteTimeSpan)
{
ConditionalLog(shouldLog, "Trying to receive message from TestActor queue. Will wait indefinitely.");
didTake = _testState.Queue.TryTake(out envelope, -1, cancellationToken);
take = await _testState.Queue.TryTakeAsync(-1, cancellationToken).ConfigureAwait(false);
}
else
{
ConditionalLog(shouldLog, "Trying to receive message from TestActor queue with negative timeout.");
//Negative
envelope = null;
didTake = false;
take = (false, null);
}

_testState.LastWasNoMsg = false;
if(envelope == null)
envelope = NullMessageEnvelope.Instance;
_testState.LastMessage = envelope;

if (didTake)
if (take.env == null)
take.env = NullMessageEnvelope.Instance;

_testState.LastMessage = take.env;

if (take.didTake)
{
ConditionalLog(shouldLog, "Received message after {0}.", Now - start);
return true;
return take;
}

ConditionalLog(shouldLog, "Received no message after {0}.{1}", Now - start, cancellationToken.IsCancellationRequested ? " Was canceled" : "");
return false;
return take;
}

#region Peek methods
Expand All @@ -258,7 +268,7 @@ private bool InternalTryReceiveOne(out MessageEnvelope envelope, TimeSpan? max,
/// <returns>The message if one was received; <c>null</c> otherwise</returns>
public object PeekOne(TimeSpan? max = null)
{
if(InternalTryPeekOne(out var envelope, max, CancellationToken.None, true))
if (InternalTryPeekOne(out var envelope, max, CancellationToken.None, true))
return envelope.Message;
return null;
}
Expand All @@ -271,7 +281,7 @@ public object PeekOne(TimeSpan? max = null)
/// <returns>The message if one was received; <c>null</c> otherwise</returns>
public object PeekOne(CancellationToken cancellationToken)
{
if(InternalTryPeekOne(out var envelope, Timeout.InfiniteTimeSpan, cancellationToken, true))
if (InternalTryPeekOne(out var envelope, Timeout.InfiniteTimeSpan, cancellationToken, true))
return envelope.Message;
return null;
}
Expand Down Expand Up @@ -315,50 +325,61 @@ public bool TryPeekOne(out MessageEnvelope envelope, TimeSpan? max = null)
public bool TryPeekOne(out MessageEnvelope envelope, TimeSpan? max, CancellationToken cancellationToken)
{
return InternalTryPeekOne(out envelope, max, cancellationToken, true);
}
}

private bool InternalTryPeekOne(out MessageEnvelope envelope, TimeSpan? max, CancellationToken cancellationToken, bool shouldLog)
{
bool didPeek;
var task = InternalTryPeekOneAsync(max, cancellationToken, shouldLog).AsTask();
task.Wait();
var received = task.Result;
envelope = received.envelope;
return received.success;
}
private async ValueTask<(bool success, MessageEnvelope envelope)> InternalTryPeekOneAsync(TimeSpan? max, CancellationToken cancellationToken, bool shouldLog)
{
(bool didPeek, MessageEnvelope env) peek;
var maxDuration = GetTimeoutOrDefault(max);
var start = Now;

if (maxDuration.IsZero())
{
ConditionalLog(shouldLog, "Trying to peek message from TestActor queue. Will not wait.");
didPeek = _testState.Queue.TryPeek(out envelope);
var peeked = _testState.Queue.TryPeek(out var item);
peek = (peeked, item);
}
else if (maxDuration.IsPositiveFinite())
{
ConditionalLog(shouldLog, "Trying to peek message from TestActor queue within {0}", maxDuration);
didPeek = _testState.Queue.TryPeek(out envelope, (int)maxDuration.TotalMilliseconds, cancellationToken);
peek = await _testState.Queue.TryPeekAsync((int)maxDuration.TotalMilliseconds, cancellationToken)
.ConfigureAwait(false);
}
else if (maxDuration == Timeout.InfiniteTimeSpan)
{
ConditionalLog(shouldLog, "Trying to peek message from TestActor queue. Will wait indefinitely.");
didPeek = _testState.Queue.TryPeek(out envelope, -1, cancellationToken);
peek = await _testState.Queue.TryPeekAsync(-1, cancellationToken).ConfigureAwait(false);
}
else
{
ConditionalLog(shouldLog, "Trying to peek message from TestActor queue with negative timeout.");
//Negative
envelope = null;
didPeek = false;
peek = (false, null);
}

_testState.LastWasNoMsg = false;
if(envelope == null)
envelope = NullMessageEnvelope.Instance;
_testState.LastMessage = envelope;

if (didPeek)

if (peek.env == null)
peek.env = NullMessageEnvelope.Instance;

_testState.LastMessage = peek.env;

if (peek.didPeek)
{
ConditionalLog(shouldLog, "Peeked message after {0}.", Now - start);
return true;
return peek;
}

ConditionalLog(shouldLog, "Peeked no message after {0}.{1}", Now - start, cancellationToken.IsCancellationRequested ? " Was canceled" : "");
return false;
return peek;
}
#endregion

Expand Down