Skip to content

Commit 077e2c3

Browse files
Pranjali-2501easwarsarjan-balpurnesh42Hdfawley
authored
Cherry-pick #8411, #8419, #8422, #8445 and #8451 to v1.74.x (#8454)
* xdsclient: preserve original bytes for decoding when the resource is wrapped (#8411) * xds: Avoid error logs when setting fallback bootstrap config (#8419) * xdsclient: relay marshalled bytes of complete resource proto to decoders (#8422) * xds: give up pool lock before closing xdsclient channel (#8445) * transport: release mutex before returning on expired deadlines in server streams (#8451) --------- Co-authored-by: Easwar Swaminathan <[email protected]> Co-authored-by: Arjan Singh Bal <[email protected]> Co-authored-by: Purnesh Dixit <[email protected]> Co-authored-by: Doug Fawley <[email protected]>
1 parent b34f845 commit 077e2c3

File tree

10 files changed

+122
-68
lines changed

10 files changed

+122
-68
lines changed

internal/transport/http2_server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
602602
}
603603

604604
if s.ctx.Err() != nil {
605+
t.mu.Unlock()
605606
// Early abort in case the timeout was zero or so low it already fired.
606607
t.controlBuf.put(&earlyAbortStream{
607608
httpStatus: http.StatusOK,

internal/xds/bootstrap/bootstrap.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,9 @@ func (c *Config) UnmarshalJSON(data []byte) error {
576576
// the presence of the errors) and may return a Config object with certain
577577
// fields left unspecified, in which case the caller should use some sane
578578
// defaults.
579+
//
580+
// This function returns an error if it's unable to parse the contents of the
581+
// bootstrap config. It returns (nil, nil) if none of the env vars are set.
579582
func GetConfiguration() (*Config, error) {
580583
fName := envconfig.XDSBootstrapFileName
581584
fContent := envconfig.XDSBootstrapFileContent
@@ -598,7 +601,7 @@ func GetConfiguration() (*Config, error) {
598601
return NewConfigFromContents([]byte(fContent))
599602
}
600603

601-
return nil, fmt.Errorf("bootstrap environment variables (%q or %q) not defined", envconfig.XDSBootstrapFileNameEnv, envconfig.XDSBootstrapFileContentEnv)
604+
return nil, nil
602605
}
603606

604607
// NewConfigFromContents creates a new bootstrap configuration from the provided

internal/xds/bootstrap/bootstrap_test.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -383,12 +383,19 @@ func (s) TestGetConfiguration_Failure(t *testing.T) {
383383
cancel := setupBootstrapOverride(bootstrapFileMap)
384384
defer cancel()
385385

386-
for _, name := range []string{"nonExistentBootstrapFile", "empty", "badJSON", "noBalancerName", "emptyXdsServer"} {
386+
for _, name := range []string{"nonExistentBootstrapFile", "badJSON", "noBalancerName", "emptyXdsServer"} {
387387
t.Run(name, func(t *testing.T) {
388388
testGetConfigurationWithFileNameEnv(t, name, true, nil)
389389
testGetConfigurationWithFileContentEnv(t, name, true, nil)
390390
})
391391
}
392+
const name = "empty"
393+
t.Run(name, func(t *testing.T) {
394+
testGetConfigurationWithFileNameEnv(t, name, true, nil)
395+
// If both the env vars are empty, a nil config with a nil error must be
396+
// returned.
397+
testGetConfigurationWithFileContentEnv(t, name, false, nil)
398+
})
392399
}
393400

394401
// Tests the functionality in GetConfiguration with different bootstrap file
@@ -462,9 +469,9 @@ func (s) TestGetConfiguration_BootstrapEnvPriority(t *testing.T) {
462469
envconfig.XDSBootstrapFileContent = ""
463470
defer func() { envconfig.XDSBootstrapFileContent = origBootstrapContent }()
464471

465-
// When both env variables are empty, GetConfiguration should fail.
466-
if _, err := GetConfiguration(); err == nil {
467-
t.Errorf("GetConfiguration() returned nil error, expected to fail")
472+
// When both env variables are empty, GetConfiguration should return nil.
473+
if cfg, err := GetConfiguration(); err != nil || cfg != nil {
474+
t.Errorf("GetConfiguration() returned (%v, %v), want (<nil>, <nil>)", cfg, err)
468475
}
469476

470477
// When one of them is set, it should be used.

xds/internal/clients/xdsclient/channel.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -253,16 +253,10 @@ func decodeResponse(opts *DecodeOptions, rType *ResourceType, resp response) (ma
253253
perResourceErrors := make(map[string]error) // Tracks resource validation errors, where we have a resource name.
254254
ret := make(map[string]dataAndErrTuple) // Return result, a map from resource name to either resource data or error.
255255
for _, r := range resp.resources {
256-
r, err := xdsresource.UnwrapResource(r)
257-
if err != nil {
258-
topLevelErrors = append(topLevelErrors, err)
259-
continue
260-
}
261-
if _, ok := opts.Config.ResourceTypes[r.TypeUrl]; !ok || r.TypeUrl != resp.typeURL {
262-
topLevelErrors = append(topLevelErrors, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "unexpected resource type: %q ", r.GetTypeUrl()))
263-
continue
264-
}
265-
result, err := rType.Decoder.Decode(r.GetValue(), *opts)
256+
result, err := rType.Decoder.Decode(AnyProto{
257+
TypeURL: r.GetTypeUrl(),
258+
Value: r.GetValue(),
259+
}, *opts)
266260

267261
// Name field of the result is left unpopulated only when resource
268262
// deserialization fails.

xds/internal/clients/xdsclient/helpers_test.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"google.golang.org/grpc/xds/internal/clients/internal/pretty"
3131
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
3232
"google.golang.org/protobuf/proto"
33+
"google.golang.org/protobuf/types/known/anypb"
3334

3435
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
3536
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
@@ -62,17 +63,24 @@ var (
6263
}
6364
)
6465

65-
func unmarshalListenerResource(r []byte) (string, listenerUpdate, error) {
66+
func unmarshalListenerResource(rProto *anypb.Any) (string, listenerUpdate, error) {
67+
rProto, err := xdsresource.UnwrapResource(rProto)
68+
if err != nil {
69+
return "", listenerUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
70+
}
71+
if !xdsresource.IsListenerResource(rProto.GetTypeUrl()) {
72+
return "", listenerUpdate{}, fmt.Errorf("unexpected listener resource type: %q", rProto.GetTypeUrl())
73+
}
6674
lis := &v3listenerpb.Listener{}
67-
if err := proto.Unmarshal(r, lis); err != nil {
75+
if err := proto.Unmarshal(rProto.GetValue(), lis); err != nil {
6876
return "", listenerUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
6977
}
7078

7179
lu, err := processListener(lis)
7280
if err != nil {
7381
return lis.GetName(), listenerUpdate{}, err
7482
}
75-
lu.Raw = r
83+
lu.Raw = rProto.GetValue()
7684
return lis.GetName(), *lu, nil
7785
}
7886

@@ -154,8 +162,12 @@ type listenerDecoder struct{}
154162

155163
// Decode deserializes and validates an xDS resource serialized inside the
156164
// provided `Any` proto, as received from the xDS management server.
157-
func (listenerDecoder) Decode(resource []byte, _ DecodeOptions) (*DecodeResult, error) {
158-
name, listener, err := unmarshalListenerResource(resource)
165+
func (listenerDecoder) Decode(resource AnyProto, _ DecodeOptions) (*DecodeResult, error) {
166+
rProto := &anypb.Any{
167+
TypeUrl: resource.TypeURL,
168+
Value: resource.Value,
169+
}
170+
name, listener, err := unmarshalListenerResource(rProto)
159171
switch {
160172
case name == "":
161173
// Name is unset only when protobuf deserialization fails.

xds/internal/clients/xdsclient/resource_type.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,21 @@ type Decoder interface {
5151
// Decode deserializes and validates an xDS resource as received from the
5252
// xDS management server.
5353
//
54-
// If deserialization fails or resource validation fails, it returns a
55-
// non-nil error. Otherwise, returns a fully populated DecodeResult.
56-
Decode(resource []byte, options DecodeOptions) (*DecodeResult, error)
54+
// The `resource` parameter may contain a value of the serialized wrapped
55+
// resource (i.e. with the type URL
56+
// `type.googleapis.com/envoy.service.discovery.v3.Resource`).
57+
// Implementations are responsible for unwrapping the underlying resource if
58+
// it is wrapped.
59+
//
60+
// If unmarshalling or validation fails, it returns a non-nil error.
61+
// Otherwise, returns a fully populated DecodeResult.
62+
Decode(resource AnyProto, options DecodeOptions) (*DecodeResult, error)
63+
}
64+
65+
// AnyProto contains the type URL and serialized proto data of an xDS resource.
66+
type AnyProto struct {
67+
TypeURL string
68+
Value []byte
5769
}
5870

5971
// DecodeOptions wraps the options required by ResourceType implementations for

xds/internal/clients/xdsclient/test/helpers_test.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"google.golang.org/grpc/xds/internal/clients/xdsclient"
3535
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
3636
"google.golang.org/protobuf/proto"
37+
"google.golang.org/protobuf/types/known/anypb"
3738

3839
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
3940
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
@@ -73,17 +74,24 @@ var (
7374
}
7475
)
7576

76-
func unmarshalListenerResource(r []byte) (string, listenerUpdate, error) {
77+
func unmarshalListenerResource(rProto *anypb.Any) (string, listenerUpdate, error) {
78+
rProto, err := xdsresource.UnwrapResource(rProto)
79+
if err != nil {
80+
return "", listenerUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
81+
}
82+
if !xdsresource.IsListenerResource(rProto.GetTypeUrl()) {
83+
return "", listenerUpdate{}, fmt.Errorf("unexpected listener resource type: %q", rProto.GetTypeUrl())
84+
}
7785
lis := &v3listenerpb.Listener{}
78-
if err := proto.Unmarshal(r, lis); err != nil {
86+
if err := proto.Unmarshal(rProto.GetValue(), lis); err != nil {
7987
return "", listenerUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
8088
}
8189

8290
lu, err := processListener(lis)
8391
if err != nil {
8492
return lis.GetName(), listenerUpdate{}, err
8593
}
86-
lu.Raw = r
94+
lu.Raw = rProto.GetValue()
8795
return lis.GetName(), *lu, nil
8896
}
8997

@@ -165,8 +173,12 @@ type listenerDecoder struct{}
165173

166174
// Decode deserializes and validates an xDS resource serialized inside the
167175
// provided `Any` proto, as received from the xDS management server.
168-
func (listenerDecoder) Decode(resource []byte, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
169-
name, listener, err := unmarshalListenerResource(resource)
176+
func (listenerDecoder) Decode(resource xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
177+
rProto := &anypb.Any{
178+
TypeUrl: resource.TypeURL,
179+
Value: resource.Value,
180+
}
181+
name, listener, err := unmarshalListenerResource(rProto)
170182
switch {
171183
case name == "":
172184
// Name is unset only when protobuf deserialization fails.

xds/internal/xdsclient/pool.go

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
2727
estats "google.golang.org/grpc/experimental/stats"
28+
"google.golang.org/grpc/internal/envconfig"
2829
istats "google.golang.org/grpc/internal/stats"
2930
"google.golang.org/grpc/internal/xds/bootstrap"
3031
"google.golang.org/protobuf/proto"
@@ -34,7 +35,10 @@ var (
3435
// DefaultPool is the default pool for xDS clients. It is created at init
3536
// time and reads bootstrap configuration from env vars to create the xDS
3637
// client.
37-
DefaultPool = &Pool{clients: make(map[string]*clientImpl)}
38+
DefaultPool = &Pool{
39+
clients: make(map[string]*clientImpl),
40+
getConfiguration: sync.OnceValues(bootstrap.GetConfiguration),
41+
}
3842
)
3943

4044
// Pool represents a pool of xDS clients that share the same bootstrap
@@ -43,9 +47,12 @@ type Pool struct {
4347
// Note that mu should ideally only have to guard clients. But here, we need
4448
// it to guard config as well since SetFallbackBootstrapConfig writes to
4549
// config.
46-
mu sync.Mutex
47-
clients map[string]*clientImpl
48-
config *bootstrap.Config
50+
mu sync.Mutex
51+
clients map[string]*clientImpl
52+
fallbackConfig *bootstrap.Config
53+
// getConfiguration is a sync.OnceValues that attempts to read the bootstrap
54+
// configuration from environment variables once.
55+
getConfiguration func() (*bootstrap.Config, error)
4956
}
5057

5158
// OptionsForTesting contains options to configure xDS client creation for
@@ -78,7 +85,9 @@ type OptionsForTesting struct {
7885
func NewPool(config *bootstrap.Config) *Pool {
7986
return &Pool{
8087
clients: make(map[string]*clientImpl),
81-
config: config,
88+
getConfiguration: func() (*bootstrap.Config, error) {
89+
return config, nil
90+
},
8291
}
8392
}
8493

@@ -154,12 +163,7 @@ func (p *Pool) GetClientForTesting(name string) (XDSClient, func(), error) {
154163
func (p *Pool) SetFallbackBootstrapConfig(config *bootstrap.Config) {
155164
p.mu.Lock()
156165
defer p.mu.Unlock()
157-
158-
if p.config != nil {
159-
logger.Error("Attempt to set a bootstrap configuration even though one is already set via environment variables.")
160-
return
161-
}
162-
p.config = config
166+
p.fallbackConfig = config
163167
}
164168

165169
// DumpResources returns the status and contents of all xDS resources.
@@ -191,7 +195,11 @@ func (p *Pool) DumpResources() *v3statuspb.ClientStatusResponse {
191195
func (p *Pool) BootstrapConfigForTesting() *bootstrap.Config {
192196
p.mu.Lock()
193197
defer p.mu.Unlock()
194-
return p.config
198+
cfg, _ := p.getConfiguration()
199+
if cfg != nil {
200+
return cfg
201+
}
202+
return p.fallbackConfig
195203
}
196204

197205
// UnsetBootstrapConfigForTesting unsets the bootstrap configuration used by
@@ -201,7 +209,8 @@ func (p *Pool) BootstrapConfigForTesting() *bootstrap.Config {
201209
func (p *Pool) UnsetBootstrapConfigForTesting() {
202210
p.mu.Lock()
203211
defer p.mu.Unlock()
204-
p.config = nil
212+
p.fallbackConfig = nil
213+
p.getConfiguration = sync.OnceValues(bootstrap.GetConfiguration)
205214
}
206215

207216
func (p *Pool) clientRefCountedClose(name string) {
@@ -218,7 +227,6 @@ func (p *Pool) clientRefCountedClose(name string) {
218227
}
219228
delete(p.clients, name)
220229

221-
client.Close()
222230
for _, s := range client.bootstrapConfig.XDSServers() {
223231
for _, f := range s.Cleanups() {
224232
f()
@@ -233,6 +241,11 @@ func (p *Pool) clientRefCountedClose(name string) {
233241
}
234242
p.mu.Unlock()
235243

244+
// This attempts to close the transport to the management server and could
245+
// theoretically call back into the xdsclient package again and deadlock.
246+
// Hence, this needs to be called without holding the lock.
247+
client.Close()
248+
236249
xdsClientImplCloseHook(name)
237250
}
238251

@@ -243,43 +256,36 @@ func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder
243256
p.mu.Lock()
244257
defer p.mu.Unlock()
245258

246-
if p.config == nil {
247-
if len(p.clients) != 0 || p != DefaultPool {
248-
// If the current pool `p` already contains xDS clients or it is not
249-
// the `DefaultPool`, the bootstrap config should have been already
250-
// present in the pool.
251-
return nil, nil, fmt.Errorf("xds: bootstrap configuration not set in the pool")
252-
}
253-
// If the current pool `p` is the `DefaultPool` and has no clients, it
254-
// might be the first time an xDS client is being created on it. So,
255-
// the bootstrap configuration is read from environment variables.
256-
//
257-
// DefaultPool is initialized with bootstrap configuration from one of the
258-
// supported environment variables. If the environment variables are not
259-
// set, then fallback bootstrap configuration should be set before
260-
// attempting to create an xDS client, else xDS client creation will fail.
261-
config, err := bootstrap.GetConfiguration()
262-
if err != nil {
263-
return nil, nil, fmt.Errorf("xds: failed to read xDS bootstrap config from env vars: %v", err)
264-
}
265-
p.config = config
259+
config, err := p.getConfiguration()
260+
if err != nil {
261+
return nil, nil, fmt.Errorf("xds: failed to read xDS bootstrap config from env vars: %v", err)
262+
}
263+
264+
if config == nil {
265+
// If the environment variables are not set, then fallback bootstrap
266+
// configuration should be set before attempting to create an xDS client,
267+
// else xDS client creation will fail.
268+
config = p.fallbackConfig
269+
}
270+
if config == nil {
271+
return nil, nil, fmt.Errorf("failed to read xDS bootstrap config from env vars: bootstrap environment variables (%q or %q) not defined and fallback config not set", envconfig.XDSBootstrapFileNameEnv, envconfig.XDSBootstrapFileContentEnv)
266272
}
267273

268274
if c := p.clients[name]; c != nil {
269275
c.incrRef()
270276
return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
271277
}
272278

273-
c, err := newClientImpl(p.config, metricsRecorder, name)
279+
c, err := newClientImpl(config, metricsRecorder, name)
274280
if err != nil {
275281
return nil, nil, err
276282
}
277283
if logger.V(2) {
278-
c.logger.Infof("Created client with name %q and bootstrap configuration:\n %s", name, p.config)
284+
c.logger.Infof("Created client with name %q and bootstrap configuration:\n %s", name, config)
279285
}
280286
p.clients[name] = c
281287
xdsClientImplCreateHook(name)
282288

283-
logger.Infof("xDS node ID: %s", p.config.Node().GetId())
289+
logger.Infof("xDS node ID: %s", config.Node().GetId())
284290
return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
285291
}

xds/internal/xdsclient/pool/pool_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ func (s) TestDefaultPool_LazyLoadBootstrapConfig(t *testing.T) {
7373
t.Fatalf("DefaultPool.BootstrapConfigForTesting() = %v, want nil", cfg)
7474
}
7575

76+
// The pool will attempt to read the env vars only once. Reset the pool's
77+
// state to make it re-read the env vars during next client creation.
78+
xdsclient.DefaultPool.UnsetBootstrapConfigForTesting()
79+
7680
_, closeFunc, err = xdsclient.DefaultPool.NewClient(t.Name(), &stats.NoopMetricsRecorder{})
7781
if err != nil {
7882
t.Fatalf("Failed to create xDS client: %v", err)

0 commit comments

Comments
 (0)