Skip to content

Commit bf5b7ae

Browse files
authored
clusterresolver: handle EDS nacks and resource-not-found errors correctly (#6436)
1 parent fc0aa46 commit bf5b7ae

File tree

5 files changed

+571
-15
lines changed

5 files changed

+571
-15
lines changed

xds/internal/balancer/clusterresolver/clusterresolver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
8585
b.logger = prefixLogger(b)
8686
b.logger.Infof("Created")
8787

88-
b.resourceWatcher = newResourceResolver(b)
88+
b.resourceWatcher = newResourceResolver(b, b.logger)
8989
b.cc = &ccWrapper{
9090
ClientConn: cc,
9191
resourceWatcher: b.resourceWatcher,

xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go

Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,21 @@ import (
2727
"github.com/google/go-cmp/cmp"
2828
"google.golang.org/grpc"
2929
"google.golang.org/grpc/codes"
30+
"google.golang.org/grpc/credentials/insecure"
31+
"google.golang.org/grpc/internal"
32+
"google.golang.org/grpc/internal/stubserver"
3033
"google.golang.org/grpc/internal/testutils"
3134
"google.golang.org/grpc/internal/testutils/xds/e2e"
3235
"google.golang.org/grpc/peer"
3336
"google.golang.org/grpc/resolver"
3437
"google.golang.org/grpc/resolver/manual"
38+
"google.golang.org/grpc/serviceconfig"
3539
"google.golang.org/grpc/status"
40+
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
41+
"google.golang.org/grpc/xds/internal/xdsclient"
42+
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
3643
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
44+
"google.golang.org/protobuf/types/known/wrapperspb"
3745

3846
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
3947
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -771,3 +779,289 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
771779
t.Fatalf("EmptyCall() failed with error %v, want %v", err, dnsErr)
772780
}
773781
}
782+
783+
// TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate tests the
784+
// scenario where the top-level cluster is an aggregate cluster that resolves to
785+
// an EDS and LOGICAL_DNS cluster. The management server first sends a good EDS
786+
// response for the EDS cluster and the test verifies that RPCs get routed to
787+
// the EDS cluster. The management server then sends a bad EDS response. The
788+
// test verifies that the cluster_resolver LB policy continues to use the
789+
// previously received good update and that RPCs still get routed to the EDS
790+
// cluster.
791+
func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) {
792+
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
793+
defer cleanup1()
794+
795+
// Start an xDS management server.
796+
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
797+
defer cleanup2()
798+
799+
// Start two test backends and extract their host and port. The first
800+
// backend is used for the EDS cluster and the second backend is used for
801+
// the LOGICAL_DNS cluster.
802+
servers, cleanup3 := startTestServiceBackends(t, 2)
803+
defer cleanup3()
804+
addrs, ports := backendAddressesAndPorts(t, servers)
805+
806+
// Configure an aggregate cluster pointing to an EDS and DNS cluster. Also
807+
// configure an endpoints resource for the EDS cluster.
808+
const (
809+
edsClusterName = clusterName + "-eds"
810+
dnsClusterName = clusterName + "-dns"
811+
dnsHostName = "dns_host"
812+
dnsPort = uint32(8080)
813+
)
814+
resources := e2e.UpdateOptions{
815+
NodeID: nodeID,
816+
Clusters: []*v3clusterpb.Cluster{
817+
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
818+
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
819+
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
820+
},
821+
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
822+
SkipValidation: true,
823+
}
824+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
825+
defer cancel()
826+
if err := mgmtServer.Update(ctx, resources); err != nil {
827+
t.Fatal(err)
828+
}
829+
830+
// Create xDS client, configure cds_experimental LB policy with a manual
831+
// resolver, and dial the test backends.
832+
cc, cleanup := setupAndDial(t, bootstrapContents)
833+
defer cleanup()
834+
835+
// Ensure that the DNS resolver is started for the expected target.
836+
select {
837+
case <-ctx.Done():
838+
t.Fatal("Timeout when waiting for DNS resolver to be started")
839+
case target := <-dnsTargetCh:
840+
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
841+
if got != want {
842+
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
843+
}
844+
}
845+
846+
// Update DNS resolver with test backend addresses.
847+
dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})
848+
849+
// Make an RPC and ensure that it gets routed to the first backend since the
850+
// EDS cluster is of higher priority than the LOGICAL_DNS cluster.
851+
client := testgrpc.NewTestServiceClient(cc)
852+
peer := &peer.Peer{}
853+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
854+
t.Fatalf("EmptyCall() failed: %v", err)
855+
}
856+
if peer.Addr.String() != addrs[0].Addr {
857+
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
858+
}
859+
860+
// Push an EDS resource from the management server that is expected to be
861+
// NACKed by the xDS client. Since the cluster_resolver LB policy has a
862+
// previously received good EDS resource, it will continue to use that.
863+
resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
864+
if err := mgmtServer.Update(ctx, resources); err != nil {
865+
t.Fatal(err)
866+
}
867+
868+
// Ensure that RPCs continue to get routed to the EDS cluster for the next
869+
// second.
870+
for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
871+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
872+
t.Fatalf("EmptyCall() failed: %v", err)
873+
}
874+
if peer.Addr.String() != addrs[0].Addr {
875+
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
876+
}
877+
}
878+
}
879+
880+
// TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate tests the
881+
// scenario where the top-level cluster is an aggregate cluster that resolves to
882+
// an EDS and LOGICAL_DNS cluster. The management server sends a bad EDS
883+
// response. The test verifies that the cluster_resolver LB policy falls back to
884+
// the LOGICAL_DNS cluster, because it is supposed to treat the bad EDS response
885+
// as though it received an update with no endpoints.
886+
func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) {
887+
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
888+
defer cleanup1()
889+
890+
// Start an xDS management server.
891+
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
892+
defer cleanup2()
893+
894+
// Start two test backends and extract their host and port. The first
895+
// backend is used for the EDS cluster and the second backend is used for
896+
// the LOGICAL_DNS cluster.
897+
servers, cleanup3 := startTestServiceBackends(t, 2)
898+
defer cleanup3()
899+
addrs, ports := backendAddressesAndPorts(t, servers)
900+
901+
// Configure an aggregate cluster pointing to an EDS and DNS cluster.
902+
const (
903+
edsClusterName = clusterName + "-eds"
904+
dnsClusterName = clusterName + "-dns"
905+
dnsHostName = "dns_host"
906+
dnsPort = uint32(8080)
907+
)
908+
resources := e2e.UpdateOptions{
909+
NodeID: nodeID,
910+
Clusters: []*v3clusterpb.Cluster{
911+
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
912+
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
913+
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
914+
},
915+
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
916+
SkipValidation: true,
917+
}
918+
919+
// Set a load balancing weight of 0 for the backend in the EDS resource.
920+
// This is expected to be NACKed by the xDS client. Since the
921+
// cluster_resolver LB policy has no previously received good EDS resource,
922+
// it will treat this as though it received an update with no endpoints.
923+
resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
924+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
925+
defer cancel()
926+
if err := mgmtServer.Update(ctx, resources); err != nil {
927+
t.Fatal(err)
928+
}
929+
930+
// Create xDS client, configure cds_experimental LB policy with a manual
931+
// resolver, and dial the test backends.
932+
cc, cleanup := setupAndDial(t, bootstrapContents)
933+
defer cleanup()
934+
935+
// Ensure that the DNS resolver is started for the expected target.
936+
select {
937+
case <-ctx.Done():
938+
t.Fatal("Timeout when waiting for DNS resolver to be started")
939+
case target := <-dnsTargetCh:
940+
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
941+
if got != want {
942+
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
943+
}
944+
}
945+
946+
// Update DNS resolver with test backend addresses.
947+
dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})
948+
949+
// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
950+
peer := &peer.Peer{}
951+
client := testgrpc.NewTestServiceClient(cc)
952+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
953+
t.Fatalf("EmptyCall() failed: %v", err)
954+
}
955+
if peer.Addr.String() != addrs[1].Addr {
956+
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[1].Addr)
957+
}
958+
}
959+
960+
// TestAggregateCluster_Fallback_EDS_ResourceNotFound tests the scenario where
961+
// the top-level cluster is an aggregate cluster that resolves to an EDS and
962+
// LOGICAL_DNS cluster. The management server does not respond with the EDS
963+
// cluster. The test verifies that the cluster_resolver LB policy falls back to
964+
// the LOGICAL_DNS cluster in this case.
965+
func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
966+
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
967+
defer cleanup1()
968+
969+
// Start an xDS management server.
970+
mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
971+
defer cleanup2()
972+
973+
// Start a test backend for the LOGICAL_DNS cluster.
974+
server := stubserver.StartTestService(t, nil)
975+
defer server.Stop()
976+
977+
// Configure an aggregate cluster pointing to an EDS and DNS cluster. No
978+
// endpoints are configured for the EDS cluster.
979+
const (
980+
edsClusterName = clusterName + "-eds"
981+
dnsClusterName = clusterName + "-dns"
982+
dnsHostName = "dns_host"
983+
dnsPort = uint32(8080)
984+
)
985+
resources := e2e.UpdateOptions{
986+
NodeID: nodeID,
987+
Clusters: []*v3clusterpb.Cluster{
988+
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
989+
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
990+
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
991+
},
992+
SkipValidation: true,
993+
}
994+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
995+
defer cancel()
996+
if err := mgmtServer.Update(ctx, resources); err != nil {
997+
t.Fatal(err)
998+
}
999+
1000+
// Create an xDS client talking to the above management server, configured
1001+
// with a short watch expiry timeout.
1002+
xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
1003+
XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
1004+
NodeProto: &v3corepb.Node{Id: nodeID},
1005+
}, defaultTestWatchExpiryTimeout, time.Duration(0))
1006+
if err != nil {
1007+
t.Fatalf("failed to create xds client: %v", err)
1008+
}
1009+
defer close()
1010+
1011+
// Create a manual resolver and push a service config specifying the use of
1012+
// the cds LB policy as the top-level LB policy, and a corresponding config
1013+
// with a single cluster.
1014+
r := manual.NewBuilderWithScheme("whatever")
1015+
jsonSC := fmt.Sprintf(`{
1016+
"loadBalancingConfig":[{
1017+
"cds_experimental":{
1018+
"cluster": "%s"
1019+
}
1020+
}]
1021+
}`, clusterName)
1022+
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
1023+
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
1024+
1025+
// Create a ClientConn.
1026+
cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
1027+
if err != nil {
1028+
t.Fatalf("failed to dial local test server: %v", err)
1029+
}
1030+
defer cc.Close()
1031+
1032+
// Make an RPC with a short deadline. We expect this RPC to not succeed
1033+
// because the DNS resolver has not responded with endpoint addresses.
1034+
client := testgrpc.NewTestServiceClient(cc)
1035+
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
1036+
defer sCancel()
1037+
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
1038+
t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
1039+
}
1040+
1041+
// Ensure that the DNS resolver is started for the expected target.
1042+
select {
1043+
case <-ctx.Done():
1044+
t.Fatal("Timeout when waiting for DNS resolver to be started")
1045+
case target := <-dnsTargetCh:
1046+
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
1047+
if got != want {
1048+
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
1049+
}
1050+
}
1051+
1052+
// Update DNS resolver with test backend addresses.
1053+
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: server.Address}}})
1054+
1055+
// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
1056+
// Even though the EDS cluster is of higher priority, since the management
1057+
// server does not respond with an EDS resource, the cluster_resolver LB
1058+
// policy is expected to fallback to the LOGICAL_DNS cluster once the watch
1059+
// timeout expires.
1060+
peer := &peer.Peer{}
1061+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
1062+
t.Fatalf("EmptyCall() failed: %v", err)
1063+
}
1064+
if peer.Addr.String() != server.Address {
1065+
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
1066+
}
1067+
}

0 commit comments

Comments
 (0)