Skip to content

Commit 1c8d21b

Browse files
authored
Make queue-proxy's reverse proxy handler extendable (#16097)
* Make queue-proxy's reverse proxy handler extendable This builds on #13133 to make it possible to adjust settings on the *httputil.ReverseProxy that queue-proxy uses, or to replace it entirely with any http.Handler, using the out-of-tree extension pattern. * Ensure configured Transport is used If an integrator customizes the Transport in an Options function, we want to apply the provided Transport to our default httputil.ReverseProxy regardless of which Option sets the Transport and regardless of whether or not other Option functions replace our default http.Handler. Add some tests for this since the interactions beteween Options.Transport and Options.ProxyHandler can be subtle.
1 parent 561f348 commit 1c8d21b

File tree

3 files changed

+97
-18
lines changed

3 files changed

+97
-18
lines changed

pkg/queue/sharedmain/handlers.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package sharedmain
1818

1919
import (
2020
"context"
21-
"net"
2221
"net/http"
2322
"time"
2423

@@ -28,34 +27,24 @@ import (
2827
"go.uber.org/zap"
2928

3029
netheader "knative.dev/networking/pkg/http/header"
31-
netproxy "knative.dev/networking/pkg/http/proxy"
3230
netstats "knative.dev/networking/pkg/http/stats"
3331
pkghandler "knative.dev/pkg/network/handlers"
34-
"knative.dev/serving/pkg/activator"
35-
pkghttp "knative.dev/serving/pkg/http"
3632
"knative.dev/serving/pkg/http/handler"
3733
"knative.dev/serving/pkg/queue"
3834
"knative.dev/serving/pkg/queue/health"
3935
)
4036

4137
func mainHandler(
4238
env config,
43-
transport http.RoundTripper,
39+
d Defaults,
4440
prober func() bool,
4541
stats *netstats.RequestStats,
4642
logger *zap.SugaredLogger,
4743
mp metric.MeterProvider,
4844
tp trace.TracerProvider,
4945
) (http.Handler, *pkghandler.Drainer) {
50-
target := net.JoinHostPort("127.0.0.1", env.UserPort)
5146
tracer := tp.Tracer("knative.dev/serving/pkg/queue")
5247

53-
httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders, false /* use HTTP */)
54-
httpProxy.Transport = transport
55-
httpProxy.ErrorHandler = pkghandler.Error(logger)
56-
httpProxy.BufferPool = netproxy.NewBufferPool()
57-
httpProxy.FlushInterval = netproxy.FlushInterval
58-
5948
breaker := buildBreaker(logger, env)
6049

6150
timeout := time.Duration(env.RevisionTimeoutSeconds) * time.Second
@@ -69,7 +58,7 @@ func mainHandler(
6958
}
7059
// Create queue handler chain.
7160
// Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first.
72-
var composedHandler http.Handler = httpProxy
61+
composedHandler := d.ProxyHandler
7362

7463
composedHandler = requestAppMetricsHandler(logger, composedHandler, breaker, mp)
7564
composedHandler = queue.ProxyHandler(tracer, breaker, stats, composedHandler)

pkg/queue/sharedmain/main.go

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,17 @@ import (
2222
"encoding/json"
2323
"errors"
2424
"fmt"
25+
"net"
2526
"net/http"
27+
"net/http/httputil"
2628
"os"
2729
"strconv"
2830
"time"
2931

32+
netproxy "knative.dev/networking/pkg/http/proxy"
33+
pkghandler "knative.dev/pkg/network/handlers"
34+
"knative.dev/serving/pkg/activator"
35+
3036
"github.com/kelseyhightower/envconfig"
3137
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
3238
"go.opentelemetry.io/otel/metric"
@@ -145,6 +151,12 @@ type Defaults struct {
145151
// If Transport is wrapped, the new RoundTripper should replace the value of Transport.
146152
// The new Transport will then be used by other Options (called next) and by QP.
147153
Transport http.RoundTripper
154+
155+
// ProxyHandler provides Options with the QP's main ReverseProxy
156+
// The default value is of type *httputil.ReverseProxy
157+
// An Option may type assert to *httutil.ReverseProxy and adjust attributes of the ReverseProxy,
158+
// or replace the handler entirely, typically to insert additional handler(s) into the chain.
159+
ProxyHandler http.Handler
148160
}
149161

150162
type Option func(*Defaults)
@@ -204,11 +216,9 @@ func Main(opts ...Option) error {
204216
}()
205217

206218
d.Transport = buildTransport(env, tp, mp)
219+
proxyHandler := buildProxyHandler(logger, env, d.Transport)
207220

208-
// allow extensions to read d and return modified context and transport
209-
for _, opts := range opts {
210-
opts(&d)
211-
}
221+
applyOptions(&d, proxyHandler, opts...)
212222

213223
protoStatReporter := queue.NewProtobufStatsReporter(env.ServingPod, reportingPeriod)
214224

@@ -232,7 +242,7 @@ func Main(opts ...Option) error {
232242
// Enable TLS when certificate is mounted.
233243
tlsEnabled := exists(logger, certPath) && exists(logger, keyPath)
234244

235-
mainHandler, drainer := mainHandler(env, d.Transport, probe, stats, logger, mp, tp)
245+
mainHandler, drainer := mainHandler(env, d, probe, stats, logger, mp, tp)
236246
adminHandler := adminHandler(d.Ctx, logger, drainer)
237247

238248
// Enable TLS server when activator server certs are mounted.
@@ -325,6 +335,17 @@ func Main(opts ...Option) error {
325335
return nil
326336
}
327337

338+
func applyOptions(d *Defaults, proxyHandler *httputil.ReverseProxy, opts ...Option) {
339+
d.ProxyHandler = proxyHandler
340+
341+
// allow extensions to read d and return modified context, transport, and proxy handler.
342+
for _, opts := range opts {
343+
opts(d)
344+
}
345+
346+
proxyHandler.Transport = d.Transport
347+
}
348+
328349
func exists(logger *zap.SugaredLogger, filename string) bool {
329350
_, err := os.Stat(filename)
330351
if err != nil && !os.IsNotExist(err) {
@@ -359,6 +380,17 @@ func buildTransport(env config, tp trace.TracerProvider, mp metric.MeterProvider
359380
)
360381
}
361382

383+
func buildProxyHandler(logger *zap.SugaredLogger, env config, transport http.RoundTripper) *httputil.ReverseProxy {
384+
target := net.JoinHostPort("127.0.0.1", env.UserPort)
385+
httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders, false /* use HTTP */)
386+
httpProxy.Transport = transport
387+
httpProxy.ErrorHandler = pkghandler.Error(logger)
388+
httpProxy.BufferPool = netproxy.NewBufferPool()
389+
httpProxy.FlushInterval = netproxy.FlushInterval
390+
391+
return httpProxy
392+
}
393+
362394
func buildBreaker(logger *zap.SugaredLogger, env config) *queue.Breaker {
363395
if env.ContainerConcurrency < 1 {
364396
return nil

pkg/queue/sharedmain/main_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,3 +221,61 @@ func getFieldValue(cfg *config, fieldName string) reflect.Value {
221221
f := reflect.Indirect(rVal).FieldByName(fieldName)
222222
return f
223223
}
224+
225+
func TestApplyOptions(t *testing.T) {
226+
t.Run("option sets Transport on proxyHandler", func(t *testing.T) {
227+
d := &Defaults{}
228+
proxyHandler := &httputil.ReverseProxy{}
229+
customTransport := &http.Transport{}
230+
231+
opt := func(d *Defaults) {
232+
d.Transport = customTransport
233+
}
234+
235+
applyOptions(d, proxyHandler, opt)
236+
237+
if proxyHandler.Transport != customTransport {
238+
t.Errorf("proxyHandler.Transport = %v, want %v", proxyHandler.Transport, customTransport)
239+
}
240+
})
241+
242+
t.Run("option receives non-nil ProxyHandler", func(t *testing.T) {
243+
d := &Defaults{}
244+
proxyHandler := &httputil.ReverseProxy{}
245+
var receivedHandler http.Handler
246+
247+
opt := func(d *Defaults) {
248+
receivedHandler = d.ProxyHandler
249+
}
250+
251+
applyOptions(d, proxyHandler, opt)
252+
253+
if receivedHandler == nil {
254+
t.Error("option function received nil ProxyHandler, want non-nil")
255+
}
256+
if receivedHandler != proxyHandler {
257+
t.Errorf("option function received %v, want %v", receivedHandler, proxyHandler)
258+
}
259+
})
260+
261+
t.Run("multiple options with handler and transport changes", func(t *testing.T) {
262+
d := &Defaults{}
263+
proxyHandler := &httputil.ReverseProxy{}
264+
customTransport := &http.Transport{}
265+
wrappedHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})
266+
267+
opt1 := func(d *Defaults) {
268+
d.ProxyHandler = wrappedHandler
269+
}
270+
271+
opt2 := func(d *Defaults) {
272+
d.Transport = customTransport
273+
}
274+
275+
applyOptions(d, proxyHandler, opt1, opt2)
276+
277+
if proxyHandler.Transport != customTransport {
278+
t.Errorf("proxyHandler.Transport = %v, want %v", proxyHandler.Transport, customTransport)
279+
}
280+
})
281+
}

0 commit comments

Comments
 (0)