Skip to content

Commit 16a1bb4

Browse files
authored
Log BalancerAddress instead of DnsEndPoint in load balancing (#1504)
1 parent 78bf205 commit 16a1bb4

File tree

10 files changed

+78
-71
lines changed

10 files changed

+78
-71
lines changed

src/Grpc.Net.Client/Balancer/Internal/BalancerHttpHandler.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#if SUPPORT_LOAD_BALANCING
2020
using System;
21+
using System.Diagnostics;
2122
using System.IO;
2223
using System.Net.Http;
2324
using System.Threading;
@@ -30,6 +31,7 @@ internal class BalancerHttpHandler : DelegatingHandler
3031
{
3132
internal const string WaitForReadyKey = "WaitForReady";
3233
internal const string SubchannelKey = "Subchannel";
34+
internal const string CurrentAddressKey = "CurrentAddress";
3335

3436
private readonly ConnectionManager _manager;
3537

@@ -52,10 +54,15 @@ private async ValueTask<Stream> OnConnect(SocketsHttpConnectionContext context,
5254
{
5355
if (!context.InitialRequestMessage.TryGetOption<Subchannel>(SubchannelKey, out var subchannel))
5456
{
55-
throw new InvalidOperationException();
57+
throw new InvalidOperationException($"Unable to get subchannel from {nameof(HttpRequestMessage)}.");
58+
}
59+
if (!context.InitialRequestMessage.TryGetOption<BalancerAddress>(CurrentAddressKey, out var currentAddress))
60+
{
61+
throw new InvalidOperationException($"Unable to get current address from {nameof(HttpRequestMessage)}.");
5662
}
5763

58-
return await subchannel.Transport.GetStreamAsync(context.DnsEndPoint, cancellationToken).ConfigureAwait(false);
64+
Debug.Assert(context.DnsEndPoint.Equals(currentAddress.EndPoint), "Context endpoint should equal address endpoint.");
65+
return await subchannel.Transport.GetStreamAsync(currentAddress, cancellationToken).ConfigureAwait(false);
5966
}
6067
#endif
6168

@@ -76,7 +83,7 @@ protected override async Task<HttpResponseMessage> SendAsync(
7683
await _manager.ConnectAsync(waitForReady: false, cancellationToken).ConfigureAwait(false);
7784
var pickContext = new PickContext { Request = request };
7885
var result = await _manager.PickAsync(pickContext, waitForReady, cancellationToken).ConfigureAwait(false);
79-
var address = result.Address!;
86+
var address = result.Address;
8087
var addressEndpoint = address.EndPoint;
8188

8289
// Update request host if required.
@@ -94,6 +101,7 @@ protected override async Task<HttpResponseMessage> SendAsync(
94101
// Set sub-connection onto request.
95102
// Will be used to get a stream in SocketsHttpHandler.ConnectCallback.
96103
request.SetOption(SubchannelKey, result.Subchannel);
104+
request.SetOption(CurrentAddressKey, address);
97105
#endif
98106

99107
try

src/Grpc.Net.Client/Balancer/Internal/ConnectionManager.cs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -302,13 +302,7 @@ public void UpdateState(BalancerState state)
302302
}
303303
}
304304

305-
public async
306-
#if !NETSTANDARD2_0
307-
ValueTask<(Subchannel Subchannel, BalancerAddress Address, Action<CompletionContext> OnComplete)>
308-
#else
309-
Task<(Subchannel Subchannel, DnsEndPoint Address, Action<CompleteContext> OnComplete)>
310-
#endif
311-
PickAsync(PickContext context, bool waitForReady, CancellationToken cancellationToken)
305+
public async ValueTask<(Subchannel Subchannel, BalancerAddress Address, Action<CompletionContext> OnComplete)> PickAsync(PickContext context, bool waitForReady, CancellationToken cancellationToken)
312306
{
313307
SubchannelPicker? previousPicker = null;
314308

@@ -330,7 +324,7 @@ public async
330324

331325
if (address != null)
332326
{
333-
ConnectionManagerLog.PickResultSuccessful(Logger, subchannel.Id, address.EndPoint);
327+
ConnectionManagerLog.PickResultSuccessful(Logger, subchannel.Id, address);
334328
return (subchannel, address, result.Complete);
335329
}
336330
else
@@ -500,8 +494,8 @@ internal static class ConnectionManagerLog
500494
private static readonly Action<ILogger, Exception?> _pickStarted =
501495
LoggerMessage.Define(LogLevel.Trace, new EventId(5, "PickStarted"), "Pick started.");
502496

503-
private static readonly Action<ILogger, int, DnsEndPoint, Exception?> _pickResultSuccessful =
504-
LoggerMessage.Define<int, DnsEndPoint>(LogLevel.Debug, new EventId(6, "PickResultSuccessful"), "Successfully picked subchannel id '{SubchannelId}' with address {CurrentAddress}.");
497+
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _pickResultSuccessful =
498+
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(6, "PickResultSuccessful"), "Successfully picked subchannel id '{SubchannelId}' with address {CurrentAddress}.");
505499

506500
private static readonly Action<ILogger, int, Exception?> _pickResultSubchannelNoCurrentAddress =
507501
LoggerMessage.Define<int>(LogLevel.Debug, new EventId(7, "PickResultSubchannelNoCurrentAddress"), "Picked subchannel id '{SubchannelId}' doesn't have a current address.");
@@ -550,7 +544,7 @@ public static void PickStarted(ILogger logger)
550544
_pickStarted(logger, null);
551545
}
552546

553-
public static void PickResultSuccessful(ILogger logger, int subchannelId, DnsEndPoint currentAddress)
547+
public static void PickResultSuccessful(ILogger logger, int subchannelId, BalancerAddress currentAddress)
554548
{
555549
_pickResultSuccessful(logger, subchannelId, currentAddress, null);
556550
}

src/Grpc.Net.Client/Balancer/Internal/ISubchannelTransport.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ internal interface ISubchannelTransport : IDisposable
3636
BalancerAddress? CurrentAddress { get; }
3737

3838
#if NET5_0_OR_GREATER
39-
ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, CancellationToken cancellationToken);
39+
ValueTask<Stream> GetStreamAsync(BalancerAddress address, CancellationToken cancellationToken);
4040
#endif
4141

4242
#if !NETSTANDARD2_0

src/Grpc.Net.Client/Balancer/Internal/PassiveSubchannelTransport.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void Dispose()
8484
}
8585

