Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions hack/add_queue_options.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#!/usr/bin/env bash

# Copyright 2022 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# RTPLUGS_PKG should include a comma seperated list of go packages
# Each package should implement a plug of github.com/IBM/go-security-plugs/rtplugs
# All packages will be staticaly loaded to queue proxry
#
# Package names that will be defined in the podSpec runtimePlugins and those
# defined in the config-defaults configmap runtime-plugins parameter will be activated


# ----------- Do not edit above this line ------------------

# Comment out packages that you do not wish to load

# Add the testing workload security guard plug - harmless, yet better not use in production
RTPLUGS_PKG="${RTPLUGS_PKG},github.com/IBM/go-security-plugs/plugs/testgate"

# Add the sample workload security guard plug - do not use in production
#RTPLUGS_PKG="${RTPLUGS_PKG},github.com/IBM/go-security-plugs/plugs/rtgate"

# Add workload security guard plug (under development) - use at your own risk
#RTPLUGS_PKG="${RTPLUGS_PKG},github.com/IBM/workload-security-guard/pkg/wsgate"

# Add here additional packages that you wish to load in this template
#TPLUGS_PKG="${RTPLUGS_PKG},github.com/HomerSimpson/BestSecurityEver/shutdown"


# ----------- Do not edit below this line ------------------

cd cmd/queue

mv -n main.go main.go.orig

echo "------------------------"
echo "Generating main.go"

cat <<EOT > main.go
// Code generated by $0. DO NOT EDIT.
package main

import "os"

import "github.com/IBM/go-security-plugs/qpsecurity"
import "knative.dev/serving/pkg/queue/sharedmain"

EOT

IFS=","
for p in ${RTPLUGS_PKG}
do
# process
if [ ! -z "$p" ]
then
echo import _ \"$p\"
echo "import _ \"$p\"" >> main.go
go get $p
PLUGNAME="${p##*/}"
RTPLUGS_NAMES="${RTPLUGS_NAMES}${RTPLUGS_NAMES:+,}${PLUGNAME}"
fi
done
echo "Active plugs: ${RTPLUGS_NAMES}"

cat <<EOT >> main.go
func main() {
os.Setenv("RTPLUGS", "${RTPLUGS_NAMES}")

qOpt := qpsecurity.NewQPSecurityPlugs()
defer qOpt.Shutdown()

sharedmain.Main(qOpt.Setup)
}
EOT
echo "--------------- resulting main.go -----------"
cat main.go

cd ../..
go mod tidy
go mod vendor
76 changes: 51 additions & 25 deletions pkg/queue/sharedmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const (
keyPath = queue.CertDirectory + "/" + certificates.SecretPKKey
)

