Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 55 additions & 53 deletions src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit;
using FluentAssertions;
Expand Down Expand Up @@ -71,138 +73,138 @@ public ClusterDomainEventPublisherSpec() : base(Config)
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_MemberJoined()
public async Task ClusterDomainEventPublisher_must_publish_MemberJoined()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(state1));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberJoined(cJoining));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberJoined(cJoining));
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_MemberUp()
public async Task ClusterDomainEventPublisher_must_publish_MemberUp()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(state2));
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp));
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_leader_changed()
public async Task ClusterDomainEventPublisher_must_publish_leader_changed()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(state4));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(a51Up.Address));
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(a51Up));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(a51Up.Address));
await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds());
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_leader_changed_when_old_leader_leaves_and_is_removed()
public async Task ClusterDomainEventPublisher_must_publish_leader_changed_when_old_leader_leaves_and_is_removed()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp));
_publisher.Tell(new InternalClusterAction.PublishChanges(state6));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberLeft(aLeaving));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberLeft(aLeaving));
_publisher.Tell(new InternalClusterAction.PublishChanges(state7));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(aExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(cUp.Address));
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(aExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(cUp.Address));
await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds());
// at the removed member a an empty gossip is the last thing
_publisher.Tell(new InternalClusterAction.PublishChanges(_emptyMembershipState));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Exiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(bRemoved, MemberStatus.Exiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(cRemoved, MemberStatus.Up));
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(null));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Exiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(bRemoved, MemberStatus.Exiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(cRemoved, MemberStatus.Up));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(null));
}

[Fact]
public void ClusterDomainEventPublisher_must_not_publish_leader_changed_when_same_leader()
public async Task ClusterDomainEventPublisher_must_not_publish_leader_changed_when_same_leader()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(state4));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(a51Up.Address));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(a51Up));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(a51Up.Address));

_publisher.Tell(new InternalClusterAction.PublishChanges(state5));
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds());
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_role_leader_changed()
public async Task ClusterDomainEventPublisher_must_publish_role_leader_changed()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.RoleLeaderChanged))));
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
await subscriber.ExpectMsgAsync<ClusterEvent.CurrentClusterState>();
_publisher.Tell(new InternalClusterAction.PublishChanges(new MembershipState(new Gossip(ImmutableSortedSet.Create(cJoining, dUp)), dUp.UniqueAddress)));
subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", dUp.Address));
await subscriber.ExpectMsgAsync(new ClusterEvent.RoleLeaderChanged("GRP", dUp.Address));
_publisher.Tell(new InternalClusterAction.PublishChanges(new MembershipState(new Gossip(ImmutableSortedSet.Create(cUp, dUp)), dUp.UniqueAddress)));
subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", cUp.Address));
await subscriber.ExpectMsgAsync(new ClusterEvent.RoleLeaderChanged("GRP", cUp.Address));
}

[Fact]
public void ClusterDomainEventPublisher_must_send_CurrentClusterState_when_subscribe()
public async Task ClusterDomainEventPublisher_must_send_CurrentClusterState_when_subscribe()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.IClusterDomainEvent))));
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
await subscriber.ExpectMsgAsync<ClusterEvent.CurrentClusterState>();
// but only to the new subscriber
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds());
}

[Fact]
public void ClusterDomainEventPublisher_must_send_events_corresponding_to_current_state_when_subscribe()
public async Task ClusterDomainEventPublisher_must_send_events_corresponding_to_current_state_when_subscribe()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.PublishChanges(state8));
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.ReachabilityEvent))));

subscriber.ReceiveN(4).Should().BeEquivalentTo(
(await subscriber.ReceiveNAsync(4).ToListAsync()).Should().BeEquivalentTo(
new ClusterEvent.MemberUp(aUp),
new ClusterEvent.MemberUp(cUp),
new ClusterEvent.MemberUp(dUp),
new ClusterEvent.MemberExited(bExiting));

subscriber.ExpectMsg(new ClusterEvent.UnreachableMember(dUp));
subscriber.ExpectNoMsg(500.Milliseconds());
await subscriber.ExpectMsgAsync(new ClusterEvent.UnreachableMember(dUp));
await subscriber.ExpectNoMsgAsync(500.Milliseconds());
}

[Fact]
public void ClusterDomainEventPublisher_should_support_unsubscribe()
public async Task ClusterDomainEventPublisher_should_support_unsubscribe()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent))));
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
await subscriber.ExpectMsgAsync<ClusterEvent.CurrentClusterState>();
_publisher.Tell(new InternalClusterAction.Unsubscribe(subscriber.Ref, typeof(ClusterEvent.IMemberEvent)));
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
subscriber.ExpectNoMsg(500.Milliseconds());
await subscriber.ExpectNoMsgAsync(500.Milliseconds());
// but memberSubscriber is still subscriber
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp));
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_seen_changed()
public async Task ClusterDomainEventPublisher_must_publish_seen_changed()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.SeenChanged))));
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
await subscriber.ExpectMsgAsync<ClusterEvent.CurrentClusterState>();
_publisher.Tell(new InternalClusterAction.PublishChanges(state2));
subscriber.ExpectMsg<ClusterEvent.SeenChanged>();
subscriber.ExpectNoMsg(500.Milliseconds());
await subscriber.ExpectMsgAsync<ClusterEvent.SeenChanged>();
await subscriber.ExpectNoMsgAsync(500.Milliseconds());
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
subscriber.ExpectMsg<ClusterEvent.SeenChanged>();
subscriber.ExpectNoMsg(500.Milliseconds());
await subscriber.ExpectMsgAsync<ClusterEvent.SeenChanged>();
await subscriber.ExpectNoMsgAsync(500.Milliseconds());
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_removed_when_stopped()
public async Task ClusterDomainEventPublisher_must_publish_removed_when_stopped()
{
_publisher.Tell(PoisonPill.Instance);
_memberSubscriber.ExpectMsg(ClusterEvent.ClusterShuttingDown.Instance);
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Up));
await _memberSubscriber.ExpectMsgAsync(ClusterEvent.ClusterShuttingDown.Instance);
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Up));
}

}
Expand Down