Skip to content

Commit a24705b

Browse files
committed
xds: Support localities in multiple priorities
Additional logic to support for the same locality appearing under multiple priorities.
1 parent a82ea0c commit a24705b

File tree

5 files changed

+126
-59
lines changed

5 files changed

+126
-59
lines changed

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ private void handleEndpointResourceUpdate() {
216216
List<EquivalentAddressGroup> addresses = new ArrayList<>();
217217
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
218218
List<String> priorities = new ArrayList<>(); // totally ordered priority list
219-
Map<Locality, Integer> localityWeights = new HashMap<>();
220219

221220
Status endpointNotFound = Status.OK;
222221
for (String cluster : clusters) {
@@ -229,7 +228,6 @@ private void handleEndpointResourceUpdate() {
229228
addresses.addAll(state.result.addresses);
230229
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
231230
priorities.addAll(state.result.priorities);
232-
localityWeights.putAll(state.result.localityWeights);
233231
} else {
234232
endpointNotFound = state.status;
235233
}
@@ -260,9 +258,6 @@ private void handleEndpointResourceUpdate() {
260258
resolvedAddresses.toBuilder()
261259
.setLoadBalancingPolicyConfig(childConfig)
262260
.setAddresses(Collections.unmodifiableList(addresses))
263-
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
264-
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS,
265-
Collections.unmodifiableMap(localityWeights)).build())
266261
.build());
267262
}
268263

@@ -396,7 +391,6 @@ public void run() {
396391
}
397392
Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
398393
update.localityLbEndpointsMap;
399-
Map<Locality, Integer> localityWeights = new HashMap<>();
400394
List<DropOverload> dropOverloads = update.dropPolicies;
401395
List<EquivalentAddressGroup> addresses = new ArrayList<>();
402396
Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
@@ -415,6 +409,8 @@ public void run() {
415409
Attributes attr =
416410
endpoint.eag().getAttributes().toBuilder()
417411
.set(InternalXdsAttributes.ATTR_LOCALITY, locality)
412+
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT,
413+
localityLbInfo.localityWeight())
418414
.set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight)
419415
.build();
420416
EquivalentAddressGroup eag = new EquivalentAddressGroup(
@@ -429,7 +425,6 @@ public void run() {
429425
"Discard locality {0} with 0 healthy endpoints", locality);
430426
continue;
431427
}
432-
localityWeights.put(locality, localityLbInfo.localityWeight());
433428
if (!prioritizedLocalityWeights.containsKey(priorityName)) {
434429
prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
435430
}
@@ -450,7 +445,7 @@ public void run() {
450445
status = Status.OK;
451446
resolved = true;
452447
result = new ClusterResolutionResult(addresses, priorityChildConfigs,
453-
sortedPriorityNames, localityWeights);
448+
sortedPriorityNames);
454449
handleEndpointResourceUpdate();
455450
}
456451
}
@@ -690,23 +685,18 @@ private static class ClusterResolutionResult {
690685
private final Map<String, PriorityChildConfig> priorityChildConfigs;
691686
// List of priority names ordered in descending priorities.
692687
private final List<String> priorities;
693-
// Most recent view on how localities in the cluster should be wighted. Only set for EDS
694-
// clusters that support the concept.
695-
private final Map<Locality, Integer> localityWeights;
696688

697689
ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority,
698690
PriorityChildConfig config) {
699691
this(addresses, Collections.singletonMap(priority, config),
700-
Collections.singletonList(priority), Collections.emptyMap());
692+
Collections.singletonList(priority));
701693
}
702694

703695
ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
704-
Map<String, PriorityChildConfig> configs, List<String> priorities,
705-
Map<Locality, Integer> localityWeights) {
696+
Map<String, PriorityChildConfig> configs, List<String> priorities) {
706697
this.addresses = addresses;
707698
this.priorityChildConfigs = configs;
708699
this.priorities = priorities;
709-
this.localityWeights = localityWeights;
710700
}
711701
}
712702

xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.grpc.internal.ObjectPool;
2525
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
2626
import io.grpc.xds.internal.security.SslContextProviderSupplier;
27-
import java.util.Map;
2827

2928
/**
3029
* Internal attributes used for xDS implementation. Do not use.
@@ -58,8 +57,8 @@ public final class InternalXdsAttributes {
5857
* Map from localities to their weights.
5958
*/
6059
@NameResolver.ResolutionResultAttr
61-
static final Attributes.Key<Map<Locality, Integer>> ATTR_LOCALITY_WEIGHTS =
62-
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityWeights");
60+
static final Attributes.Key<Integer> ATTR_LOCALITY_WEIGHT =
61+
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityWeight");
6362

6463
/**
6564
* Name of the cluster that provides this EquivalentAddressGroup.

xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
2222

2323
import com.google.common.base.MoreObjects;
24+
import io.grpc.Attributes;
25+
import io.grpc.EquivalentAddressGroup;
2426
import io.grpc.InternalLogId;
2527
import io.grpc.LoadBalancer;
2628
import io.grpc.LoadBalancerRegistry;
@@ -68,15 +70,32 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
6870
// to produce the weighted target LB config.
6971
WrrLocalityConfig wrrLocalityConfig
7072
= (WrrLocalityConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
71-
Map<Locality, Integer> localityWeights = resolvedAddresses.getAttributes()
72-
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
73-
74-
// Not having locality weights is a misconfiguration, and we have to return with an error.
75-
if (localityWeights == null) {
76-
Status unavailable =
77-
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality weights provided");
78-
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
79-
return false;
73+
74+
// A map of locality weights is built up from the locality weight attributes in each address.
75+
Map<Locality, Integer> localityWeights = new HashMap<>();
76+
for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {
77+
Attributes eagAttrs = eag.getAttributes();
78+
Locality locality = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY);
79+
Integer localityWeight = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT);
80+
81+
if (locality == null) {
82+
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
83+
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality provided")));
84+
return false;
85+
}
86+
if (localityWeight == null) {
87+
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
88+
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality weight provided")));
89+
return false;
90+
}
91+
92+
if (!localityWeights.containsKey(locality)) {
93+
localityWeights.put(locality, localityWeight);
94+
} else if (!localityWeights.get(locality).equals(localityWeight)) {
95+
logger.log(XdsLogLevel.WARNING,
96+
"Locality {0} has both weights {1} and {2}, using weight {1}", locality,
97+
localityWeights.get(locality), localityWeight);
98+
}
8099
}
81100

82101
// Weighted target LB expects a WeightedPolicySelection for each locality as it will create a
@@ -88,13 +107,13 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
88107
wrrLocalityConfig.childPolicy));
89108
}
90109

91-
// Remove the locality weights attribute now that we have consumed it. This is done simply for
110+
// Remove the locality weight attribute now that we have consumed it. This is done simply for
92111
// ease of debugging for the unsupported (and unlikely) scenario where WrrLocalityConfig has
93112
// another wrr_locality as the child policy. The missing locality weight attribute would make
94113
// the child wrr_locality fail early.
95114
resolvedAddresses = resolvedAddresses.toBuilder()
96115
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
97-
.discard(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS).build()).build();
116+
.discard(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT).build()).build();
98117

99118
switchLb.switchTo(lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME));
100119
switchLb.handleResolvedAddresses(

xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,8 @@ public void edsClustersWithLeastRequestEndpointLbPolicy() {
327327
"least_request_experimental");
328328

329329
assertThat(
330-
childBalancer.attributes.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).containsEntry(
331-
locality1, 100);
330+
childBalancer.addresses.get(0).getAttributes()
331+
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)).isEqualTo(100);
332332
}
333333

334334
@Test
@@ -410,8 +410,8 @@ public void edsClustersWithOutlierDetection() {
410410
"least_request_experimental");
411411

412412
assertThat(
413-
childBalancer.attributes.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).containsEntry(
414-
locality1, 100);
413+
childBalancer.addresses.get(0).getAttributes()
414+
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)).isEqualTo(100);
415415
}
416416

417417

@@ -507,11 +507,20 @@ public void onlyEdsClusters_receivedEndpoints() {
507507
assertThat(wrrLocalityConfig3.childPolicy.getProvider().getPolicyName()).isEqualTo(
508508
"round_robin");
509509

510-
Map<Locality, Integer> localityWeights = childBalancer.attributes.get(
511-
InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
512-
assertThat(localityWeights).containsEntry(locality1, 70);
513-
assertThat(localityWeights).containsEntry(locality2, 10);
514-
assertThat(localityWeights).containsEntry(locality3, 20);
510+
for (EquivalentAddressGroup eag : childBalancer.addresses) {
511+
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality1) {
512+
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
513+
.isEqualTo(70);
514+
}
515+
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality2) {
516+
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
517+
.isEqualTo(10);
518+
}
519+
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality3) {
520+
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
521+
.isEqualTo(20);
522+
}
523+
}
515524
}
516525

517526
@SuppressWarnings("unchecked")
@@ -682,14 +691,15 @@ public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() {
682691
LocalityLbEndpoints.create(
683692
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */)),
684693
10 /* localityWeight */, 1 /* priority */);
694+
String priority = CLUSTER1 + "[priority1]";
685695
xdsClient.deliverClusterLoadAssignment(
686696
EDS_SERVICE_NAME1,
687697
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
688698

689699
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
690-
Map<Locality, Integer> localityWeights = childBalancer.attributes.get(
691-
InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
692-
assertThat(localityWeights.keySet()).containsExactly(locality2);
700+
for (EquivalentAddressGroup eag : childBalancer.addresses) {
701+
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY)).isEqualTo(locality2);
702+
}
693703
}
694704

