Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/articles/actors/io.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,11 @@ The following code example shows a simple server that echo's data received from
[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Networking/IO/EchoServer.cs?name=echoServer)]

[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Networking/IO/EchoConnection.cs?name=echoConnection)]

### TCP Listener Statistics
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added documentation along with a small example on how to use this


If you're building a long-running TCP server you can subscribe to the `TcpListener` actor's statistics via the `Tcp.SubscribeToTcpListenerStats` message:

[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Networking/IO/EchoServer.cs?name=echoServerWithStats)]

This will result in a `Tcp.TcpListenerStatistics` message being delivered with updated, rolling statistics once every 10 seconds or so roughly. Each independent `TcpListener` maintains its own statistics and they can support an arbitrary number of subscribers.
Original file line number Diff line number Diff line change
Expand Up @@ -4097,6 +4097,7 @@ namespace Akka.IO
{
public Event() { }
}
public interface ITcpQuery : Akka.Actor.INoSerializationVerificationNeeded { }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate that Message is the base class for all of these TCP messages. They really just should implement an interface, so I did it that way for these new query types.

public class Message : Akka.Actor.INoSerializationVerificationNeeded
{
public Message() { }
Expand Down Expand Up @@ -4148,10 +4149,24 @@ namespace Akka.IO
public bool WantsAck { get; }
public Akka.IO.Tcp.CompoundWrite Append(Akka.IO.Tcp.WriteCommand that) { }
}
public sealed class SubscribeToTcpListenerStats : Akka.Actor.INoSerializationVerificationNeeded, Akka.IO.Tcp.ITcpQuery, System.IEquatable<Akka.IO.Tcp.SubscribeToTcpListenerStats>
{
public SubscribeToTcpListenerStats(Akka.Actor.IActorRef Subscriber) { }
public Akka.Actor.IActorRef Subscriber { get; set; }
}
public sealed class SuspendReading : Akka.IO.Tcp.Command
{
public static readonly Akka.IO.Tcp.SuspendReading Instance;
}
[System.Runtime.CompilerServices.NullableAttribute(0)]
public sealed class TcpListenerStatistics : Akka.Actor.INoSerializationVerificationNeeded, Akka.IO.Tcp.ITcpQuery, System.IEquatable<Akka.IO.Tcp.TcpListenerStatistics>
{
public TcpListenerStatistics() { }
public long AcceptedIncomingConnections { get; set; }
public long FailedIncomingConnections { get; set; }
public long IncomingConnectionsClosed { get; set; }
public long RetriedIncomingConnections { get; set; }
}
public class Unbind : Akka.IO.Tcp.Command
{
public static readonly Akka.IO.Tcp.Unbind Instance;
Expand All @@ -4160,6 +4175,11 @@ namespace Akka.IO
{
public static readonly Akka.IO.Tcp.Unbound Instance;
}
public sealed class UnsubscribeFromTcpListenerStats : Akka.Actor.INoSerializationVerificationNeeded, Akka.IO.Tcp.ITcpQuery, System.IEquatable<Akka.IO.Tcp.UnsubscribeFromTcpListenerStats>
{
public UnsubscribeFromTcpListenerStats(Akka.Actor.IActorRef Subscriber) { }
public Akka.Actor.IActorRef Subscriber { get; set; }
}
public sealed class Write : Akka.IO.Tcp.SimpleWriteCommand
{
public static readonly Akka.IO.Tcp.Write Empty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4087,6 +4087,7 @@ namespace Akka.IO
{
public Event() { }
}
public interface ITcpQuery : Akka.Actor.INoSerializationVerificationNeeded { }
public class Message : Akka.Actor.INoSerializationVerificationNeeded
{
public Message() { }
Expand Down Expand Up @@ -4138,10 +4139,24 @@ namespace Akka.IO
public bool WantsAck { get; }
public Akka.IO.Tcp.CompoundWrite Append(Akka.IO.Tcp.WriteCommand that) { }
}
public sealed class SubscribeToTcpListenerStats : Akka.Actor.INoSerializationVerificationNeeded, Akka.IO.Tcp.ITcpQuery, System.IEquatable<Akka.IO.Tcp.SubscribeToTcpListenerStats>
{
public SubscribeToTcpListenerStats(Akka.Actor.IActorRef Subscriber) { }
public Akka.Actor.IActorRef Subscriber { get; set; }
}
public sealed class SuspendReading : Akka.IO.Tcp.Command
{
public static readonly Akka.IO.Tcp.SuspendReading Instance;
}
[System.Runtime.CompilerServices.NullableAttribute(0)]
public sealed class TcpListenerStatistics : Akka.Actor.INoSerializationVerificationNeeded, Akka.IO.Tcp.ITcpQuery, System.IEquatable<Akka.IO.Tcp.TcpListenerStatistics>
{
public TcpListenerStatistics() { }
public long AcceptedIncomingConnections { get; set; }
public long FailedIncomingConnections { get; set; }
public long IncomingConnectionsClosed { get; set; }
public long RetriedIncomingConnections { get; set; }
}
public class Unbind : Akka.IO.Tcp.Command
{
public static readonly Akka.IO.Tcp.Unbind Instance;
Expand All @@ -4150,6 +4165,11 @@ namespace Akka.IO
{
public static readonly Akka.IO.Tcp.Unbound Instance;
}
public sealed class UnsubscribeFromTcpListenerStats : Akka.Actor.INoSerializationVerificationNeeded, Akka.IO.Tcp.ITcpQuery, System.IEquatable<Akka.IO.Tcp.UnsubscribeFromTcpListenerStats>
{
public UnsubscribeFromTcpListenerStats(Akka.Actor.IActorRef Subscriber) { }
public Akka.Actor.IActorRef Subscriber { get; set; }
}
public sealed class Write : Akka.IO.Tcp.SimpleWriteCommand
{
public static readonly Akka.IO.Tcp.Write Empty;
Expand Down
53 changes: 46 additions & 7 deletions src/core/Akka.Docs.Tests/Networking/IO/EchoServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,57 @@ public EchoServer(int port)

protected override void OnReceive(object message)
{
if (message is Tcp.Bound bound)
switch (message)
{
Console.WriteLine("Listening on {0}", bound.LocalAddress);
case Tcp.Bound bound:
Console.WriteLine("Listening on {0}", bound.LocalAddress);
break;
case Tcp.Connected:
{
var connection = Context.ActorOf(Props.Create(() => new EchoConnection(Sender)));
Sender.Tell(new Tcp.Register(connection));
break;
}
default:
Unhandled(message);
break;
}
else if (message is Tcp.Connected)
}
}

// </echoServer>

// <echoServerWithStats>
public class EchoServerWithStats : UntypedActor
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sample for the documentation

{
public EchoServerWithStats(int port)
{
Context.System.Tcp().Tell(new Tcp.Bind(Self, new IPEndPoint(IPAddress.Any, port)));
}

protected override void OnReceive(object message)
{
switch (message)
{
var connection = Context.ActorOf(Props.Create(() => new EchoConnection(Sender)));
Sender.Tell(new Tcp.Register(connection));
case Tcp.Bound bound:
Sender.Tell(new Tcp.SubscribeToTcpListenerStats(Self));
Console.WriteLine("Listening on {0}", bound.LocalAddress);
break;
case Tcp.TcpListenerStatistics stats:
Console.WriteLine("Received TCP stats: {0}", stats);
break;
case Tcp.Connected:
{
var connection = Context.ActorOf(Props.Create(() => new EchoConnection(Sender)));
Sender.Tell(new Tcp.Register(connection));
break;
}
default:
Unhandled(message);
break;
}
else Unhandled(message);
}
}

// </echoServer>
// </echoServerWithStats>
}
83 changes: 52 additions & 31 deletions src/core/Akka.Tests/IO/TcpListenerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ public TcpListenerSpec()
akka.io.tcp.direct-buffer-size = 512
akka.actor.serialize-creators = on
akka.io.tcp.batch-accept-limit = 2")
{ }
{
}

[Fact]
public async Task A_TCP_Listener_must_let_the_bind_commander_know_when_binding_is_complete()
{
await new TestSetup(this, pullMode: false).RunAsync(async x =>
{
await x.BindCommander.ExpectMsgAsync<Tcp.Bound>();
});
});
}

