Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
private final AtomicInteger sequence;
private final long infTime;
private final Ticker ticker;
private String locality = "";

// The metric instruments are only registered once and shared by all instances of this LB.
static {
Expand Down Expand Up @@ -147,6 +148,12 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
handleNameResolutionError(unavailableStatus);
return unavailableStatus;
}
String locality = resolvedAddresses.getAttributes().get(WeightedTargetLoadBalancer.CHILD_NAME);
if (locality != null) {
this.locality = locality;
} else {
this.locality = "";
}
config =
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
AcceptResolvedAddrRetVal acceptRetVal;
Expand Down Expand Up @@ -179,7 +186,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
@Override
public SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper());
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper(),
locality);
}

@VisibleForTesting
Expand Down Expand Up @@ -373,10 +381,12 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
private final AtomicInteger sequence;
private final int hashCode;
private final LoadBalancer.Helper helper;
private final String locality;
private volatile StaticStrideScheduler scheduler;

WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper) {
float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper,
String locality) {
checkNotNull(children, "children");
Preconditions.checkArgument(!children.isEmpty(), "empty child list");
this.children = children;
Expand All @@ -391,6 +401,7 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
this.errorUtilizationPenalty = errorUtilizationPenalty;
this.sequence = checkNotNull(sequence, "sequence");
this.helper = helper;
this.locality = checkNotNull(locality, "locality");

// For equality we treat children as a set; use hash code as defined by Set
int sum = 0;
Expand Down Expand Up @@ -434,29 +445,29 @@ private void updateWeight() {
helper.getMetricRecorder()
.recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(""));
ImmutableList.of(locality));
newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
}
if (staleEndpoints.get() > 0) {
// TODO: add locality label once available
helper.getMetricRecorder()
.addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(""));
ImmutableList.of(locality));
}
if (notYetUsableEndpoints.get() > 0) {
// TODO: add locality label once available
helper.getMetricRecorder()
.addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(""));
ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality));
}

this.scheduler = new StaticStrideScheduler(newWeights, sequence);
if (this.scheduler.usesRoundRobin()) {
// TODO: locality label once available
helper.getMetricRecorder()
.addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(""));
ImmutableList.of(locality));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public void uncaughtException(Thread t, Throwable e) {
});

private String channelTarget = "channel-target";
private String locality = "locality";

public WeightedRoundRobinLoadBalancerTest() {
testHelperInstance = new TestHelper();
Expand Down Expand Up @@ -1135,9 +1136,11 @@ public void removingAddressShutsdownSubchannel() {
@Test
public void metrics() {
// Give WRR some valid addresses to work with.
Attributes attributesWithLocality = Attributes.newBuilder()
.set(WeightedTargetLoadBalancer.CHILD_NAME, locality).build();
syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig)
.setAttributes(affinity).build()));
.setAttributes(attributesWithLocality).build()));

// Flip the three subchannels to READY state to initiate the WRR logic
Iterator<Subchannel> it = subchannels.values().iterator();
Expand Down Expand Up @@ -1240,7 +1243,7 @@ private void verifyLongCounterRecord(String name, int times, long value) {
public boolean matches(LongCounterMetricInstrument longCounterInstrument) {
return longCounterInstrument.getName().equals(name);
}
}), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList("")));
}), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality)));
}

// Verifies that the MetricRecorder has been called to record a given double histogram value the
Expand All @@ -1252,7 +1255,7 @@ private void verifyDoubleHistogramRecord(String name, int times, double value) {
public boolean matches(DoubleHistogramMetricInstrument doubleHistogramInstrument) {
return doubleHistogramInstrument.getName().equals(name);
}
}), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList("")));
}), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality)));
}

private int getNumFilteredPendingTasks() {
Expand Down