Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 76 additions & 85 deletions src/core/Akka.Remote.Tests/Transport/TestTransportSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,213 +7,204 @@

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Remote.Transport;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.Util.Internal;
using Google.Protobuf;
using Xunit;

namespace Akka.Remote.Tests.Transport{


namespace Akka.Remote.Tests.Transport
{
public class TestTransportSpec : AkkaSpec
{
#region Setup / Teardown

protected Address addressA = new Address("test", "testsystemA", "testhostA", 4321);
protected Address addressB = new Address("test", "testsystemB", "testhostB", 5432);
protected Address nonExistantAddress = new Address("test", "nosystem", "nohost", 0);
private readonly Address _addressA = new Address("test", "testsystemA", "testhostA", 4321);
private readonly Address _addressB = new Address("test", "testsystemB", "testhostB", 5432);
private readonly Address _nonExistantAddress = new Address("test", "nosystem", "nohost", 0);

public IActorRef Self { get { return TestActor; } }
private TimeSpan DefaultTimeout { get { return Dilated(TestKitSettings.DefaultTimeout); } }
private TimeSpan DefaultTimeout => Dilated(TestKitSettings.DefaultTimeout);
#endregion

#region Tests

[Fact]
public void TestTransport_must_return_an_Address_and_TaskCompletionSource_on_Listen()
public async Task TestTransport_must_return_an_Address_and_TaskCompletionSource_on_Listen()
{
//arrange
var registry = new AssociationRegistry();
var transportA = new TestTransport(addressA, registry);
var transportA = new TestTransport(_addressA, registry);

//act
var result = transportA.Listen();
result.Wait(DefaultTimeout);
var result = await transportA.Listen().WithTimeout(DefaultTimeout);

//assert
Assert.Equal(addressA, result.Result.Item1);
Assert.NotNull(result.Result.Item2);
Assert.Equal(_addressA, result.Item1);
Assert.NotNull(result.Item2);

var snapshot = registry.LogSnapshot();
Assert.Equal(1, snapshot.Count);
Assert.IsType<ListenAttempt>(snapshot[0]);
Assert.Equal(addressA, ((ListenAttempt)snapshot[0]).BoundAddress);
Assert.Equal(_addressA, ((ListenAttempt)snapshot[0]).BoundAddress);
}

[Fact]
public void TestTransport_must_associate_successfully_with_another_TestTransport()
public async Task TestTransport_must_associate_successfully_with_another_TestTransport()
{
//arrange
var registry = new AssociationRegistry();
var transportA = new TestTransport(addressA, registry);
var transportB = new TestTransport(addressB, registry);
var transportA = new TestTransport(_addressA, registry);
var transportB = new TestTransport(_addressB, registry);

//act

//must complete returned promises to receive events
var localConnectionFuture = transportA.Listen();
localConnectionFuture.Wait(DefaultTimeout);
localConnectionFuture.Result.Item2.SetResult(new ActorAssociationEventListener(Self));
var localConnectionFuture = await transportA.Listen().WithTimeout(DefaultTimeout);
localConnectionFuture.Item2.SetResult(new ActorAssociationEventListener(TestActor));

var remoteConnectionFuture = transportB.Listen();
remoteConnectionFuture.Wait(DefaultTimeout);
remoteConnectionFuture.Result.Item2.SetResult(new ActorAssociationEventListener(Self));
var remoteConnectionFuture = await transportB.Listen().WithTimeout(DefaultTimeout);
remoteConnectionFuture.Item2.SetResult(new ActorAssociationEventListener(TestActor));

var ready = registry.TransportsReady(addressA, addressB);
var ready = registry.TransportsReady(_addressA, _addressB);
Assert.True(ready);

transportA.Associate(addressB);
ExpectMsgPf<AssociationHandle>(DefaultTimeout, "Expect InboundAssociation from A",
// task is deliberately not awaited
var task = transportA.Associate(_addressB);
await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A",
m => m.AsInstanceOf<InboundAssociation>().Association);

//assert
var associateAttempt = (registry.LogSnapshot().Single(x => x is AssociateAttempt)).AsInstanceOf<AssociateAttempt>();
Assert.Equal(addressA, associateAttempt.LocalAddress);
Assert.Equal(addressB, associateAttempt.RemoteAddress);
Assert.Equal(_addressA, associateAttempt.LocalAddress);
Assert.Equal(_addressB, associateAttempt.RemoteAddress);
}

[Fact]
public void TestTransport_fail_to_association_with_nonexisting_Address()
public async Task TestTransport_fail_to_association_with_non_existing_Address()
{
//arrange
var registry = new AssociationRegistry();
var transportA = new TestTransport(addressA, registry);
var transportA = new TestTransport(_addressA, registry);

//act
var result = transportA.Listen();
result.Wait(DefaultTimeout);
result.Result.Item2.SetResult(new ActorAssociationEventListener(Self));
var result = await transportA.Listen().WithTimeout(DefaultTimeout);
result.Item2.SetResult(new ActorAssociationEventListener(TestActor));

//assert
XAssert.Throws<InvalidAssociationException>(() =>
await Assert.ThrowsAsync<InvalidAssociationException>(async () =>
{
var associateTask = transportA.Associate(nonExistantAddress);
associateTask.Wait(DefaultTimeout);
var fireException = associateTask.Result;
await transportA.Associate(_nonExistantAddress).WithTimeout(DefaultTimeout);
});
}

[Fact]
public void TestTransport_should_emulate_sending_PDUs()
public async Task TestTransport_should_emulate_sending_PDUs()
{
//arrange
var registry = new AssociationRegistry();
var transportA = new TestTransport(addressA, registry);
var transportB = new TestTransport(addressB, registry);
var transportA = new TestTransport(_addressA, registry);
var transportB = new TestTransport(_addressB, registry);

//act

//must complete returned promises to receive events
var localConnectionFuture = transportA.Listen();
localConnectionFuture.Wait(DefaultTimeout);
localConnectionFuture.Result.Item2.SetResult(new ActorAssociationEventListener(Self));
var localConnectionFuture = await transportA.Listen().WithTimeout(DefaultTimeout);
localConnectionFuture.Item2.SetResult(new ActorAssociationEventListener(TestActor));

var remoteConnectionFuture = transportB.Listen();
remoteConnectionFuture.Wait(DefaultTimeout);
remoteConnectionFuture.Result.Item2.SetResult(new ActorAssociationEventListener(Self));
var remoteConnectionFuture = await transportB.Listen().WithTimeout(DefaultTimeout);
remoteConnectionFuture.Item2.SetResult(new ActorAssociationEventListener(TestActor));

var ready = registry.TransportsReady(addressA, addressB);
var ready = registry.TransportsReady(_addressA, _addressB);
Assert.True(ready);

var associate = transportA.Associate(addressB);
var handleB = ExpectMsgPf<AssociationHandle>(DefaultTimeout, "Expect InboundAssociation from A", o =>
var associate = transportA.Associate(_addressB);
var handleB = await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A", o =>
{
var handle = o as InboundAssociation;
if (handle != null && handle.Association.RemoteAddress.Equals(addressA)) return handle.Association;
if (o is InboundAssociation handle && handle.Association.RemoteAddress.Equals(_addressA))
return handle.Association;
return null;
});
handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(Self));
handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

associate.Wait(DefaultTimeout);
await associate.WithTimeout(DefaultTimeout);
var handleA = associate.Result;

//Initialize handles
handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(Self));
handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

var akkaPDU = ByteString.CopyFromUtf8("AkkaPDU");
var akkaPdu = ByteString.CopyFromUtf8("AkkaPDU");

var exists = registry.ExistsAssociation(addressA, addressB);
var exists = registry.ExistsAssociation(_addressA, _addressB);
Assert.True(exists);

handleA.Write(akkaPDU);
handleA.Write(akkaPdu);

//assert
ExpectMsgPf(DefaultTimeout, "Expect InboundPayload from A", o =>
await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundPayload from A", o =>
{
var payload = o as InboundPayload;
if (payload != null && payload.Payload.Equals(akkaPDU)) return akkaPDU;
if (o is InboundPayload payload && payload.Payload.Equals(akkaPdu))
return akkaPdu;
return null;
});

var writeAttempt = (registry.LogSnapshot().Single(x => x is WriteAttempt)).AsInstanceOf<WriteAttempt>();
Assert.True(writeAttempt.Sender.Equals(addressA) && writeAttempt.Recipient.Equals(addressB)
&& writeAttempt.Payload.Equals(akkaPDU));
Assert.True(writeAttempt.Sender.Equals(_addressA) && writeAttempt.Recipient.Equals(_addressB)
&& writeAttempt.Payload.Equals(akkaPdu));
}

[Fact]
public void TestTransport_should_emulate_disassociation()
public async Task TestTransport_should_emulate_disassociation()
{
//arrange
var registry = new AssociationRegistry();
var transportA = new TestTransport(addressA, registry);
var transportB = new TestTransport(addressB, registry);
var transportA = new TestTransport(_addressA, registry);
var transportB = new TestTransport(_addressB, registry);

//act

//must complete returned promises to receive events
var localConnectionFuture = transportA.Listen();
localConnectionFuture.Wait(DefaultTimeout);
localConnectionFuture.Result.Item2.SetResult(new ActorAssociationEventListener(Self));
var localConnectionFuture = await transportA.Listen().WithTimeout(DefaultTimeout);
localConnectionFuture.Item2.SetResult(new ActorAssociationEventListener(TestActor));

var remoteConnectionFuture = transportB.Listen();
remoteConnectionFuture.Wait(DefaultTimeout);
remoteConnectionFuture.Result.Item2.SetResult(new ActorAssociationEventListener(Self));
var remoteConnectionFuture = await transportB.Listen().WithTimeout(DefaultTimeout);
remoteConnectionFuture.Item2.SetResult(new ActorAssociationEventListener(TestActor));

var ready = registry.TransportsReady(addressA, addressB);
var ready = registry.TransportsReady(_addressA, _addressB);
Assert.True(ready);

var associate = transportA.Associate(addressB);
var handleB = ExpectMsgPf<AssociationHandle>(DefaultTimeout, "Expect InboundAssociation from A", o =>
var associate = transportA.Associate(_addressB);
var handleB = await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A", o =>
{
var handle = o as InboundAssociation;
if (handle != null && handle.Association.RemoteAddress.Equals(addressA)) return handle.Association;
if (o is InboundAssociation handle && handle.Association.RemoteAddress.Equals(_addressA))
return handle.Association;
return null;
});
handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(Self));
handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

associate.Wait(DefaultTimeout);
await associate.WithTimeout(DefaultTimeout);
var handleA = associate.Result;

//Initialize handles
handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(Self));
handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

var exists = registry.ExistsAssociation(addressA, addressB);
var exists = registry.ExistsAssociation(_addressA, _addressB);
Assert.True(exists);

handleA.Disassociate();
handleA.Disassociate("Disassociation test", Log);

var msg = ExpectMsgPf(DefaultTimeout, "Expected Disassociated", o => o.AsInstanceOf<Disassociated>());
var msg = await ExpectMsgOfAsync(DefaultTimeout, "Expected Disassociated", o => o.AsInstanceOf<Disassociated>());

//assert
Assert.NotNull(msg);

exists = registry.ExistsAssociation(addressA, addressB);
exists = registry.ExistsAssociation(_addressA, _addressB);
Assert.True(!exists, "Association should no longer exist");

var disassociateAttempt = registry.LogSnapshot().Single(x => x is DisassociateAttempt).AsInstanceOf<DisassociateAttempt>();
Assert.True(disassociateAttempt.Requestor.Equals(addressA) && disassociateAttempt.Remote.Equals(addressB));
Assert.True(disassociateAttempt.Requestor.Equals(_addressA) && disassociateAttempt.Remote.Equals(_addressB));
}

#endregion
Expand Down