Skip to content

Commit 2cc198a

Browse files
aabchoonacxmathetakeyuzisun
authored
backport: completion stream + metrics and assistant content (#497)
**Commit Message** PR to backport `mockChatCompletionMetrics`, chat completion stream fix, and openai content type. Including: - #459 (468 uses mock components introduced here) - #468 - #486 --------- Signed-off-by: Huamin Chen <[email protected]> Signed-off-by: Ignasi Barrera <[email protected]> Signed-off-by: Takeshi Yoneda <[email protected]> Signed-off-by: Aaron Choo <[email protected]> Co-authored-by: Ignasi Barrera <[email protected]> Co-authored-by: Takeshi Yoneda <[email protected]> Co-authored-by: Dan Sun <[email protected]>
1 parent 90facee commit 2cc198a

19 files changed

+1028
-229
lines changed

cmd/extproc/mainlib/main.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,24 @@ import (
1313
"log"
1414
"log/slog"
1515
"net"
16+
"net/http"
1617
"os"
1718
"os/signal"
1819
"strings"
1920
"syscall"
2021
"time"
2122

2223
extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
24+
"github.com/prometheus/client_golang/prometheus"
25+
"github.com/prometheus/client_golang/prometheus/promhttp"
26+
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
27+
"go.opentelemetry.io/otel/metric"
28+
metricsdk "go.opentelemetry.io/otel/sdk/metric"
2329
"google.golang.org/grpc"
2430
"google.golang.org/grpc/health/grpc_health_v1"
2531

2632
"github.com/envoyproxy/ai-gateway/internal/extproc"
33+
"github.com/envoyproxy/ai-gateway/internal/metrics"
2734
"github.com/envoyproxy/ai-gateway/internal/version"
2835
)
2936

@@ -32,6 +39,7 @@ type extProcFlags struct {
3239
configPath string // path to the configuration file.
3340
extProcAddr string // gRPC address for the external processor.
3441
logLevel slog.Level // log level for the external processor.
42+
metricsAddr string // HTTP address for the metrics server.
3543
}
3644

3745
// parseAndValidateFlags parses and validates the flas passed to the external processor.
@@ -51,13 +59,14 @@ func parseAndValidateFlags(args []string) (extProcFlags, error) {
5159
fs.StringVar(&flags.extProcAddr,
5260
"extProcAddr",
5361
":1063",
54-
"gRPC address for the external processor. For example, :1063 or unix:///tmp/ext_proc.sock",
62+
"gRPC address for the external processor. For example, :1063 or unix:///tmp/ext_proc.sock.",
5563
)
5664
logLevelPtr := fs.String(
5765
"logLevel",
5866
"info",
5967
"log level for the external processor. One of 'debug', 'info', 'warn', or 'error'.",
6068
)
69+
fs.StringVar(&flags.metricsAddr, "metricsAddr", ":9190", "HTTP address for the metrics server.")
6170

6271
if err := fs.Parse(args); err != nil {
6372
return extProcFlags{}, fmt.Errorf("failed to parse extProcFlags: %w", err)
@@ -102,11 +111,13 @@ func Main() {
102111
log.Fatalf("failed to listen: %v", err)
103112
}
104113

114+
metricsServer, meter := startMetricsServer(flags.metricsAddr, l)
115+
105116
server, err := extproc.NewServer(l)
106117
if err != nil {
107118
log.Fatalf("failed to create external processor server: %v", err)
108119
}
109-
server.Register("/v1/chat/completions", extproc.NewChatCompletionProcessor)
120+
server.Register("/v1/chat/completions", extproc.ChatCompletionProcessorFactory(metrics.NewChatCompletion(meter)))
110121
server.Register("/v1/models", extproc.NewModelsProcessor)
111122

112123
if err := extproc.StartConfigWatcher(ctx, flags.configPath, server, l, time.Second*5); err != nil {
@@ -116,10 +127,18 @@ func Main() {
116127
s := grpc.NewServer()
117128
extprocv3.RegisterExternalProcessorServer(s, server)
118129
grpc_health_v1.RegisterHealthServer(s, server)
130+
119131
go func() {
120132
<-ctx.Done()
121133
s.GracefulStop()
134+
135+
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
136+
defer cancel()
137+
if err := metricsServer.Shutdown(shutdownCtx); err != nil {
138+
l.Error("Failed to shutdown metrics server gracefully", "error", err)
139+
}
122140
}()
141+
123142
_ = s.Serve(lis)
124143
}
125144

@@ -130,3 +149,46 @@ func listenAddress(addrFlag string) (string, string) {
130149
}
131150
return "tcp", addrFlag
132151
}
152+
153+
// startMetricsServer starts the HTTP server for Prometheus metrics.
154+
func startMetricsServer(addr string, logger *slog.Logger) (*http.Server, metric.Meter) {
155+
registry := prometheus.NewRegistry()
156+
exporter, err := otelprom.New(otelprom.WithRegisterer(registry))
157+
if err != nil {
158+
log.Fatal("failed to create metrics exporter")
159+
}
160+
provider := metricsdk.NewMeterProvider(metricsdk.WithReader(exporter))
161+
meter := provider.Meter("envoyproxy/ai-gateway")
162+
163+
// Create a new HTTP server for metrics.
164+
mux := http.NewServeMux()
165+
166+
// Register the metrics handler.
167+
mux.Handle("/metrics", promhttp.HandlerFor(
168+
registry,
169+
promhttp.HandlerOpts{
170+
EnableOpenMetrics: true,
171+
},
172+
))
173+
174+
// Add a simple health check endpoint.
175+
mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
176+
w.WriteHeader(http.StatusOK)
177+
_, _ = w.Write([]byte("OK"))
178+
})
179+
180+
server := &http.Server{
181+
Addr: addr,
182+
Handler: mux,
183+
ReadHeaderTimeout: 5 * time.Second,
184+
}
185+
186+
go func() {
187+
logger.Info("Starting metrics server", "address", addr)
188+
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
189+
logger.Error("Metrics server failed", "error", err)
190+
}
191+
}()
192+
193+
return server, meter
194+
}

