-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Enable refactoring queue-proxy binary with out-of-tree extensions #13133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
ecee81c
c5a8030
bddc07e
e98327f
dbcf283
9a4d6d9
5137f03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,7 +73,7 @@ const ( | |
| keyPath = queue.CertDirectory + "/" + certificates.SecretPKKey | ||
| ) | ||
|
|
||
| type config struct { | ||
| type privateEnv struct { | ||
| ContainerConcurrency int `split_words:"true" required:"true"` | ||
| QueueServingPort string `split_words:"true" required:"true"` | ||
| QueueServingTLSPort string `split_words:"true" required:"true"` | ||
|
|
@@ -92,12 +92,6 @@ type config struct { | |
| ServingEnableProbeRequestLog bool `split_words:"true"` // optional | ||
|
|
||
| // Metrics configuration | ||
| ServingNamespace string `split_words:"true" required:"true"` | ||
| ServingRevision string `split_words:"true" required:"true"` | ||
| ServingConfiguration string `split_words:"true" required:"true"` | ||
| ServingPodIP string `split_words:"true" required:"true"` | ||
| ServingPod string `split_words:"true" required:"true"` | ||
| ServingService string `split_words:"true"` // optional | ||
| ServingRequestMetricsBackend string `split_words:"true"` // optional | ||
| MetricsCollectorAddress string `split_words:"true"` // optional | ||
|
|
||
|
|
@@ -110,22 +104,45 @@ type config struct { | |
| // Concurrency State Endpoint configuration | ||
| ConcurrencyStateEndpoint string `split_words:"true"` // optional | ||
| ConcurrencyStateTokenPath string `split_words:"true"` // optional | ||
|
|
||
| Env | ||
| } | ||
|
|
||
| type Env struct { | ||
| ServingNamespace string `split_words:"true" required:"true"` | ||
| ServingRevision string `split_words:"true" required:"true"` | ||
| ServingConfiguration string `split_words:"true" required:"true"` | ||
| ServingPodIP string `split_words:"true" required:"true"` | ||
| ServingPod string `split_words:"true" required:"true"` | ||
| ServingService string `split_words:"true"` // optional | ||
| } | ||
|
|
||
| type Defaults struct { | ||
| Ctx context.Context | ||
| Logger *zap.SugaredLogger | ||
| Transport http.RoundTripper | ||
| Env Env | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I'm wondering if I suspect not. I'm also wondering about the name... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This list includes two parts:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wouldn't split, I'd just note in comments which ones are considered mutable and which ones aren't. Since they're exported types, an overview comment on the |
||
|
|
||
| type Option func(*Defaults) | ||
|
|
||
| func init() { | ||
| maxprocs.Set() | ||
| } | ||
|
|
||
| func Main() { | ||
| ctx := signals.NewContext() | ||
| func Main(opts ...Option) error { | ||
davidhadas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| d := &Defaults{ | ||
| Ctx: signals.NewContext(), | ||
| } | ||
|
||
|
|
||
| // Parse the environment. | ||
| var env config | ||
| var env privateEnv | ||
| if err := envconfig.Process("", &env); err != nil { | ||
| fmt.Fprintln(os.Stderr, err) | ||
dprotaso marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| os.Exit(1) | ||
| return err | ||
| } | ||
|
|
||
| d.Env = env.Env | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can see that you're slicing the "where I'm at" out of the larger configuration. I'm wondering whether we should simply pass along all the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could enable splitting out There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This ensures that extensions receive a copy of the Env and cannot change the Env used by the service which I think is appropriate here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think just having a read-only |
||
|
|
||
| // Setup the logger. | ||
| logger, _ := pkglogging.NewLogger(env.ServingLoggingConfig, env.ServingLoggingLevel) | ||
| defer flush(logger) | ||
|
|
@@ -137,6 +154,16 @@ func Main() { | |
| }.String()), | ||
| zap.String(logkey.Pod, env.ServingPod)) | ||
|
|
||
| d.Logger = logger | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not put this on 147? Reducing code movement? I'd be inclined to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This ensures that extensions receive a copy of the logger and cannot change the logger used by the service which I think is appropriate here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm ok either way. Longer term I would expect the logger be pluggable in the future func WithLogger(l *zap.SugaredLogger) func(q *Defaults) {
return func(d *Defaults) {
d.Logger = l
return
}
} |
||
| d.Transport = buildTransport(env, logger) | ||
|
|
||
| // allow extensions to read d and return modified context and transport | ||
| for _, opts := range opts { | ||
| opts(d) | ||
|
||
| } | ||
| ctx := d.Ctx | ||
| transport := d.Transport | ||
|
||
|
|
||
| // Report stats on Go memory usage every 30 seconds. | ||
| metrics.MemStatsOrDie(ctx) | ||
|
|
||
|
|
@@ -169,7 +196,7 @@ func Main() { | |
| // Enable TLS when certificate is mounted. | ||
| tlsEnabled := exists(logger, certPath) && exists(logger, keyPath) | ||
|
|
||
| mainServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, false) | ||
| mainServer, drain := buildServer(ctx, env, transport, probe, stats, logger, concurrencyendpoint, false) | ||
| httpServers := map[string]*http.Server{ | ||
| "main": mainServer, | ||
| "metrics": buildMetricsServer(protoStatReporter), | ||
|
|
@@ -184,7 +211,7 @@ func Main() { | |
| // See also https://github.com/knative/serving/issues/12808. | ||
| var tlsServers map[string]*http.Server | ||
| if tlsEnabled { | ||
| mainTLSServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, true /* enable TLS */) | ||
| mainTLSServer, drain := buildServer(ctx, env, transport, probe, stats, logger, concurrencyendpoint, true /* enable TLS */) | ||
| tlsServers = map[string]*http.Server{ | ||
| "tlsMain": mainTLSServer, | ||
| "tlsAdmin": buildAdminServer(logger, drain), | ||
|
|
@@ -220,9 +247,7 @@ func Main() { | |
| select { | ||
| case err := <-errCh: | ||
| logger.Errorw("Failed to bring up queue-proxy, shutting down.", zap.Error(err)) | ||
| // This extra flush is needed because defers are not handled via os.Exit calls. | ||
| flush(logger) | ||
| os.Exit(1) | ||
| return err | ||
| case <-ctx.Done(): | ||
| if env.ConcurrencyStateEndpoint != "" { | ||
| concurrencyendpoint.Terminating(logger) | ||
|
|
@@ -242,6 +267,7 @@ func Main() { | |
| } | ||
| logger.Info("Shutdown complete, exiting...") | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func exists(logger *zap.SugaredLogger, filename string) bool { | ||
|
|
@@ -263,14 +289,14 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 | |
| return readiness.NewProbe(coreProbe) | ||
| } | ||
|
|
||
| func buildServer(ctx context.Context, env config, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, | ||
| func buildServer(ctx context.Context, env privateEnv, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, | ||
| ce *queue.ConcurrencyEndpoint, enableTLS bool) (server *http.Server, drain func()) { | ||
| // TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health. | ||
|
|
||
| target := net.JoinHostPort("127.0.0.1", env.UserPort) | ||
|
|
||
| httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders, false /* use HTTP */) | ||
| httpProxy.Transport = buildTransport(env, logger) | ||
| httpProxy.Transport = transport | ||
| httpProxy.ErrorHandler = pkghandler.Error(logger) | ||
| httpProxy.BufferPool = netproxy.NewBufferPool() | ||
| httpProxy.FlushInterval = netproxy.FlushInterval | ||
|
|
@@ -336,7 +362,7 @@ func buildServer(ctx context.Context, env config, probeContainer func() bool, st | |
| return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler), drainer.Drain | ||
| } | ||
|
|
||
| func buildTransport(env config, logger *zap.SugaredLogger) http.RoundTripper { | ||
| func buildTransport(env privateEnv, logger *zap.SugaredLogger) http.RoundTripper { | ||
| maxIdleConns := 1000 // TODO: somewhat arbitrary value for CC=0, needs experimental validation. | ||
| if env.ContainerConcurrency > 0 { | ||
| maxIdleConns = env.ContainerConcurrency | ||
|
|
@@ -362,7 +388,7 @@ func buildTransport(env config, logger *zap.SugaredLogger) http.RoundTripper { | |
| } | ||
| } | ||
|
|
||
| func buildBreaker(logger *zap.SugaredLogger, env config) *queue.Breaker { | ||
| func buildBreaker(logger *zap.SugaredLogger, env privateEnv) *queue.Breaker { | ||
| if env.ContainerConcurrency < 1 { | ||
| return nil | ||
| } | ||
|
|
@@ -379,7 +405,7 @@ func buildBreaker(logger *zap.SugaredLogger, env config) *queue.Breaker { | |
| return queue.NewBreaker(params) | ||
| } | ||
|
|
||
| func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config, enableTLS bool) bool { | ||
| func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env privateEnv, enableTLS bool) bool { | ||
| // Keep it on HTTP because Metrics needs to be registered on either TLS server or non-TLS server. | ||
| if enableTLS { | ||
| return false | ||
|
|
@@ -418,7 +444,7 @@ func buildMetricsServer(protobufStatReporter *queue.ProtobufStatsReporter) *http | |
| } | ||
| } | ||
|
|
||
| func requestLogHandler(logger *zap.SugaredLogger, currentHandler http.Handler, env config) http.Handler { | ||
| func requestLogHandler(logger *zap.SugaredLogger, currentHandler http.Handler, env privateEnv) http.Handler { | ||
| revInfo := &pkghttp.RequestLogRevision{ | ||
| Name: env.ServingRevision, | ||
| Namespace: env.ServingNamespace, | ||
|
|
@@ -436,7 +462,7 @@ func requestLogHandler(logger *zap.SugaredLogger, currentHandler http.Handler, e | |
| return handler | ||
| } | ||
|
|
||
| func requestMetricsHandler(logger *zap.SugaredLogger, currentHandler http.Handler, env config) http.Handler { | ||
| func requestMetricsHandler(logger *zap.SugaredLogger, currentHandler http.Handler, env privateEnv) http.Handler { | ||
| h, err := queue.NewRequestMetricsHandler(currentHandler, env.ServingNamespace, | ||
| env.ServingService, env.ServingConfiguration, env.ServingRevision, env.ServingPod) | ||
| if err != nil { | ||
|
|
@@ -446,7 +472,7 @@ func requestMetricsHandler(logger *zap.SugaredLogger, currentHandler http.Handle | |
| return h | ||
| } | ||
|
|
||
| func requestAppMetricsHandler(logger *zap.SugaredLogger, currentHandler http.Handler, breaker *queue.Breaker, env config) http.Handler { | ||
| func requestAppMetricsHandler(logger *zap.SugaredLogger, currentHandler http.Handler, breaker *queue.Breaker, env privateEnv) http.Handler { | ||
| h, err := queue.NewAppRequestMetricsHandler(currentHandler, breaker, env.ServingNamespace, | ||
| env.ServingService, env.ServingConfiguration, env.ServingRevision, env.ServingPod) | ||
| if err != nil { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a couple of minor nits:
configstruct ?Env(PodDetails? Caveat here is that I'm terrible at naming...)Envunder the metrics config section (i.e. where the old fields were removed)Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that Env struct is identical in content to:
type RequestLogRevision struct from https://pkg.go.dev/knative.dev/serving/pkg/http#RequestLogRevision which is also being used later in main.go, How about naming it "Revision"
I am not sure it should be under
// Metrics configuration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think "Env" is a good name, as it's describing the "environment" (i.e. Pod) where the queue-proxy is running.
A few bits (like
ServingService) won't vary per queue-proxy, but this struct seems to be "where am I on the map" rather than "what am I doing".