Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ limitations under the License.

package main

import "knative.dev/serving/pkg/queue/sharedmain"
import (
"os"

"knative.dev/serving/pkg/queue/sharedmain"
)

func main() {
sharedmain.Main()
if sharedmain.Main() != nil {
os.Exit(1)
}
Comment on lines +26 to +28
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also probably log the error here

Copy link
Contributor Author

@davidhadas davidhadas Jul 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we let downstream change this file, I think it is best if we log inside sharedmain to ensure consistent logging.

}
64 changes: 44 additions & 20 deletions pkg/queue/sharedmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,6 @@ type config struct {
ServingEnableProbeRequestLog bool `split_words:"true"` // optional

// Metrics configuration
ServingNamespace string `split_words:"true" required:"true"`
ServingRevision string `split_words:"true" required:"true"`
ServingConfiguration string `split_words:"true" required:"true"`
ServingPodIP string `split_words:"true" required:"true"`
ServingPod string `split_words:"true" required:"true"`
ServingService string `split_words:"true"` // optional
ServingRequestMetricsBackend string `split_words:"true"` // optional
MetricsCollectorAddress string `split_words:"true"` // optional

Expand All @@ -110,23 +104,46 @@ type config struct {
// Concurrency State Endpoint configuration
ConcurrencyStateEndpoint string `split_words:"true"` // optional
ConcurrencyStateTokenPath string `split_words:"true"` // optional

Env
}

type Env struct {
ServingNamespace string `split_words:"true" required:"true"`
ServingRevision string `split_words:"true" required:"true"`
ServingConfiguration string `split_words:"true" required:"true"`
ServingPodIP string `split_words:"true" required:"true"`
ServingPod string `split_words:"true" required:"true"`
ServingService string `split_words:"true"` // optional
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a couple of minor nits:

  • maybe don't rename the config struct ?
  • perhaps a more descriptive name than Env (PodDetails? Caveat here is that I'm terrible at naming...)
  • I'd put Env under the metrics config section (i.e. where the old fields were removed)

Copy link
Contributor Author

@davidhadas davidhadas Jul 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that Env struct is identical in content to:
type RequestLogRevision struct from https://pkg.go.dev/knative.dev/serving/pkg/http#RequestLogRevision which is also being used later in main.go, How about naming it "Revision"

I am not sure it should be under
// Metrics configuration

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think "Env" is a good name, as it's describing the "environment" (i.e. Pod) where the queue-proxy is running.

A few bits (like ServingService) won't vary per queue-proxy, but this struct seems to be "where am I on the map" rather than "what am I doing".

}

type Defaults struct {
Ctx context.Context
Logger *zap.SugaredLogger
Transport http.RoundTripper
Env Env
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm wondering if Main should be a method on Defaults.

I suspect not.

I'm also wondering about the name... Configuration, Process, Execution, QueueProxy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This list includes two parts:

  • parameters whose value is determined by the service and provided for the extensions to use (but not to change)
  • parameters whose default is determined by the service and provided for the extensions such that may change them
    Maybe we should have divided this struct into two:
 type Defaults {
      Ctx              context.Context
      Transport    http.RoundTripper
 }	
 type Config {
     Env
     Logger  *zap.SugaredLogger
 }
 type QueueProxy struct {
        Invariables     Config
        Mutables        Defaults
 }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't split, I'd just note in comments which ones are considered mutable and which ones aren't.

Since they're exported types, an overview comment on the Defaults and Env structs themselves wouldn't be a bad idea.


type Option func(*Defaults)

func init() {
maxprocs.Set()
}

func Main() {
ctx := signals.NewContext()
func Main(opts ...Option) error {
d := Defaults{
Ctx: signals.NewContext(),
}

// Parse the environment.
var env config
if err := envconfig.Process("", &env); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
return err
}

// Setup the logger.
d.Env = env.Env
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see that you're slicing the "where I'm at" out of the larger configuration. I'm wondering whether we should simply pass along all the config/privateEnv, and accept that plugins could (for example) adjust EnableProfiling without other high-level config.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could enable splitting out envconfig into a pluggable module, though I'm not sure I'd recommend it, since I think we want that functionality enabled by default rather than every caller needing to say sharedmain.EnvConfig.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ensures that extensions receive a copy of the Env and cannot change the Env used by the service which I think is appropriate here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think just having a read-only Env is ok for now - I think as people request more we can move properties from private => public with a discussion


// Setup the Logger.
logger, _ := pkglogging.NewLogger(env.ServingLoggingConfig, env.ServingLoggingLevel)
defer flush(logger)

Expand All @@ -137,8 +154,16 @@ func Main() {
}.String()),
zap.String(logkey.Pod, env.ServingPod))

d.Logger = logger
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not put this on 147? Reducing code movement?

I'd be inclined to s/logger/d.Logger/ throughout.

Copy link
Contributor Author

@davidhadas davidhadas Jul 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ensures that extensions receive a copy of the logger and cannot change the logger used by the service which I think is appropriate here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok either way.

Longer term I would expect the logger be pluggable in the future

func WithLogger(l *zap.SugaredLogger) func(q *Defaults) {
	return func(d *Defaults) {
		d.Logger = l
		return
	}
}

d.Transport = buildTransport(env, logger)

// allow extensions to read d and return modified context and transport
for _, opts := range opts {
opts(&d)
}

// Report stats on Go memory usage every 30 seconds.
metrics.MemStatsOrDie(ctx)
metrics.MemStatsOrDie(d.Ctx)

protoStatReporter := queue.NewProtobufStatsReporter(env.ServingPod, reportingPeriod)

Expand Down Expand Up @@ -169,7 +194,7 @@ func Main() {
// Enable TLS when certificate is mounted.
tlsEnabled := exists(logger, certPath) && exists(logger, keyPath)

mainServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, false)
mainServer, drain := buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false)
httpServers := map[string]*http.Server{
"main": mainServer,
"metrics": buildMetricsServer(protoStatReporter),
Expand All @@ -184,7 +209,7 @@ func Main() {
// See also https://github.com/knative/serving/issues/12808.
var tlsServers map[string]*http.Server
if tlsEnabled {
mainTLSServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, true /* enable TLS */)
mainTLSServer, drain := buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true /* enable TLS */)
tlsServers = map[string]*http.Server{
"tlsMain": mainTLSServer,
"tlsAdmin": buildAdminServer(logger, drain),
Expand Down Expand Up @@ -220,10 +245,8 @@ func Main() {
select {
case err := <-errCh:
logger.Errorw("Failed to bring up queue-proxy, shutting down.", zap.Error(err))
// This extra flush is needed because defers are not handled via os.Exit calls.
flush(logger)
os.Exit(1)
case <-ctx.Done():
return err
case <-d.Ctx.Done():
if env.ConcurrencyStateEndpoint != "" {
concurrencyendpoint.Terminating(logger)
}
Expand All @@ -242,6 +265,7 @@ func Main() {
}
logger.Info("Shutdown complete, exiting...")
}
return nil
}

func exists(logger *zap.SugaredLogger, filename string) bool {
Expand All @@ -263,14 +287,14 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2
return readiness.NewProbe(coreProbe)
}

func buildServer(ctx context.Context, env config, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger,
func buildServer(ctx context.Context, env config, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger,
ce *queue.ConcurrencyEndpoint, enableTLS bool) (server *http.Server, drain func()) {
// TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health.

target := net.JoinHostPort("127.0.0.1", env.UserPort)

httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders, false /* use HTTP */)
httpProxy.Transport = buildTransport(env, logger)
httpProxy.Transport = transport
httpProxy.ErrorHandler = pkghandler.Error(logger)
httpProxy.BufferPool = netproxy.NewBufferPool()
httpProxy.FlushInterval = netproxy.FlushInterval
Expand Down