cmd/extproc/mainlib/main_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
package mainlib
77

88
import (
9+
"io"
910
"log/slog"
11+
"net/http"
1012
"testing"
1113

1214
"github.com/stretchr/testify/assert"
@@ -104,3 +106,19 @@ func TestListenAddress(t *testing.T) {
104106
})
105107
}
106108
}
109+
110+
func TestStartMetricsServer(t *testing.T) {
111+
s, m := startMetricsServer("127.0.0.1:", slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{})))
112+
t.Cleanup(func() { _ = s.Shutdown(t.Context()) })
113+
114+
require.NotNil(t, s)
115+
require.NotNil(t, m)
116+
117+
require.HTTPStatusCode(t, s.Handler.ServeHTTP, http.MethodGet, "/", nil, http.StatusNotFound)
118+
119+
require.HTTPSuccess(t, s.Handler.ServeHTTP, http.MethodGet, "/health", nil)
120+
require.HTTPBodyContains(t, s.Handler.ServeHTTP, http.MethodGet, "/health", nil, "OK")
121+
122+
require.HTTPSuccess(t, s.Handler.ServeHTTP, http.MethodGet, "/metrics", nil)
123+
require.HTTPBodyContains(t, s.Handler.ServeHTTP, http.MethodGet, "/metrics", nil, "target_info{")
124+
}

