Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions pkg/queue/health/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"io"
"net/http"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

netheader "knative.dev/networking/pkg/http/header"
Expand All @@ -35,36 +34,18 @@ func ProbeHandler(tracer trace.Tracer, prober func() bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ph := netheader.GetKnativeProbeValue(r)

_, probeSpan := tracer.Start(r.Context(), "kn.queueproxy.probe")
defer probeSpan.End()

if ph != queue.Name {
http.Error(w, badProbeTemplate+ph, http.StatusBadRequest)
probeSpan.AddEvent("kn.queueproxy.probe.error",
trace.WithAttributes(
attribute.String("reason", badProbeTemplate+ph),
),
)
return
}

if prober == nil {
http.Error(w, "no probe", http.StatusInternalServerError)
probeSpan.AddEvent("kn.queueproxy.probe.error",
trace.WithAttributes(
attribute.String("reason", "no probe"),
),
)
return
}

if !prober() {
w.WriteHeader(http.StatusServiceUnavailable)
probeSpan.AddEvent("kn.queueproxy.probe.error",
trace.WithAttributes(
attribute.String("reason", "container not healthy"),
),
)
return
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/queue/sharedmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func Main(opts ...Option) error {
}
}()

d.Transport = buildTransport(env, tp)
d.Transport = buildTransport(env, tp, mp)

// allow extensions to read d and return modified context and transport
for _, opts := range opts {
Expand Down Expand Up @@ -344,7 +344,7 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2
return readiness.NewProbe(coreProbes)
}

func buildTransport(env config, tp trace.TracerProvider) http.RoundTripper {
func buildTransport(env config, tp trace.TracerProvider, mp metric.MeterProvider) http.RoundTripper {
maxIdleConns := 1000 // TODO: somewhat arbitrary value for CC=0, needs experimental validation.
if env.ContainerConcurrency > 0 {
maxIdleConns = env.ContainerConcurrency
Expand All @@ -355,6 +355,7 @@ func buildTransport(env config, tp trace.TracerProvider) http.RoundTripper {
return otelhttp.NewTransport(
transport,
otelhttp.WithTracerProvider(tp),
otelhttp.WithMeterProvider(mp),
)
}

Expand Down
104 changes: 24 additions & 80 deletions pkg/queue/sharedmain/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,65 +32,26 @@ import (
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"

netheader "knative.dev/networking/pkg/http/header"
netstats "knative.dev/networking/pkg/http/stats"
pkgnet "knative.dev/pkg/network"
"knative.dev/serving/pkg/queue"
"knative.dev/serving/pkg/queue/health"
)

func TestQueueTraceSpans(t *testing.T) {
testcases := []struct {
name string
prober func() bool
wantSpans int
requestHeader string
infiniteCC bool
probeWillFail bool
probeTrace bool
}{{
name: "proxy trace",
prober: func() bool { return true },
wantSpans: 3,
requestHeader: "",
probeWillFail: false,
probeTrace: false,
}, {
name: "proxy trace, no breaker",
prober: func() bool { return true },
wantSpans: 2,
requestHeader: "",
probeWillFail: false,
probeTrace: false,
infiniteCC: true,
}, {
name: "true prober function with probe trace",
prober: func() bool { return true },
wantSpans: 1,
requestHeader: queue.Name,
probeWillFail: false,
probeTrace: true,
}, {
name: "unexpected probe header",
prober: func() bool { return true },
wantSpans: 1,
requestHeader: "test-probe",
probeWillFail: true,
probeTrace: true,
}, {
name: "nil prober function",
prober: nil,
wantSpans: 1,
requestHeader: queue.Name,
probeWillFail: true,
probeTrace: true,
}, {
name: "false prober function",
prober: func() bool { return false },
wantSpans: 1,
requestHeader: queue.Name,
probeWillFail: true,
probeTrace: true,
}}

for _, tc := range testcases {
Expand All @@ -105,47 +66,37 @@ func TestQueueTraceSpans(t *testing.T) {
writer := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)

if !tc.probeTrace {
t.Log("skip probe trace")
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
serverURL, _ := url.Parse(server.URL)

proxy := httputil.NewSingleHostReverseProxy(serverURL)
params := queue.BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}
var breaker *queue.Breaker
if !tc.infiniteCC {
breaker = queue.NewBreaker(params)
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
serverURL, _ := url.Parse(server.URL)

proxy := httputil.NewSingleHostReverseProxy(serverURL)
params := queue.BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}
var breaker *queue.Breaker
if !tc.infiniteCC {
breaker = queue.NewBreaker(params)
}

proxy.Transport = otelhttp.NewTransport(
pkgnet.AutoTransport,
otelhttp.WithMeterProvider(mp),
otelhttp.WithTracerProvider(tp),
otelhttp.WithSpanNameFormatter(func(op string, r *http.Request) string {
return r.URL.Path
}),
)
proxy.Transport = otelhttp.NewTransport(
pkgnet.AutoTransport,
otelhttp.WithMeterProvider(mp),
otelhttp.WithTracerProvider(tp),
otelhttp.WithSpanNameFormatter(func(op string, r *http.Request) string {
return r.URL.Path
}),
)

h := queue.ProxyHandler(tracer, breaker, netstats.NewRequestStats(time.Now()), proxy)
h(writer, req)
} else {
t.Log("probe trace")
h := health.ProbeHandler(tracer, tc.prober)
req.Header.Set(netheader.ProbeKey, tc.requestHeader)
h(writer, req)
}
h := queue.ProxyHandler(tracer, breaker, netstats.NewRequestStats(time.Now()), proxy)
h(writer, req)

gotSpans := exporter.GetSpans()
if len(gotSpans) != tc.wantSpans {
t.Errorf("Got %d spans, expected %d", len(gotSpans), tc.wantSpans)
}
spanNames := []string{"kn.queueproxy.probe", "/", "kn.queueproxy.proxy"}
if !tc.probeTrace {
spanNames = spanNames[1:]
}
spanNames := []string{"/", "kn.queueproxy.proxy"}

// We want to add `queueWait` span only if there is possible queueing
// and if the tests actually expects tracing.
if !tc.infiniteCC && tc.wantSpans > 1 {
Expand All @@ -161,13 +112,6 @@ func TestQueueTraceSpans(t *testing.T) {
if gotSpans[i].Name != spanName {
t.Errorf("Span[%d].Name = %q, want: %q", i, gotSpans[i].Name, spanName)
}
if tc.probeWillFail {
if len(gotSpans[i].Events) == 0 {
t.Error("Expected error as value for failed span Annotation, got empty Annotation")
} else if gotSpans[i].Events[0].Name != "kn.queueproxy.probe.error" {
t.Errorf("Expected error as value for failed span Annotation, got %q", gotSpans[i].Events[0].Name)
}
}
}
})
}
Expand Down
Loading