8686
#if NET5_0_OR_GREATER
87-
public ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, CancellationToken cancellationToken)
87+
public ValueTask<Stream> GetStreamAsync(BalancerAddress address, CancellationToken cancellationToken)
8888
{
8989
throw new NotSupportedException();
9090
}

src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs

Lines changed: 45 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,17 @@ namespace Grpc.Net.Client.Balancer.Internal
4949
/// </summary>
5050
internal class SocketConnectivitySubchannelTransport : ISubchannelTransport, IDisposable
5151
{
52+
internal record struct ActiveStream(BalancerAddress Address, Socket Socket, Stream? Stream);
53+
5254
private readonly ILogger _logger;
5355
private readonly Subchannel _subchannel;
5456
private readonly TimeSpan _socketPingInterval;
55-
private readonly List<(DnsEndPoint EndPoint, Socket Socket, Stream? Stream)> _activeStreams;
57+
private readonly List<ActiveStream> _activeStreams;
5658
private readonly Timer _socketConnectedTimer;
5759

5860
private int _lastEndPointIndex;
5961
private Socket? _initialSocket;
60-
private DnsEndPoint? _initialSocketEndPoint;
62+
private BalancerAddress? _initialSocketAddress;
6163
private bool _disposed;
6264
private BalancerAddress? _currentAddress;
6365

@@ -66,7 +68,7 @@ public SocketConnectivitySubchannelTransport(Subchannel subchannel, TimeSpan soc
6668
_logger = loggerFactory.CreateLogger<SocketConnectivitySubchannelTransport>();
6769
_subchannel = subchannel;
6870
_socketPingInterval = socketPingInterval;
69-
_activeStreams = new List<(DnsEndPoint, Socket, Stream?)>();
71+
_activeStreams = new List<ActiveStream>();
7072
_socketConnectedTimer = new Timer(OnCheckSocketConnection, state: null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
7173
}
7274

@@ -75,7 +77,7 @@ public SocketConnectivitySubchannelTransport(Subchannel subchannel, TimeSpan soc
7577
public bool HasStream { get; }
7678

7779
// For testing. Take a copy under lock for thread-safety.
78-
internal IReadOnlyList<(DnsEndPoint EndPoint, Socket Socket, Stream? Stream)> GetActiveStreams()
80+
internal IReadOnlyList<ActiveStream> GetActiveStreams()
7981
{
8082
lock (Lock)
8183
{
@@ -89,7 +91,7 @@ public void Disconnect()
8991
{
9092
_initialSocket?.Dispose();
9193
_initialSocket = null;
92-
_initialSocketEndPoint = null;
94+
_initialSocketAddress = null;
9395
_lastEndPointIndex = 0;
9496
_socketConnectedTimer.Change(TimeSpan.Zero, TimeSpan.Zero);
9597
_currentAddress = null;
@@ -119,16 +121,16 @@ public async ValueTask<bool> TryConnectAsync(CancellationToken cancellationToken
119121

120122
try
121123
{
122-
SocketConnectivitySubchannelTransportLog.ConnectingSocket(_logger, currentAddress.EndPoint);
124+
SocketConnectivitySubchannelTransportLog.ConnectingSocket(_logger, currentAddress);
123125
await socket.ConnectAsync(currentAddress.EndPoint, cancellationToken).ConfigureAwait(false);
124-
SocketConnectivitySubchannelTransportLog.ConnectedSocket(_logger, currentAddress.EndPoint);
126+
SocketConnectivitySubchannelTransportLog.ConnectedSocket(_logger, currentAddress);
125127

126128
lock (Lock)
127129
{
128130
_currentAddress = currentAddress;
129131
_lastEndPointIndex = currentIndex;
130132
_initialSocket = socket;
131-
_initialSocketEndPoint = currentAddress.EndPoint;
133+
_initialSocketAddress = currentAddress;
132134
_socketConnectedTimer.Change(_socketPingInterval, _socketPingInterval);
133135
}
134136

@@ -137,7 +139,7 @@ public async ValueTask<bool> TryConnectAsync(CancellationToken cancellationToken
137139
}
138140
catch (Exception ex)
139141
{
140-
SocketConnectivitySubchannelTransportLog.ErrorConnectingSocket(_logger, currentAddress.EndPoint, ex);
142+
SocketConnectivitySubchannelTransportLog.ErrorConnectingSocket(_logger, currentAddress, ex);
141143

142144
if (firstConnectionError == null)
143145
{
@@ -167,14 +169,14 @@ private async void OnCheckSocketConnection(object? state)
167169
var socket = _initialSocket;
168170
if (socket != null)
169171
{
170-
CompatibilityHelpers.Assert(_initialSocketEndPoint != null);
172+
CompatibilityHelpers.Assert(_initialSocketAddress != null);
171173

172174
var closeSocket = false;
173175
Exception? sendException = null;
174176
try
175177
{
176178
// Check the socket is still valid by doing a zero byte send.
177-
SocketConnectivitySubchannelTransportLog.CheckingSocket(_logger, _initialSocketEndPoint);
179+
SocketConnectivitySubchannelTransportLog.CheckingSocket(_logger, _initialSocketAddress);
178180
await socket.SendAsync(Array.Empty<byte>(), SocketFlags.None).ConfigureAwait(false);
179181

180182
// Also poll socket to check if it can be read from.
@@ -184,7 +186,7 @@ private async void OnCheckSocketConnection(object? state)
184186
{
185187
closeSocket = true;
186188
sendException = ex;
187-
SocketConnectivitySubchannelTransportLog.ErrorCheckingSocket(_logger, _initialSocketEndPoint, ex);
189+
SocketConnectivitySubchannelTransportLog.ErrorCheckingSocket(_logger, _initialSocketAddress, ex);
188190
}
189191

190192
if (closeSocket)
@@ -195,7 +197,7 @@ private async void OnCheckSocketConnection(object? state)
195197
{
196198
_initialSocket.Dispose();
197199
_initialSocket = null;
198-
_initialSocketEndPoint = null;
200+
_initialSocketAddress = null;
199201
_currentAddress = null;
200202
_lastEndPointIndex = 0;
201203
}
@@ -210,20 +212,20 @@ private async void OnCheckSocketConnection(object? state)
210212
}
211213
}
212214

213-
public async ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, CancellationToken cancellationToken)
215+
public async ValueTask<Stream> GetStreamAsync(BalancerAddress address, CancellationToken cancellationToken)
214216
{
215-
SocketConnectivitySubchannelTransportLog.CreatingStream(_logger, endPoint);
217+
SocketConnectivitySubchannelTransportLog.CreatingStream(_logger, address);
216218

217219
Socket? socket = null;
218220
lock (Lock)
219221
{
220222
if (_initialSocket != null &&
221-
_initialSocketEndPoint != null &&
222-
Equals(_initialSocketEndPoint, endPoint))
223+
_initialSocketAddress != null &&
224+
Equals(_initialSocketAddress, address))
223225
{
224226
socket = _initialSocket;
225227
_initialSocket = null;
226-
_initialSocketEndPoint = null;
228+
_initialSocketAddress = null;
227229
}
228230
}
229231

@@ -242,7 +244,7 @@ public async ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, Cancellation
242244
if (socket == null)
243245
{
244246
socket = new Socket(SocketType.Stream, ProtocolType.Tcp) { NoDelay = true };
245-
await socket.ConnectAsync(endPoint, cancellationToken).ConfigureAwait(false);
247+
await socket.ConnectAsync(address.EndPoint, cancellationToken).ConfigureAwait(false);
246248
}
247249

248250
var networkStream = new NetworkStream(socket, ownsSocket: true);
@@ -252,7 +254,7 @@ public async ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, Cancellation
252254

253255
lock (Lock)
254256
{
255-
_activeStreams.Add((endPoint, socket, stream));
257+
_activeStreams.Add(new ActiveStream(address, socket, stream));
256258
}
257259

258260
return stream;
@@ -282,7 +284,7 @@ private void OnStreamDisposed(Stream streamWrapper)
282284
if (t.Stream == streamWrapper)
283285
{
284286
_activeStreams.RemoveAt(i);
285-
SocketConnectivitySubchannelTransportLog.DisposingStream(_logger, t.EndPoint);
287+
SocketConnectivitySubchannelTransportLog.DisposingStream(_logger, t.Address);
286288

287289
// If the last active streams is removed then there is no active connection.
288290
disconnect = _activeStreams.Count == 0;
@@ -318,51 +320,51 @@ public void OnRequestComplete(CompletionContext context)
318320

319321
internal static class SocketConnectivitySubchannelTransportLog
320322
{
321-
private static readonly Action<ILogger, DnsEndPoint, Exception?> _connectingSocket =
322-
LoggerMessage.Define<DnsEndPoint>(LogLevel.Trace, new EventId(1, "ConnectingSocket"), "Connecting socket to '{Address}'.");
323+
private static readonly Action<ILogger, BalancerAddress, Exception?> _connectingSocket =
324+
LoggerMessage.Define<BalancerAddress>(LogLevel.Trace, new EventId(1, "ConnectingSocket"), "Connecting socket to {Address}.");
323325

324-
private static readonly Action<ILogger, DnsEndPoint, Exception?> _connectedSocket =
325-
LoggerMessage.Define<DnsEndPoint>(LogLevel.Debug, new EventId(2, "ConnectedSocket"), "Connected to socket '{Address}'.");
326+
private static readonly Action<ILogger, BalancerAddress, Exception?> _connectedSocket =
327+
LoggerMessage.Define<BalancerAddress>(LogLevel.Debug, new EventId(2, "ConnectedSocket"), "Connected to socket {Address}.");
326328

327-
private static readonly Action<ILogger, DnsEndPoint, Exception?> _errorConnectingSocket =
328-
LoggerMessage.Define<DnsEndPoint>(LogLevel.Error, new EventId(3, "ErrorConnectingSocket"), "Error connecting to socket '{Address}'.");
329+
private static readonly Action<ILogger, BalancerAddress, Exception?> _errorConnectingSocket =
330+
LoggerMessage.Define<BalancerAddress>(LogLevel.Error, new EventId(3, "ErrorConnectingSocket"), "Error connecting to socket {Address}.");
329331

330-
private static readonly Action<ILogger, DnsEndPoint, Exception?> _checkingSocket =
331-
LoggerMessage.Define<DnsEndPoint>(LogLevel.Trace, new EventId(4, "CheckingSocket"), "Checking socket '{Address}'.");
332+
private static readonly Action<ILogger, BalancerAddress, Exception?> _checkingSocket =
333+
LoggerMessage.Define<BalancerAddress>(LogLevel.Trace, new EventId(4, "CheckingSocket"), "Checking socket {Address}.");
332334

333-
private static readonly Action<ILogger, DnsEndPoint, Exception?> _errorCheckingSocket =
334-
LoggerMessage.Define<DnsEndPoint>(LogLevel.Error, new EventId(5, "ErrorCheckingSocket"), "Error checking socket '{Address}'.");
335+
private static readonly Action<ILogger, BalancerAddress, Exception?> _errorCheckingSocket =
336+
LoggerMessage.Define<BalancerAddress>(LogLevel.Error, new EventId(5, "ErrorCheckingSocket"), "Error checking socket {Address}.");
335337

336338
private static readonly Action<ILogger, Exception?> _errorSocketTimer =
337-
LoggerMessage.Define(LogLevel.Error, new EventId(1, "ErrorSocketTimer"), "Unexpected error in check socket timer.");
339+
LoggerMessage.Define(LogLevel.Error, new EventId(6, "ErrorSocketTimer"), "Unexpected error in check socket timer.");
338340

339-
private static readonly Action<ILogger, DnsEndPoint, Exception?> _creatingStream =
340-
LoggerMessage.Define<DnsEndPoint>(LogLevel.Trace, new EventId(6, "CreatingStream"), "Creating stream for '{Address}'.");
341+
private static readonly Action<ILogger, BalancerAddress, Exception?> _creatingStream =
342+
LoggerMessage.Define<BalancerAddress>(LogLevel.Trace, new EventId(7, "CreatingStream"), "Creating stream for {Address}.");
341343

342-
private static readonly Action<ILogger, DnsEndPoint, Exception?> _disposingStream =
343-
LoggerMessage.Define<DnsEndPoint>(LogLevel.Trace, new EventId(7, "DisposingStream"), "Disposing stream for '{Address}'.");
344+
private static readonly Action<ILogger, BalancerAddress, Exception?> _disposingStream =
345+
LoggerMessage.Define<BalancerAddress>(LogLevel.Trace, new EventId(8, "DisposingStream"), "Disposing stream for {Address}.");
344346

345-
public static void ConnectingSocket(ILogger logger, DnsEndPoint address)
347+
public static void ConnectingSocket(ILogger logger, BalancerAddress address)
346348
{
347349
_connectingSocket(logger, address, null);
348350
}
349351

350-
public static void ConnectedSocket(ILogger logger, DnsEndPoint address)
352+
public static void ConnectedSocket(ILogger logger, BalancerAddress address)
351353
{
352354
_connectedSocket(logger, address, null);
353355
}
354356

355-
public static void ErrorConnectingSocket(ILogger logger, DnsEndPoint address, Exception ex)
357+
public static void ErrorConnectingSocket(ILogger logger, BalancerAddress address, Exception ex)
356358
{
357359
_errorConnectingSocket(logger, address, ex);
358360
}
359361

360-
public static void CheckingSocket(ILogger logger, DnsEndPoint address)
362+
public static void CheckingSocket(ILogger logger, BalancerAddress address)
361363
{
362364
_checkingSocket(logger, address, null);
363365
}
364366

365-
public static void ErrorCheckingSocket(ILogger logger, DnsEndPoint address, Exception ex)
367+
public static void ErrorCheckingSocket(ILogger logger, BalancerAddress address, Exception ex)
366368
{
367369
_errorCheckingSocket(logger, address, ex);
368370
}
@@ -372,12 +374,12 @@ public static void ErrorSocketTimer(ILogger logger, Exception ex)
372374
_errorSocketTimer(logger, ex);
373375
}
374376

375-
public static void CreatingStream(ILogger logger, DnsEndPoint address)
377+
public static void CreatingStream(ILogger logger, BalancerAddress address)
376378
{
377379
_creatingStream(logger, address, null);
378380
}
379381

380-
public static void DisposingStream(ILogger logger, DnsEndPoint address)
382+
public static void DisposingStream(ILogger logger, BalancerAddress address)
381383
{
382384
_disposingStream(logger, address, null);
383385
}

0 commit comments

Comments
 (0)