go.mod

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,13 @@ require (
1313
github.com/go-logr/logr v1.4.2
1414
github.com/google/cel-go v0.23.2
1515
github.com/google/go-cmp v0.7.0
16-
github.com/openai/openai-go v0.1.0-alpha.59
16+
github.com/openai/openai-go v0.1.0-alpha.61
17+
github.com/prometheus/client_golang v1.20.5
1718
github.com/stretchr/testify v1.10.0
19+
go.opentelemetry.io/otel v1.34.0
20+
go.opentelemetry.io/otel/exporters/prometheus v0.56.0
21+
go.opentelemetry.io/otel/metric v1.34.0
22+
go.opentelemetry.io/otel/sdk/metric v1.34.0
1823
go.uber.org/goleak v1.3.0
1924
go.uber.org/zap v1.27.0
2025
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c
@@ -271,7 +276,6 @@ require (
271276
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
272277
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
273278
github.com/polyfloyd/go-errorlint v1.7.1 // indirect
274-
github.com/prometheus/client_golang v1.20.5 // indirect
275279
github.com/prometheus/client_model v0.6.1 // indirect
276280
github.com/prometheus/common v0.62.0 // indirect
277281
github.com/prometheus/procfs v0.15.1 // indirect
@@ -343,8 +347,7 @@ require (
343347
go-simpler.org/sloglint v0.9.0 // indirect
344348
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
345349
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect
346-
go.opentelemetry.io/otel v1.34.0 // indirect
347-
go.opentelemetry.io/otel/metric v1.34.0 // indirect
350+
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
348351
go.opentelemetry.io/otel/trace v1.34.0 // indirect
349352
go.uber.org/automaxprocs v1.6.0 // indirect
350353
go.uber.org/multierr v1.11.0 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -632,8 +632,8 @@ github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU
632632
github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk=
633633
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
634634
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
635-
github.com/openai/openai-go v0.1.0-alpha.59 h1:T3IYwKSCezfIlL9Oi+CGvU03fq0RoH33775S78Ti48Y=
636-
github.com/openai/openai-go v0.1.0-alpha.59/go.mod h1:3SdE6BffOX9HPEQv8IL/fi3LYZ5TUpRYaqGQZbyk11A=
635+
github.com/openai/openai-go v0.1.0-alpha.61 h1:dLJW1Dk15VAwm76xyPsiPt/Ky94NNGoMLETAI1ISoBY=
636+
github.com/openai/openai-go v0.1.0-alpha.61/go.mod h1:3SdE6BffOX9HPEQv8IL/fi3LYZ5TUpRYaqGQZbyk11A=
637637
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
638638
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
639639
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=

internal/apischema/openai/openai.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package openai
1010

1111
import (
1212
"encoding/json"
13+
"errors"
1314
"fmt"
1415
"strconv"
1516
"strings"
@@ -149,6 +150,28 @@ func (c *ChatCompletionContentPartUserUnionParam) UnmarshalJSON(data []byte) err
149150
return nil
150151
}
151152

153+
type StringOrAssistantRoleContentUnion struct {
154+
Value interface{}
155+
}
156+
157+
func (s *StringOrAssistantRoleContentUnion) UnmarshalJSON(data []byte) error {
158+
var str string
159+
err := json.Unmarshal(data, &str)
160+
if err == nil {
161+
s.Value = str
162+
return nil
163+
}
164+
165+
var content ChatCompletionAssistantMessageParamContent
166+
err = json.Unmarshal(data, &content)
167+
if err == nil {
168+
s.Value = content
169+
return nil
170+
}
171+
172+
return errors.New("cannot unmarshal JSON data as string or assistant content parts")
173+
}
174+
152175
type StringOrArray struct {
153176
Value interface{}
154177
}
@@ -335,7 +358,7 @@ type ChatCompletionAssistantMessageParam struct {
335358
Audio ChatCompletionAssistantMessageParamAudio `json:"audio,omitempty"`
336359
// The contents of the assistant message. Required unless `tool_calls` or
337360
// `function_call` is specified.
338-
Content ChatCompletionAssistantMessageParamContent `json:"content"`
361+
Content StringOrAssistantRoleContentUnion `json:"content"`
339362
// An optional name for the participant. Provides the model information to
340363
// differentiate between participants of the same role.
341364
Name string `json:"name,omitempty"`

internal/apischema/openai/openai_test.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,35 @@ func TestOpenAIChatCompletionMessageUnmarshal(t *testing.T) {
153153
{
154154
Value: ChatCompletionAssistantMessageParam{
155155
Role: ChatMessageRoleAssistant,
156-
Content: ChatCompletionAssistantMessageParamContent{Text: ptr.To("you are a helpful assistant")},
156+
Content: StringOrAssistantRoleContentUnion{Value: ChatCompletionAssistantMessageParamContent{Text: ptr.To("you are a helpful assistant")}},
157+
},
158+
Type: ChatMessageRoleAssistant,
159+
},
160+
},
161+
},
162+
},
163+
{
164+
name: "assistant message string",
165+
in: []byte(`{"model": "gpu-o4",
166+
"messages": [
167+
{"role": "assistant", "content": "you are a helpful assistant"},
168+
{"role": "assistant", "content": "{'text': 'you are a helpful assistant'}"}
169+
]}
170+
`),
171+
out: &ChatCompletionRequest{
172+
Model: "gpu-o4",
173+
Messages: []ChatCompletionMessageParamUnion{
174+
{
175+
Value: ChatCompletionAssistantMessageParam{
176+
Role: ChatMessageRoleAssistant,
177+
Content: StringOrAssistantRoleContentUnion{Value: "you are a helpful assistant"},
178+
},
179+
Type: ChatMessageRoleAssistant,
180+
},
181+
{
182+
Value: ChatCompletionAssistantMessageParam{
183+
Role: ChatMessageRoleAssistant,
184+
Content: StringOrAssistantRoleContentUnion{Value: "{'text': 'you are a helpful assistant'}"},
157185
},
158186
Type: ChatMessageRoleAssistant,
159187
},

0 commit comments

Comments
 (0)