Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/Grpc.Net.Client/Balancer/Internal/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ namespace Grpc.Net.Client.Balancer.Internal;
internal sealed class ConnectionManager : IDisposable, IChannelControlHelper
{
public static readonly BalancerAttributesKey<string> HostOverrideKey = new BalancerAttributesKey<string>("HostOverride");
private static int _currentChannelId;

private readonly object _lock;
internal readonly Resolver _resolver;
private readonly ISubchannelTransportFactory _subchannelTransportFactory;
private readonly List<Subchannel> _subchannels;
private readonly List<StateWatcher> _stateWatchers;
private readonly TaskCompletionSource<object?> _resolverStartedTcs;
private readonly int _channelId;

// Internal for testing
internal LoadBalancer? _balancer;
Expand All @@ -57,6 +59,7 @@ internal ConnectionManager(
_lock = new object();
_nextPickerTcs = new TaskCompletionSource<SubchannelPicker>(TaskCreationOptions.RunContinuationsAsynchronously);
_resolverStartedTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
_channelId = GetNextChannelId();

Logger = loggerFactory.CreateLogger<ConnectionManager>();
LoggerFactory = loggerFactory;
Expand All @@ -76,6 +79,8 @@ internal ConnectionManager(
public bool DisableResolverServiceConfig { get; }
public LoadBalancerFactory[] LoadBalancerFactories { get; }

private static int GetNextChannelId() => Interlocked.Increment(ref _currentChannelId);

// For unit tests.
internal IReadOnlyList<Subchannel> GetSubchannels()
{
Expand All @@ -85,9 +90,10 @@ internal IReadOnlyList<Subchannel> GetSubchannels()
}
}

internal int GetNextId()
internal string GetNextId()
{
return Interlocked.Increment(ref _currentSubchannelId);
var nextSubchannelId = Interlocked.Increment(ref _currentSubchannelId);
return $"{_channelId}-{nextSubchannelId}";
}

public void ConfigureBalancer(Func<IChannelControlHelper, LoadBalancer> configure)
Expand Down Expand Up @@ -474,11 +480,11 @@ internal static class ConnectionManagerLog
private static readonly Action<ILogger, Exception?> _pickStarted =
LoggerMessage.Define(LogLevel.Trace, new EventId(5, "PickStarted"), "Pick started.");

private static readonly Action<ILogger, int, BalancerAddress, Exception?> _pickResultSuccessful =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(6, "PickResultSuccessful"), "Successfully picked subchannel id '{SubchannelId}' with address {CurrentAddress}.");
private static readonly Action<ILogger, string, BalancerAddress, Exception?> _pickResultSuccessful =
LoggerMessage.Define<string, BalancerAddress>(LogLevel.Debug, new EventId(6, "PickResultSuccessful"), "Successfully picked subchannel id '{SubchannelId}' with address {CurrentAddress}.");

private static readonly Action<ILogger, int, Exception?> _pickResultSubchannelNoCurrentAddress =
LoggerMessage.Define<int>(LogLevel.Debug, new EventId(7, "PickResultSubchannelNoCurrentAddress"), "Picked subchannel id '{SubchannelId}' doesn't have a current address.");
private static readonly Action<ILogger, string, Exception?> _pickResultSubchannelNoCurrentAddress =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(7, "PickResultSubchannelNoCurrentAddress"), "Picked subchannel id '{SubchannelId}' doesn't have a current address.");

private static readonly Action<ILogger, Exception?> _pickResultQueued =
LoggerMessage.Define(LogLevel.Debug, new EventId(8, "PickResultQueued"), "Picked queued.");
Expand Down Expand Up @@ -524,12 +530,12 @@ public static void PickStarted(ILogger logger)
_pickStarted(logger, null);
}

