Skip to content

Commit 6d75fca

Browse files
authored
xds: Distinct LoadStatsManagers (#10009)
Currently the code maintains a single LoadStatsManager2 that collects all stats. The problem is that in a federation setup there are multiple LrsClients, each periodically picking up stats from that one manager and sending them to its own control plane. This creates a first-come, first-served situation in which the stats get distributed arbitrarily across the control planes. This change creates a separate LoadStatsManager2 dedicated to each control plane, ensuring that no stats are lost.
1 parent ec9b8e0 commit 6d75fca

File tree

3 files changed

+36
-3
lines changed

3 files changed

+36
-3
lines changed

xds/src/main/java/io/grpc/xds/LoadReportClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ final class LoadReportClient {
6161
private final ScheduledExecutorService timerService;
6262
private final Stopwatch retryStopwatch;
6363
private final BackoffPolicy.Provider backoffPolicyProvider;
64-
private final LoadStatsManager2 loadStatsManager;
64+
@VisibleForTesting
65+
final LoadStatsManager2 loadStatsManager;
6566

6667
private boolean started;
6768
@Nullable

xds/src/main/java/io/grpc/xds/XdsClientImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void uncaughtException(Thread t, Throwable e) {
9898
Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
9999
resourceSubscribers = new HashMap<>();
100100
private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
101-
private final LoadStatsManager2 loadStatsManager;
101+
private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>();
102102
private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
103103
private final XdsChannelFactory xdsChannelFactory;
104104
private final Bootstrapper.BootstrapInfo bootstrapInfo;
@@ -125,7 +125,6 @@ public void uncaughtException(Thread t, Throwable e) {
125125
this.bootstrapInfo = bootstrapInfo;
126126
this.context = context;
127127
this.timeService = timeService;
128-
loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
129128
this.backoffPolicyProvider = backoffPolicyProvider;
130129
this.stopwatchSupplier = stopwatchSupplier;
131130
this.timeProvider = timeProvider;
@@ -155,6 +154,8 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
155154
backoffPolicyProvider,
156155
stopwatchSupplier,
157156
this);
157+
LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
158+
loadStatsManagerMap.put(serverInfo, loadStatsManager);
158159
LoadReportClient lrsClient = new LoadReportClient(
159160
loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext,
160161
timeService, backoffPolicyProvider, stopwatchSupplier);
@@ -342,6 +343,7 @@ public void run() {
342343
@Override
343344
ClusterDropStats addClusterDropStats(
344345
final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) {
346+
LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
345347
ClusterDropStats dropCounter =
346348
loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
347349
syncContext.execute(new Runnable() {
@@ -357,6 +359,7 @@ public void run() {
357359
ClusterLocalityStats addClusterLocalityStats(
358360
final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
359361
Locality locality) {
362+
LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
360363
ClusterLocalityStats loadCounter =
361364
loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality);
362365
syncContext.execute(new Runnable() {

xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.grpc.xds.XdsClient.ResourceWatcher;
3030
import io.grpc.xds.XdsListenerResource.LdsUpdate;
3131
import java.util.Collections;
32+
import java.util.HashSet;
3233
import java.util.Map;
3334
import java.util.Map.Entry;
3435
import java.util.UUID;
@@ -174,6 +175,34 @@ public void lrsClientsStartedForDropStats() throws InterruptedException {
174175
}
175176
}
176177

178+
/**
179+
* Ensures that {@link LoadReportClient}s have distinct {@link LoadStatsManager2}s so that they
180+
* only report on the traffic for their own control plane.
181+
*/
182+
@Test
183+
public void lrsClientsHaveDistinctLoadStatsManagers() throws InterruptedException {
184+
trafficdirector.setLdsConfig(ControlPlaneRule.buildServerListener(),
185+
ControlPlaneRule.buildClientListener("test-server"));
186+
directpathPa.setLdsConfig(ControlPlaneRule.buildServerListener(),
187+
ControlPlaneRule.buildClientListener(
188+
"xdstp://server-one/envoy.config.listener.v3.Listener/test-server"));
189+
190+
xdsClient.watchXdsResource(XdsListenerResource.getInstance(), "test-server", mockWatcher);
191+
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),
192+
"xdstp://server-one/envoy.config.listener.v3.Listener/test-server", mockDirectPathWatcher);
193+
194+
// With two control planes and a watcher for each, there should be two LRS clients.
195+
assertThat(xdsClient.getServerLrsClientMap().size()).isEqualTo(2);
196+
197+
// Collect the LoadStatsManager2 instances and make sure they are distinct for each control plane.
198+
HashSet<LoadStatsManager2> loadStatManagers = new HashSet<>();
199+
for (Entry<ServerInfo, LoadReportClient> entry : xdsClient.getServerLrsClientMap().entrySet()) {
200+
xdsClient.addClusterDropStats(entry.getKey(), "clusterName", "edsServiceName");
201+
loadStatManagers.add(entry.getValue().loadStatsManager);
202+
}
203+
assertThat(loadStatManagers).containsNoDuplicates();
204+
}
205+
177206
private Map<String, ?> defaultBootstrapOverride() {
178207
return ImmutableMap.of(
179208
"node", ImmutableMap.of(

0 commit comments

Comments
 (0)