695705
@Test

xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java

Lines changed: 67 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import static org.mockito.Mockito.when;
2626

2727
import com.google.common.collect.ImmutableList;
28-
import com.google.common.collect.ImmutableMap;
2928
import com.google.common.testing.EqualsTester;
3029
import io.grpc.Attributes;
3130
import io.grpc.ConnectivityState;
@@ -44,7 +43,9 @@
4443
import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig;
4544
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
4645
import java.net.SocketAddress;
47-
import java.util.Map;
46+
import java.util.Collections;
47+
import java.util.List;
48+
import java.util.Objects;
4849
import org.junit.Before;
4950
import org.junit.Rule;
5051
import org.junit.Test;
@@ -124,8 +125,10 @@ public void handleResolvedAddresses() {
124125
// The child config is delivered wrapped in the wrr_locality config and the locality weights
125126
// in a ResolvedAddresses attribute.
126127
WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy);
127-
Map<Locality, Integer> localityWeights = ImmutableMap.of(localityOne, 1, localityTwo, 2);
128-
deliverAddresses(wlConfig, localityWeights);
128+
deliverAddresses(wlConfig,
129+
ImmutableList.of(
130+
makeAddress("addr1", localityOne, 1),
131+
makeAddress("addr2", localityTwo, 2)));
129132

130133
// Assert that the child policy and the locality weights were correctly mapped to a
131134
// WeightedTargetConfig.
@@ -148,7 +151,8 @@ public void handleResolvedAddresses_noLocalityWeights() {
148151
// The child config is delivered wrapped in the wrr_locality config and the locality weights
149152
// in a ResolvedAddresses attribute.
150153
WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy);
151-
deliverAddresses(wlConfig, null);
154+
deliverAddresses(wlConfig, ImmutableList.of(
155+
makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), null)));
152156