public static void PickResultSuccessful(ILogger logger, int subchannelId, BalancerAddress currentAddress)
public static void PickResultSuccessful(ILogger logger, string subchannelId, BalancerAddress currentAddress)
{
_pickResultSuccessful(logger, subchannelId, currentAddress, null);
}

public static void PickResultSubchannelNoCurrentAddress(ILogger logger, int subchannelId)
public static void PickResultSubchannelNoCurrentAddress(ILogger logger, string subchannelId)
{
_pickResultSubchannelNoCurrentAddress(logger, subchannelId, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,138 +518,138 @@ public void Dispose()

internal static class SocketConnectivitySubchannelTransportLog
{
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _connectingSocket =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Trace, new EventId(1, "ConnectingSocket"), "Subchannel id '{SubchannelId}' connecting socket to {Address}.");
private static readonly Action<ILogger, string, BalancerAddress, Exception?> _connectingSocket =
LoggerMessage.Define<string, BalancerAddress>(LogLevel.Trace, new EventId(1, "ConnectingSocket"), "Subchannel id '{SubchannelId}' connecting socket to {Address}.");

private static readonly Action<ILogger, int, BalancerAddress, Exception?> _connectedSocket =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(2, "ConnectedSocket"), "Subchannel id '{SubchannelId}' connected to socket {Address}.");
private static readonly Action<ILogger, string, BalancerAddress, Exception?> _connectedSocket =
LoggerMessage.Define<string, BalancerAddress>(LogLevel.Debug, new EventId(2, "ConnectedSocket"), "Subchannel id '{SubchannelId}' connected to socket {Address}.");

private static readonly Action<ILogger, int, BalancerAddress, Exception> _errorConnectingSocket =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(3, "ErrorConnectingSocket"), "Subchannel id '{SubchannelId}' error connecting to socket {Address}.");
private static readonly Action<ILogger, string, BalancerAddress, Exception> _errorConnectingSocket =
LoggerMessage.Define<string, BalancerAddress>(LogLevel.Debug, new EventId(3, "ErrorConnectingSocket"), "Subchannel id '{SubchannelId}' error connecting to socket {Address}.");

private static readonly Action<ILogger, int, BalancerAddress, Exception?> _checkingSocket =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Trace, new EventId(4, "CheckingSocket"), "Subchannel id '{SubchannelId}' checking socket {Address}.");
private static readonly Action<ILogger, string, BalancerAddress, Exception?> _checkingSocket =
LoggerMessage.Define<string, BalancerAddress>(LogLevel.Trace, new EventId(4, "CheckingSocket"), "Subchannel id '{SubchannelId}' checking socket {Address}.");

private static readonly Action<ILogger, int, BalancerAddress, Exception> _errorCheckingSocket =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(5, "ErrorCheckingSocket"), "Subchannel id '{SubchannelId}' error checking socket {Address}.");
private static readonly Action<ILogger, string, BalancerAddress, Exception> _errorCheckingSocket =
LoggerMessage.Define<string, BalancerAddress>(LogLevel.Debug, new EventId(5, "ErrorCheckingSocket"), "Subchannel id '{SubchannelId}' error checking socket {Address}.");

private static readonly Action<ILogger, int, Exception> _errorSocketTimer =
LoggerMessage.Define<int>(LogLevel.Error, new EventId(6, "ErrorSocketTimer"), "Subchannel id '{SubchannelId}' unexpected error in check socket timer.");
private static readonly Action<ILogger, string, Exception> _errorSocketTimer =
LoggerMessage.Define<string>(LogLevel.Error, new EventId(6, "ErrorSocketTimer"), "Subchannel id '{SubchannelId}' unexpected error in check socket timer.");

private static readonly Action<ILogger, int, BalancerAddress, Exception?> _creatingStream =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Trace, new EventId(7, "CreatingStream"), "Subchannel id '{SubchannelId}' creating stream for {Address}.");
private static readonly Action<ILogger, string, BalancerAddress, Exception?> _creatingStream =
LoggerMessage.Define<string, BalancerAddress>(LogLevel.Trace, new EventId(7, "CreatingStream"), "Subchannel id '{SubchannelId}' creating stream for {Address}.");