[Fact]
Expand All @@ -51,10 +52,26 @@ public async Task A_TCP_Listener_must_continue_to_accept_connections_after_a_pre
});
}

[Fact]
public async Task A_TCP_Listener_must_provide_metrics()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick integration test

{
await new TestSetup(this, pullMode: false).RunAsync(async x =>
{
await x.BindListener();

await x.AttemptConnectionToEndpoint();
var probe = CreateTestProbe();
x.Listener.Tell(new Tcp.SubscribeToTcpListenerStats(probe.Ref));
var metrics = await probe.ExpectMsgAsync<Tcp.TcpListenerStatistics>();

Assert.Equal(1, metrics.AcceptedIncomingConnections);
});
}

[Fact]
public async Task A_TCP_Listener_must_react_to_unbind_commands_by_replying_with_unbound_and_stopping_itself()
{
await new TestSetup(this, pullMode:false).RunAsync(async x =>
await new TestSetup(this, pullMode: false).RunAsync(async x =>
{
await x.BindListener();

Expand All @@ -63,46 +80,44 @@ public async Task A_TCP_Listener_must_react_to_unbind_commands_by_replying_with_

await unbindCommander.ExpectMsgAsync(Tcp.Unbound.Instance);
await x.Parent.ExpectTerminatedAsync(x.Listener);
});
});
}