type config struct {
type privateEnv struct {
ContainerConcurrency int `split_words:"true" required:"true"`
QueueServingPort string `split_words:"true" required:"true"`
QueueServingTLSPort string `split_words:"true" required:"true"`
Expand All @@ -92,12 +92,6 @@ type config struct {
ServingEnableProbeRequestLog bool `split_words:"true"` // optional

// Metrics configuration
ServingNamespace string `split_words:"true" required:"true"`
ServingRevision string `split_words:"true" required:"true"`
ServingConfiguration string `split_words:"true" required:"true"`
ServingPodIP string `split_words:"true" required:"true"`
ServingPod string `split_words:"true" required:"true"`
ServingService string `split_words:"true"` // optional
ServingRequestMetricsBackend string `split_words:"true"` // optional
MetricsCollectorAddress string `split_words:"true"` // optional

Expand All @@ -110,22 +104,45 @@ type config struct {
// Concurrency State Endpoint configuration
ConcurrencyStateEndpoint string `split_words:"true"` // optional
ConcurrencyStateTokenPath string `split_words:"true"` // optional

Env
}

type Env struct {
ServingNamespace string `split_words:"true" required:"true"`
ServingRevision string `split_words:"true" required:"true"`
ServingConfiguration string `split_words:"true" required:"true"`
ServingPodIP string `split_words:"true" required:"true"`
ServingPod string `split_words:"true" required:"true"`
ServingService string `split_words:"true"` // optional
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a couple of minor nits:

  • maybe don't rename the config struct ?
  • perhaps a more descriptive name than Env (PodDetails? Caveat here is that I'm terrible at naming...)
  • I'd put Env under the metrics config section (i.e. where the old fields were removed)

Copy link
Contributor Author

@davidhadas davidhadas Jul 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that Env struct is identical in content to:
type RequestLogRevision struct from https://pkg.go.dev/knative.dev/serving/pkg/http#RequestLogRevision which is also being used later in main.go, How about naming it "Revision"

I am not sure it should be under
// Metrics configuration

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think "Env" is a good name, as it's describing the "environment" (i.e. Pod) where the queue-proxy is running.

A few bits (like ServingService) won't vary per queue-proxy, but this struct seems to be "where am I on the map" rather than "what am I doing".

}

type Defaults struct {
Ctx context.Context
Logger *zap.SugaredLogger
Transport http.RoundTripper
Env Env
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm wondering if Main should be a method on Defaults.

I suspect not.

I'm also wondering about the name... Configuration, Process, Execution, QueueProxy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This list includes two parts:

  • parameters whose value is determined by the service and provided for the extensions to use (but not to change)
  • parameters whose default is determined by the service and provided for the extensions such that may change them
    Maybe we should have divided this struct into two:
 type Defaults {
      Ctx              context.Context
      Transport    http.RoundTripper
 }	
 type Config {
     Env
     Logger  *zap.SugaredLogger
 }
 type QueueProxy struct {
        Invariables     Config
        Mutables        Defaults
 }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't split, I'd just note in comments which ones are considered mutable and which ones aren't.

Since they're exported types, an overview comment on the Defaults and Env structs themselves wouldn't be a bad idea.


type Option func(*Defaults)

func init() {
maxprocs.Set()
}

func Main() {
ctx := signals.NewContext()
func Main(opts ...Option) error {
d := &Defaults{
Ctx: signals.NewContext(),
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to make d concrete here, rather than a pointer.


// Parse the environment.
var env config
var env privateEnv
if err := envconfig.Process("", &env); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
return err
}

d.Env = env.Env
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see that you're slicing the "where I'm at" out of the larger configuration. I'm wondering whether we should simply pass along all the config/privateEnv, and accept that plugins could (for example) adjust EnableProfiling without other high-level config.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could enable splitting out envconfig into a pluggable module, though I'm not sure I'd recommend it, since I think we want that functionality enabled by default rather than every caller needing to say sharedmain.EnvConfig.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ensures that extensions receive a copy of the Env and cannot change the Env used by the service which I think is appropriate here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think just having a read-only Env is ok for now - I think as people request more we can move properties from private => public with a discussion


// Setup the logger.
logger, _ := pkglogging.NewLogger(env.ServingLoggingConfig, env.ServingLoggingLevel)
defer flush(logger)
Expand All @@ -137,6 +154,16 @@ func Main() {
}.String()),
zap.String(logkey.Pod, env.ServingPod))

d.Logger = logger
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not put this on 147? Reducing code movement?

I'd be inclined to s/logger/d.Logger/ throughout.

Copy link
Contributor Author

@davidhadas davidhadas Jul 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ensures that extensions receive a copy of the logger and cannot change the logger used by the service which I think is appropriate here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok either way.

Longer term I would expect the logger be pluggable in the future

func WithLogger(l *zap.SugaredLogger) func(q *Defaults) {
	return func(d *Defaults) {
		d.Logger = l
		return
	}
}

d.Transport = buildTransport(env, logger)

// allow extensions to read d and return modified context and transport
for _, opts := range opts {
opts(d)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slight preference for opts(&d) to make it clear that d is modified in-place here, but I don't care strongly.

}
ctx := d.Ctx
transport := d.Transport
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, I'd be inclined to substitute these throughout.


// Report stats on Go memory usage every 30 seconds.
metrics.MemStatsOrDie(ctx)

Expand Down Expand Up @@ -169,7 +196,7 @@ func Main() {
// Enable TLS when certificate is mounted.
tlsEnabled := exists(logger, certPath) && exists(logger, keyPath)

mainServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, false)
mainServer, drain := buildServer(ctx, env, transport, probe, stats, logger, concurrencyendpoint, false)
httpServers := map[string]*http.Server{
"main": mainServer,
"metrics": buildMetricsServer(protoStatReporter),
Expand All @@ -184,7 +211,7 @@ func Main() {
// See also https://github.com/knative/serving/issues/12808.
var tlsServers map[string]*http.Server
if tlsEnabled {
mainTLSServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, true /* enable TLS */)
mainTLSServer, drain := buildServer(ctx, env, transport, probe, stats, logger, concurrencyendpoint, true /* enable TLS */)
tlsServers = map[string]*http.Server{
"tlsMain": mainTLSServer,
"tlsAdmin": buildAdminServer(logger, drain),
Expand Down Expand Up @@ -220,9 +247,7 @@ func Main() {
select {
case err := <-errCh:
logger.Errorw("Failed to bring up queue-proxy, shutting down.", zap.Error(err))
// This extra flush is needed because defers are not handled via os.Exit calls.
flush(logger)
os.Exit(1)
return err
case <-ctx.Done():
if env.ConcurrencyStateEndpoint != "" {
concurrencyendpoint.Terminating(logger)
Expand All @@ -242,6 +267,7 @@ func Main() {
}
logger.Info("Shutdown complete, exiting...")
}
return nil
}

func exists(logger *zap.SugaredLogger, filename string) bool {
Expand All @@ -263,14 +289,14 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2
return readiness.NewProbe(coreProbe)
}

func buildServer(ctx context.Context, env config, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger,
func buildServer(ctx context.Context, env privateEnv, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger,
ce *queue.ConcurrencyEndpoint, enableTLS bool) (server *http.Server, drain func()) {
// TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health.

target := net.JoinHostPort("127.0.0.1", env.UserPort)

httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders, false /* use HTTP */)
httpProxy.Transport = buildTransport(env, logger)
httpProxy.Transport = transport
httpProxy.ErrorHandler = pkghandler.Error(logger)
httpProxy.BufferPool = netproxy.NewBufferPool()
httpProxy.FlushInterval = netproxy.FlushInterval
Expand Down Expand Up @@ -336,7 +362,7 @@ func buildServer(ctx context.Context, env config, probeContainer func() bool, st
return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler), drainer.Drain
}

func buildTransport(env config, logger *zap.SugaredLogger) http.RoundTripper {
func buildTransport(env privateEnv, logger *zap.SugaredLogger) http.RoundTripper {
maxIdleConns := 1000 // TODO: somewhat arbitrary value for CC=0, needs experimental validation.
if env.ContainerConcurrency > 0 {
maxIdleConns = env.ContainerConcurrency
Expand All @@ -362,7 +388,7 @@ func buildTransport(env config, logger *zap.SugaredLogger) http.RoundTripper {
}
}

func buildBreaker(logger *zap.SugaredLogger, env config) *queue.Breaker {
func buildBreaker(logger *zap.SugaredLogger, env privateEnv) *queue.Breaker {
if env.ContainerConcurrency < 1 {
return nil
}
Expand All @@ -379,7 +405,7 @@ func buildBreaker(logger *zap.SugaredLogger, env config) *queue.Breaker {
return queue.NewBreaker(params)
}

func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config, enableTLS bool) bool {
func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env privateEnv, enableTLS bool) bool {
// Keep it on HTTP because Metrics needs to be registered on either TLS server or non-TLS server.
if enableTLS {
return false
Expand Down Expand Up @@ -418,7 +444,7 @@ func buildMetricsServer(protobufStatReporter *queue.ProtobufStatsReporter) *http
}
}

func requestLogHandler(logger *zap.SugaredLogger, currentHandler http.Handler, env config) http.Handler {
func requestLogHandler(logger *zap.SugaredLogger, currentHandler http.Handler, env privateEnv) http.Handler {
revInfo := &pkghttp.RequestLogRevision{
Name: env.ServingRevision,
Namespace: env.ServingNamespace,
Expand All @@ -436,7 +462,7 @@ func requestLogHandler(logger *zap.SugaredLogger, currentHandler http.Handler, e
return handler
}

func requestMetricsHandler(logger *zap.SugaredLogger, currentHandler http.Handler, env config) http.Handler {
func requestMetricsHandler(logger *zap.SugaredLogger, currentHandler http.Handler, env privateEnv) http.Handler {
h, err := queue.NewRequestMetricsHandler(currentHandler, env.ServingNamespace,
env.ServingService, env.ServingConfiguration, env.ServingRevision, env.ServingPod)
if err != nil {
Expand All @@ -446,7 +472,7 @@ func requestMetricsHandler(logger *zap.SugaredLogger, currentHandler http.Handle
return h
}

func requestAppMetricsHandler(logger *zap.SugaredLogger, currentHandler http.Handler, breaker *queue.Breaker, env config) http.Handler {
func requestAppMetricsHandler(logger *zap.SugaredLogger, currentHandler http.Handler, breaker *queue.Breaker, env privateEnv) http.Handler {
h, err := queue.NewAppRequestMetricsHandler(currentHandler, breaker, env.ServingNamespace,
env.ServingService, env.ServingConfiguration, env.ServingRevision, env.ServingPod)
if err != nil {
Expand Down