private static readonly Action<ILogger, int, BalancerAddress, int, Exception?> _disposingStream =
LoggerMessage.Define<int, BalancerAddress, int>(LogLevel.Trace, new EventId(8, "DisposingStream"), "Subchannel id '{SubchannelId}' disposing stream for {Address}. Transport has {ActiveStreams} active streams.");
private static readonly Action<ILogger, string, BalancerAddress, int, Exception?> _disposingStream =
LoggerMessage.Define<string, BalancerAddress, int>(LogLevel.Trace, new EventId(8, "DisposingStream"), "Subchannel id '{SubchannelId}' disposing stream for {Address}. Transport has {ActiveStreams} active streams.");

private static readonly Action<ILogger, int, Exception?> _disposingTransport =
LoggerMessage.Define<int>(LogLevel.Trace, new EventId(9, "DisposingTransport"), "Subchannel id '{SubchannelId}' disposing transport.");
private static readonly Action<ILogger, string, Exception?> _disposingTransport =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(9, "DisposingTransport"), "Subchannel id '{SubchannelId}' disposing transport.");

private static readonly Action<ILogger, int, Exception> _errorOnDisposingStream =
LoggerMessage.Define<int>(LogLevel.Error, new EventId(10, "ErrorOnDisposingStream"), "Subchannel id '{SubchannelId}' unexpected error when reacting to transport stream dispose.");
private static readonly Action<ILogger, string, Exception> _errorOnDisposingStream =
LoggerMessage.Define<string>(LogLevel.Error, new EventId(10, "ErrorOnDisposingStream"), "Subchannel id '{SubchannelId}' unexpected error when reacting to transport stream dispose.");

private static readonly Action<ILogger, int, BalancerAddress, Exception?> _connectingOnCreateStream =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Trace, new EventId(11, "ConnectingOnCreateStream"), "Subchannel id '{SubchannelId}' doesn't have a connected socket available. Connecting new stream socket for {Address}.");
private static readonly Action<ILogger, string, BalancerAddress, Exception?> _connectingOnCreateStream =
LoggerMessage.Define<string, BalancerAddress>(LogLevel.Trace, new EventId(11, "ConnectingOnCreateStream"), "Subchannel id '{SubchannelId}' doesn't have a connected socket available. Connecting new stream socket for {Address}.");

private static readonly Action<ILogger, int, BalancerAddress, int, int, Exception?> _streamCreated =
LoggerMessage.Define<int, BalancerAddress, int, int>(LogLevel.Trace, new EventId(12, "StreamCreated"), "Subchannel id '{SubchannelId}' created stream for {Address} with {BufferedBytes} buffered bytes. Transport has {ActiveStreams} active streams.");
private static readonly Action<ILogger, string, BalancerAddress, int, int, Exception?> _streamCreated =
LoggerMessage.Define<string, BalancerAddress, int, int>(LogLevel.Trace, new EventId(12, "StreamCreated"), "Subchannel id '{SubchannelId}' created stream for {Address} with {BufferedBytes} buffered bytes. Transport has {ActiveStreams} active streams.");

private static readonly Action<ILogger, int, BalancerAddress, Exception> _errorPollingSocket =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(13, "ErrorPollingSocket"), "Subchannel id '{SubchannelId}' error checking socket {Address}.");
private static readonly Action<ILogger, string, BalancerAddress, Exception> _errorPollingSocket =
LoggerMessage.Define<string, BalancerAddress>(LogLevel.Debug, new EventId(13, "ErrorPollingSocket"), "Subchannel id '{SubchannelId}' error checking socket {Address}.");