class TestSetup
private class TestSetup
{
private readonly TestKitBase _kit;
private readonly bool _pullMode;

private readonly TestProbe _handler;
private readonly IActorRef _handlerRef;
private readonly TestProbe _bindCommander;
private readonly TestProbe _parent;
private readonly TestActorRef<ListenerParent> _parentRef;

public TestSetup(TestKitBase kit, bool pullMode)
{
_kit = kit;
_pullMode = pullMode;

_handler = kit.CreateTestProbe();
_handlerRef = _handler.Ref;
_bindCommander = kit.CreateTestProbe();
_parent = kit.CreateTestProbe();
BindCommander = kit.CreateTestProbe();
Parent = kit.CreateTestProbe();
SelectorRouter = kit.CreateTestProbe();

_parentRef = new TestActorRef<ListenerParent>(kit.Sys, Props.Create(() => new ListenerParent(this, pullMode)));
_parentRef =
new TestActorRef<ListenerParent>(kit.Sys, Props.Create(() => new ListenerParent(this, pullMode)));
}

public void Run(Action<TestSetup> test)
{
test(this);
}

public async Task RunAsync(Func<TestSetup, Task> test)
{
await test(this);
}

public async Task BindListener()
{
var bound = await _bindCommander.ExpectMsgAsync<Tcp.Bound>();
var bound = await BindCommander.ExpectMsgAsync<Tcp.Bound>();
LocalEndPoint = (IPEndPoint)bound.LocalAddress;
}

Expand All @@ -112,12 +127,15 @@ public async Task AttemptConnectionToEndpoint()
.ConnectAsync(LocalEndPoint);
}

public IActorRef Listener { get { return _parentRef.UnderlyingActor.Listener; } }
public IActorRef Listener
{
get { return _parentRef.UnderlyingActor.Listener; }
}

public TestProbe SelectorRouter { get; }

public TestProbe BindCommander { get { return _bindCommander; } }
public TestProbe Parent { get { return _parent; } }
public TestProbe BindCommander { get; }
public TestProbe Parent { get; }

public IPEndPoint LocalEndPoint { get; private set; }

Expand All @@ -138,25 +156,28 @@ public ListenerParent(TestSetup test, bool pullMode)
var endpoint = new IPEndPoint(IPAddress.Loopback, 0);

_listener = Context.ActorOf(Props.Create(() =>
new TcpListener(
Tcp.For(Context.System),
test._bindCommander.Ref,
new Tcp.Bind(
_test._handler.Ref,
endpoint,
100,
new Inet.SocketOption[]{ new TestSocketOption(socket => _test.AfterBind(socket)) },
pullMode)))
new TcpListener(
Tcp.For(Context.System),
test.BindCommander.Ref,
new Tcp.Bind(
_test._handler.Ref,
endpoint,
100,
new Inet.SocketOption[] { new TestSocketOption(socket => _test.AfterBind(socket)) },
pullMode)))
.WithDeploy(Deploy.Local));
_test._parent.Watch(_listener);

_test.Parent.Watch(_listener);
}

internal IActorRef Listener { get { return _listener; } }
internal IActorRef Listener
{
get { return _listener; }
}

protected override bool Receive(object message)
{
_test._parent.Forward(message);
_test.Parent.Forward(message);
return true;
}

Expand All @@ -180,4 +201,4 @@ public override void AfterBind(Socket s)
}
}
}
}
}
46 changes: 46 additions & 0 deletions src/core/Akka/IO/Tcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,59 @@ private SocketConnected() { }
}

#endregion


/// <summary>
/// Akka.IO Tcp messages are all derived from this class.
/// </summary>
public class Message : INoSerializationVerificationNeeded { }

#region user commands

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new message types we added - pretty self-explanatory

/// <summary>
/// Queries against Akka.IO.Tcp class types
/// </summary>
public interface ITcpQuery : INoSerializationVerificationNeeded{}

/// <summary>
/// Subscribe to receive ongoing statistics from a TCP listener. See <see cref="TcpListenerStatistics" />
/// </summary>
public sealed record SubscribeToTcpListenerStats(IActorRef Subscriber) : ITcpQuery;

/// <summary>
/// Unsubscribe from receiving ongoing statistics from a TCP listener.
/// </summary>
public sealed record UnsubscribeFromTcpListenerStats(IActorRef Subscriber) : ITcpQuery;

/// <summary>
/// A set of statistics from a specific TCP listener.
/// </summary>
/// <remarks>
/// These are ongoing, rolling statistics that are updated as the listener
/// processes incoming connections. They will not reset unless the listener is killed.
/// </remarks>
public sealed record TcpListenerStatistics : ITcpQuery
{
/// <summary>
/// Total number of accepted incoming connections
/// </summary>
public long AcceptedIncomingConnections { get; init; }

/// <summary>
/// Incoming connections that could not be accepted
/// </summary>
public long FailedIncomingConnections { get; init; }

/// <summary>
/// Incoming connections that had to be retried
/// </summary>
public long RetriedIncomingConnections { get; init; }

/// <summary>
/// Total number of incoming connections that were closed
/// </summary>
public long IncomingConnectionsClosed { get; init; }
}

// COMMANDS

Expand Down
Loading
Loading