153157
// With no locality weights, we should get a TRANSIENT_FAILURE.
154158
verify(mockHelper).getAuthority();
@@ -170,8 +174,8 @@ public void handleNameResolutionError_noChildLb() {
170174
@Test
171175
public void handleNameResolutionError_withChildLb() {
172176
deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)),
173-
ImmutableMap.of(
174-
Locality.create("region", "zone", "subzone"), 1));
177+
ImmutableList.of(
178+
makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1)));
175179
loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED);
176180

177181
verify(mockHelper, never()).updateBalancingState(isA(ConnectivityState.class),
@@ -185,21 +189,22 @@ public void localityWeightAttributeNotPropagated() {
185189
PolicySelection childPolicy = new PolicySelection(mockChildProvider, null);
186190

187191
WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy);
188-
Map<Locality, Integer> localityWeights = ImmutableMap.of(locality, 1);
189-
deliverAddresses(wlConfig, localityWeights);
192+
deliverAddresses(wlConfig, ImmutableList.of(
193+
makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1)));
190194

191195
// Assert that the child policy and the locality weights were correctly mapped to a
192196
// WeightedTargetConfig.
193197
verify(mockWeightedTargetLb).handleResolvedAddresses(resolvedAddressesCaptor.capture());
194-
assertThat(resolvedAddressesCaptor.getValue().getAttributes()
195-
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).isNull();
198+
199+
//assertThat(resolvedAddressesCaptor.getValue().getAttributes()
200+
// .get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).isNull();
196201
}
197202

198203
@Test
199204
public void shutdown() {
200205
deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)),
201-
ImmutableMap.of(
202-
Locality.create("region", "zone", "subzone"), 1));
206+
ImmutableList.of(
207+
makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), 1)));
203208
loadBalancer.shutdown();
204209

205210
verify(mockWeightedTargetLb).shutdown();
@@ -218,11 +223,55 @@ public void configEquality() {
218223
.testEquals();
219224
}
220225

221-
private void deliverAddresses(WrrLocalityConfig config, Map<Locality, Integer> localityWeights) {
226+
private void deliverAddresses(WrrLocalityConfig config, List<EquivalentAddressGroup> addresses) {
222227
loadBalancer.handleResolvedAddresses(
223-
ResolvedAddresses.newBuilder().setAddresses(ImmutableList.of(eag)).setAttributes(
224-
Attributes.newBuilder()
225-
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS, localityWeights).build())
226-
.setLoadBalancingPolicyConfig(config).build());
228+
ResolvedAddresses.newBuilder().setAddresses(addresses).setLoadBalancingPolicyConfig(config)
229+
.build());
230+
}
231+
232+
/**
233+
* Create a locality-labeled address.
234+
*/
235+
private static EquivalentAddressGroup makeAddress(final String name, Locality locality,
236+
Integer localityWeight) {
237+
class FakeSocketAddress extends SocketAddress {
238+
private final String name;
239+
240+
private FakeSocketAddress(String name) {
241+
this.name = name;
242+
}
243+
244+
@Override
245+
public int hashCode() {
246+
return Objects.hash(name);
247+
}
248+
249+
@Override
250+
public boolean equals(Object o) {
251+
if (this == o) {
252+
return true;
253+
}
254+
if (!(o instanceof FakeSocketAddress)) {
255+
return false;
256+
}
257+
FakeSocketAddress that = (FakeSocketAddress) o;
258+
return Objects.equals(name, that.name);
259+
}
260+
261+
@Override
262+
public String toString() {
263+
return name;
264+
}
265+
}
266+
267+
Attributes.Builder attrBuilder = Attributes.newBuilder()
268+
.set(InternalXdsAttributes.ATTR_LOCALITY, locality);
269+
if (localityWeight != null) {
270+
attrBuilder.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT, localityWeight);
271+
}
272+
273+
EquivalentAddressGroup eag = new EquivalentAddressGroup(new FakeSocketAddress(name),
274+
attrBuilder.build());
275+
return AddressFilter.setPathFilter(eag, Collections.singletonList(locality.toString()));
227276
}
228277
}

0 commit comments

Comments
 (0)