private static readonly Action<ILogger, int, BalancerAddress, Exception?> _socketPollBadState =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(14, "SocketPollBadState"), "Subchannel id '{SubchannelId}' socket {Address} is in a bad state and can't be used.");
private static readonly Action<ILogger, string, BalancerAddress, Exception?> _socketPollBadState =
LoggerMessage.Define<string, BalancerAddress>(LogLevel.Debug, new EventId(14, "SocketPollBadState"), "Subchannel id '{SubchannelId}' socket {Address} is in a bad state and can't be used.");

private static readonly Action<ILogger, int, BalancerAddress, int, Exception?> _socketReceivingAvailable =
LoggerMessage.Define<int, BalancerAddress, int>(LogLevel.Trace, new EventId(15, "SocketReceivingAvailable"), "Subchannel id '{SubchannelId}' socket {Address} is receiving {ReadBytesAvailableCount} available bytes.");
private static readonly Action<ILogger, string, BalancerAddress, int, Exception?> _socketReceivingAvailable =
LoggerMessage.Define<string, BalancerAddress, int>(LogLevel.Trace, new EventId(15, "SocketReceivingAvailable"), "Subchannel id '{SubchannelId}' socket {Address} is receiving {ReadBytesAvailableCount} available bytes.");

private static readonly Action<ILogger, int, BalancerAddress, Exception?> _closingUnusableSocketOnCreateStream =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(16, "ClosingUnusableSocketOnCreateStream"), "Subchannel id '{SubchannelId}' socket {Address} is being closed because it can't be used. The socket either can't receive data or it has received unexpected data.");
private static readonly Action<ILogger, string, BalancerAddress, Exception?> _closingUnusableSocketOnCreateStream =
LoggerMessage.Define<string, BalancerAddress>(LogLevel.Debug, new EventId(16, "ClosingUnusableSocketOnCreateStream"), "Subchannel id '{SubchannelId}' socket {Address} is being closed because it can't be used. The socket either can't receive data or it has received unexpected data.");

private static readonly Action<ILogger, int, BalancerAddress, TimeSpan, Exception?> _closingSocketFromIdleTimeoutOnCreateStream =
LoggerMessage.Define<int, BalancerAddress, TimeSpan>(LogLevel.Debug, new EventId(16, "ClosingSocketFromIdleTimeoutOnCreateStream"), "Subchannel id '{SubchannelId}' socket {Address} is being closed because it exceeds the idle timeout of {SocketIdleTimeout}.");
private static readonly Action<ILogger, string, BalancerAddress, TimeSpan, Exception?> _closingSocketFromIdleTimeoutOnCreateStream =
LoggerMessage.Define<string, BalancerAddress, TimeSpan>(LogLevel.Debug, new EventId(16, "ClosingSocketFromIdleTimeoutOnCreateStream"), "Subchannel id '{SubchannelId}' socket {Address} is being closed because it exceeds the idle timeout of {SocketIdleTimeout}.");

public static void ConnectingSocket(ILogger logger, int subchannelId, BalancerAddress address)
public static void ConnectingSocket(ILogger logger, string subchannelId, BalancerAddress address)
{
_connectingSocket(logger, subchannelId, address, null);
}

public static void ConnectedSocket(ILogger logger, int subchannelId, BalancerAddress address)
public static void ConnectedSocket(ILogger logger, string subchannelId, BalancerAddress address)
{
_connectedSocket(logger, subchannelId, address, null);
}

public static void ErrorConnectingSocket(ILogger logger, int subchannelId, BalancerAddress address, Exception ex)
public static void ErrorConnectingSocket(ILogger logger, string subchannelId, BalancerAddress address, Exception ex)
{
_errorConnectingSocket(logger, subchannelId, address, ex);
}

public static void CheckingSocket(ILogger logger, int subchannelId, BalancerAddress address)
public static void CheckingSocket(ILogger logger, string subchannelId, BalancerAddress address)
{
_checkingSocket(logger, subchannelId, address, null);
}

public static void ErrorCheckingSocket(ILogger logger, int subchannelId, BalancerAddress address, Exception ex)
public static void ErrorCheckingSocket(ILogger logger, string subchannelId, BalancerAddress address, Exception ex)
{
_errorCheckingSocket(logger, subchannelId, address, ex);
}

public static void ErrorSocketTimer(ILogger logger, int subchannelId, Exception ex)
public static void ErrorSocketTimer(ILogger logger, string subchannelId, Exception ex)
{
_errorSocketTimer(logger, subchannelId, ex);
}

public static void CreatingStream(ILogger logger, int subchannelId, BalancerAddress address)
public static void CreatingStream(ILogger logger, string subchannelId, BalancerAddress address)
{
_creatingStream(logger, subchannelId, address, null);
}

public static void DisposingStream(ILogger logger, int subchannelId, BalancerAddress address, int activeStreams)
public static void DisposingStream(ILogger logger, string subchannelId, BalancerAddress address, int activeStreams)
{
_disposingStream(logger, subchannelId, address, activeStreams, null);
}

public static void DisposingTransport(ILogger logger, int subchannelId)
public static void DisposingTransport(ILogger logger, string subchannelId)
{
_disposingTransport(logger, subchannelId, null);
}

public static void ErrorOnDisposingStream(ILogger logger, int subchannelId, Exception ex)
public static void ErrorOnDisposingStream(ILogger logger, string subchannelId, Exception ex)
{
_errorOnDisposingStream(logger, subchannelId, ex);
}

public static void ConnectingOnCreateStream(ILogger logger, int subchannelId, BalancerAddress address)
public static void ConnectingOnCreateStream(ILogger logger, string subchannelId, BalancerAddress address)
{
_connectingOnCreateStream(logger, subchannelId, address, null);
}

public static void StreamCreated(ILogger logger, int subchannelId, BalancerAddress address, int bufferedBytes, int activeStreams)
public static void StreamCreated(ILogger logger, string subchannelId, BalancerAddress address, int bufferedBytes, int activeStreams)
{
_streamCreated(logger, subchannelId, address, bufferedBytes, activeStreams, null);
}

public static void ErrorPollingSocket(ILogger logger, int subchannelId, BalancerAddress address, Exception ex)
public static void ErrorPollingSocket(ILogger logger, string subchannelId, BalancerAddress address, Exception ex)
{
_errorPollingSocket(logger, subchannelId, address, ex);
}

public static void SocketPollBadState(ILogger logger, int subchannelId, BalancerAddress address)
public static void SocketPollBadState(ILogger logger, string subchannelId, BalancerAddress address)
{
_socketPollBadState(logger, subchannelId, address, null);
}

public static void SocketReceivingAvailable(ILogger logger, int subchannelId, BalancerAddress address, int readBytesAvailableCount)
public static void SocketReceivingAvailable(ILogger logger, string subchannelId, BalancerAddress address, int readBytesAvailableCount)
{
_socketReceivingAvailable(logger, subchannelId, address, readBytesAvailableCount, null);
}

public static void ClosingUnusableSocketOnCreateStream(ILogger logger, int subchannelId, BalancerAddress address)
public static void ClosingUnusableSocketOnCreateStream(ILogger logger, string subchannelId, BalancerAddress address)
{
_closingUnusableSocketOnCreateStream(logger, subchannelId, address, null);
}

public static void ClosingSocketFromIdleTimeoutOnCreateStream(ILogger logger, int subchannelId, BalancerAddress address, TimeSpan socketIdleTimeout)
public static void ClosingSocketFromIdleTimeoutOnCreateStream(ILogger logger, string subchannelId, BalancerAddress address, TimeSpan socketIdleTimeout)
{
_closingSocketFromIdleTimeoutOnCreateStream(logger, subchannelId, address, socketIdleTimeout, null);
}
Expand Down
Loading