Skip to content

Commit edf0724

Browse files
authored
feat: implement apiToken failover mechanism (alibaba#1256)
1 parent cf98052 commit edf0724

33 files changed

+1552
-1225
lines changed

plugins/wasm-go/extensions/ai-proxy/README.md

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,16 @@ description: AI 代理插件配置参考
3131

3232
`provider`的配置字段说明如下:
3333

34-
| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 |
35-
| -------------- | --------------- | -------- | ------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
36-
| `type` | string | 必填 | - | AI 服务提供商名称 |
37-
| `apiTokens` | array of string | 非必填 | - | 用于在访问 AI 服务时进行认证的令牌。如果配置了多个 token,插件会在请求时随机进行选择。部分服务提供商只支持配置一个 token。 |
38-
| `timeout` | number | 非必填 | - | 访问 AI 服务的超时时间。单位为毫秒。默认值为 120000,即 2 分钟 |
39-
| `modelMapping` | map of string | 非必填 | - | AI 模型映射表,用于将请求中的模型名称映射为服务提供商支持模型名称。<br/>1. 支持前缀匹配。例如用 "gpt-3-*" 匹配所有名称以“gpt-3-”开头的模型;<br/>2. 支持使用 "*" 为键来配置通用兜底映射关系;<br/>3. 如果映射的目标名称为空字符串 "",则表示保留原模型名称。 |
40-
| `protocol` | string | 非必填 | - | 插件对外提供的 API 接口契约。目前支持以下取值:openai(默认值,使用 OpenAI 的接口契约)、original(使用目标服务提供商的原始接口契约) |
41-
| `context` | object | 非必填 | - | 配置 AI 对话上下文信息 |
42-
| `customSettings` | array of customSetting | 非必填 | - | 为AI请求指定覆盖或者填充参数 |
34+
| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 |
35+
|------------------| --------------- | -------- | ------ |-----------------------------------------------------------------------------------------------------------------------------------------------------------|
36+
| `type` | string | 必填 | - | AI 服务提供商名称 |
37+
| `apiTokens` | array of string | 非必填 | - | 用于在访问 AI 服务时进行认证的令牌。如果配置了多个 token,插件会在请求时随机进行选择。部分服务提供商只支持配置一个 token。 |
38+
| `timeout` | number | 非必填 | - | 访问 AI 服务的超时时间。单位为毫秒。默认值为 120000,即 2 分钟 |
39+
| `modelMapping` | map of string | 非必填 | - | AI 模型映射表,用于将请求中的模型名称映射为服务提供商支持模型名称。<br/>1. 支持前缀匹配。例如用 "gpt-3-*" 匹配所有名称以“gpt-3-”开头的模型;<br/>2. 支持使用 "*" 为键来配置通用兜底映射关系;<br/>3. 如果映射的目标名称为空字符串 "",则表示保留原模型名称。 |
40+
| `protocol` | string | 非必填 | - | 插件对外提供的 API 接口契约。目前支持以下取值:openai(默认值,使用 OpenAI 的接口契约)、original(使用目标服务提供商的原始接口契约) |
41+
| `context` | object | 非必填 | - | 配置 AI 对话上下文信息 |
42+
| `customSettings` | array of customSetting | 非必填 | - | 为AI请求指定覆盖或者填充参数 |
43+
| `failover` | object | 非必填 | - | 配置 apiToken 的 failover 策略,当 apiToken 不可用时,将其移出 apiToken 列表,待健康检测通过后重新添加回 apiToken 列表 |
4344

4445
`context`的配置字段说明如下:
4546

@@ -75,6 +76,16 @@ custom-setting会遵循如下表格,根据`name`和协议来替换对应的字
7576
如果启用了raw模式,custom-setting会直接用输入的`name``value`去更改请求中的json内容,而不对参数名称做任何限制和修改。
7677
对于大多数协议,custom-setting都会在json内容的根路径修改或者填充参数。对于`qwen`协议,ai-proxy会在json的`parameters`子路径下做配置。对于`gemini`协议,则会在`generation_config`子路径下做配置。
7778

79+
`failover` 的配置字段说明如下:
80+
81+
| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 |
82+
|------------------|--------|------|-------|-----------------------------|
83+
| enabled | bool | 非必填 | false | 是否启用 apiToken 的 failover 机制 |
84+
| failureThreshold | int | 非必填 | 3 | 触发 failover 连续请求失败的阈值(次数) |
85+
| successThreshold | int | 非必填 | 1 | 健康检测的成功阈值(次数) |
86+
| healthCheckInterval | int | 非必填 | 5000 | 健康检测的间隔时间,单位毫秒 |
87+
| healthCheckTimeout | int | 非必填 | 5000 | 健康检测的超时时间,单位毫秒 |
88+
| healthCheckModel | string | 必填 | | 健康检测使用的模型 |
7889

7990
### 提供商特有配置
8091

plugins/wasm-go/extensions/ai-proxy/config/config.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package config
22

33
import (
4-
"github.com/tidwall/gjson"
5-
64
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/provider"
5+
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
6+
"github.com/tidwall/gjson"
77
)
88

99
// @Name ai-proxy
@@ -75,13 +75,17 @@ func (c *PluginConfig) Validate() error {
7575
return nil
7676
}
7777

78-
func (c *PluginConfig) Complete() error {
78+
func (c *PluginConfig) Complete(log wrapper.Log) error {
7979
if c.activeProviderConfig == nil {
8080
c.activeProvider = nil
8181
return nil
8282
}
8383
var err error
8484
c.activeProvider, err = provider.CreateProvider(*c.activeProviderConfig)
85+
86+
providerConfig := c.GetProviderConfig()
87+
err = providerConfig.SetApiTokensFailover(log, c.activeProvider)
88+
8589
return err
8690
}
8791

plugins/wasm-go/extensions/ai-proxy/main.go

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@ func parseGlobalConfig(json gjson.Result, pluginConfig *config.PluginConfig, log
4444
if err := pluginConfig.Validate(); err != nil {
4545
return err
4646
}
47-
if err := pluginConfig.Complete(); err != nil {
47+
if err := pluginConfig.Complete(log); err != nil {
4848
return err
4949
}
50+
5051
return nil
5152
}
5253

@@ -59,9 +60,10 @@ func parseOverrideRuleConfig(json gjson.Result, global config.PluginConfig, plug
5960
if err := pluginConfig.Validate(); err != nil {
6061
return err
6162
}
62-
if err := pluginConfig.Complete(); err != nil {
63+
if err := pluginConfig.Complete(log); err != nil {
6364
return err
6465
}
66+
6567
return nil
6668
}
6769

@@ -80,7 +82,13 @@ func onHttpRequestHeader(ctx wrapper.HttpContext, pluginConfig config.PluginConf
8082
path, _ := url.Parse(rawPath)
8183
apiName := getOpenAiApiName(path.Path)
8284
providerConfig := pluginConfig.GetProviderConfig()
83-
if apiName == "" && !providerConfig.IsOriginal() {
85+
if providerConfig.IsOriginal() {
86+
if handler, ok := activeProvider.(provider.ApiNameHandler); ok {
87+
apiName = handler.GetApiName(path.Path)
88+
}
89+
}
90+
91+
if apiName == "" {
8492
log.Debugf("[onHttpRequestHeader] unsupported path: %s", path.Path)
8593
// _ = util.SendResponse(404, "ai-proxy.unknown_api", util.MimeTypeTextPlain, "API not found: "+path.Path)
8694
log.Debugf("[onHttpRequestHeader] no send response")
@@ -89,8 +97,11 @@ func onHttpRequestHeader(ctx wrapper.HttpContext, pluginConfig config.PluginConf
8997
ctx.SetContext(ctxKeyApiName, apiName)
9098

9199
if handler, ok := activeProvider.(provider.RequestHeadersHandler); ok {
92-
// Disable the route re-calculation since the plugin may modify some headers related to the chosen route.
100+
// Disable the route re-calculation since the plugin may modify some headers related to the chosen route.
93101
ctx.DisableReroute()
102+
// Set the apiToken for the current request.
103+
providerConfig.SetApiTokenInUse(ctx, log)
104+
94105
hasRequestBody := wrapper.HasRequestBody()
95106
action, err := handler.OnRequestHeaders(ctx, apiName, log)
96107
if err == nil {
@@ -102,6 +113,7 @@ func onHttpRequestHeader(ctx wrapper.HttpContext, pluginConfig config.PluginConf
102113
}
103114
return action
104115
}
116+
105117
_ = util.SendResponse(500, "ai-proxy.proc_req_headers_failed", util.MimeTypeTextPlain, fmt.Sprintf("failed to process request headers: %v", err))
106118
return types.ActionContinue
107119
}
@@ -156,15 +168,24 @@ func onHttpResponseHeaders(ctx wrapper.HttpContext, pluginConfig config.PluginCo
156168

157169
log.Debugf("[onHttpResponseHeaders] provider=%s", activeProvider.GetProviderType())
158170

171+
providerConfig := pluginConfig.GetProviderConfig()
172+
apiTokenInUse := providerConfig.GetApiTokenInUse(ctx)
173+
159174
status, err := proxywasm.GetHttpResponseHeader(":status")
160175
if err != nil || status != "200" {
161176
if err != nil {
162177
log.Errorf("unable to load :status header from response: %v", err)
163178
}
164179
ctx.DontReadResponseBody()
180+
providerConfig.OnRequestFailed(ctx, apiTokenInUse, log)
181+
165182
return types.ActionContinue
166183
}
167184

185+
// Reset ctxApiTokenRequestFailureCount if the request is successful,
186+
// the apiToken is removed only when the number of consecutive request failures exceeds the threshold.
187+
providerConfig.ResetApiTokenRequestFailureCount(apiTokenInUse, log)
188+
168189
if handler, ok := activeProvider.(provider.ResponseHeadersHandler); ok {
169190
apiName, _ := ctx.GetContext(ctxKeyApiName).(provider.ApiName)
170191
action, err := handler.OnResponseHeaders(ctx, apiName, log)
@@ -233,16 +254,6 @@ func onHttpResponseBody(ctx wrapper.HttpContext, pluginConfig config.PluginConfi
233254
return types.ActionContinue
234255
}
235256

236-
func getOpenAiApiName(path string) provider.ApiName {
237-
if strings.HasSuffix(path, "/v1/chat/completions") {
238-
return provider.ApiNameChatCompletion
239-
}
240-
if strings.HasSuffix(path, "/v1/embeddings") {
241-
return provider.ApiNameEmbeddings
242-
}
243-
return ""
244-
}
245-
246257
func checkStream(ctx *wrapper.HttpContext, log *wrapper.Log) {
247258
contentType, err := proxywasm.GetHttpResponseHeader("Content-Type")
248259
if err != nil || !strings.HasPrefix(contentType, "text/event-stream") {
@@ -252,3 +263,13 @@ func checkStream(ctx *wrapper.HttpContext, log *wrapper.Log) {
252263
(*ctx).BufferResponseBody()
253264
}
254265
}
266+
267+
func getOpenAiApiName(path string) provider.ApiName {
268+
if strings.HasSuffix(path, "/v1/chat/completions") {
269+
return provider.ApiNameChatCompletion
270+
}
271+
if strings.HasSuffix(path, "/v1/embeddings") {
272+
return provider.ApiNameEmbeddings
273+
}
274+
return ""
275+
}
Lines changed: 9 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
package provider
22

33
import (
4-
"encoding/json"
54
"errors"
6-
"fmt"
7-
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
8-
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
5+
"net/http"
96

107
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/util"
118
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
9+
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
1210
)
1311

1412
// ai360Provider is the provider for 360 OpenAI service.
@@ -46,10 +44,7 @@ func (m *ai360Provider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiNam
4644
if apiName != ApiNameChatCompletion && apiName != ApiNameEmbeddings {
4745
return types.ActionContinue, errUnsupportedApiName
4846
}
49-
_ = util.OverwriteRequestHost(ai360Domain)
50-
_ = proxywasm.RemoveHttpRequestHeader("Accept-Encoding")
51-
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
52-
_ = proxywasm.ReplaceHttpRequestHeader("Authorization", m.config.GetRandomToken())
47+
m.config.handleRequestHeaders(m, ctx, apiName, log)
5348
// Delay the header processing to allow changing streaming mode in OnRequestBody
5449
return types.HeaderStopIteration, nil
5550
}
@@ -58,47 +53,12 @@ func (m *ai360Provider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName,
5853
if apiName != ApiNameChatCompletion && apiName != ApiNameEmbeddings {
5954
return types.ActionContinue, errUnsupportedApiName
6055
}
61-
if apiName == ApiNameChatCompletion {
62-
return m.onChatCompletionRequestBody(ctx, body, log)
63-
}
64-
if apiName == ApiNameEmbeddings {
65-
return m.onEmbeddingsRequestBody(ctx, body, log)
66-
}
67-
return types.ActionContinue, errUnsupportedApiName
68-
}
69-
70-
func (m *ai360Provider) onChatCompletionRequestBody(ctx wrapper.HttpContext, body []byte, log wrapper.Log) (types.Action, error) {
71-
request := &chatCompletionRequest{}
72-
if err := decodeChatCompletionRequest(body, request); err != nil {
73-
return types.ActionContinue, err
74-
}
75-
if request.Model == "" {
76-
return types.ActionContinue, errors.New("missing model in chat completion request")
77-
}
78-
// 映射模型
79-
mappedModel := getMappedModel(request.Model, m.config.modelMapping, log)
80-
if mappedModel == "" {
81-
return types.ActionContinue, errors.New("model becomes empty after applying the configured mapping")
82-
}
83-
ctx.SetContext(ctxKeyFinalRequestModel, mappedModel)
84-
request.Model = mappedModel
85-
return types.ActionContinue, replaceJsonRequestBody(request, log)
56+
return m.config.handleRequestBody(m, m.contextCache, ctx, apiName, body, log)
8657
}
8758

88-
func (m *ai360Provider) onEmbeddingsRequestBody(ctx wrapper.HttpContext, body []byte, log wrapper.Log) (types.Action, error) {
89-
request := &embeddingsRequest{}
90-
if err := json.Unmarshal(body, request); err != nil {
91-
return types.ActionContinue, fmt.Errorf("unable to unmarshal request: %v", err)
92-
}
93-
if request.Model == "" {
94-
return types.ActionContinue, errors.New("missing model in embeddings request")
95-
}
96-
// 映射模型
97-
mappedModel := getMappedModel(request.Model, m.config.modelMapping, log)
98-
if mappedModel == "" {
99-
return types.ActionContinue, errors.New("model becomes empty after applying the configured mapping")
100-
}
101-
ctx.SetContext(ctxKeyFinalRequestModel, mappedModel)
102-
request.Model = mappedModel
103-
return types.ActionContinue, replaceJsonRequestBody(request, log)
59+
func (m *ai360Provider) TransformRequestHeaders(ctx wrapper.HttpContext, apiName ApiName, headers http.Header, log wrapper.Log) {
60+
util.OverwriteRequestHostHeader(headers, ai360Domain)
61+
util.OverwriteRequestAuthorizationHeader(headers, "Authorization "+m.config.GetApiTokenInUse(ctx))
62+
headers.Del("Accept-Encoding")
63+
headers.Del("Content-Length")
10464
}

plugins/wasm-go/extensions/ai-proxy/provider/azure.go

Lines changed: 13 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,15 @@ package provider
33
import (
44
"errors"
55
"fmt"
6+
"net/http"
67
"net/url"
78

89
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/util"
910
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
10-
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
1111
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
1212
)
1313

1414
// azureProvider is the provider for Azure OpenAI service.
15-
1615
type azureProviderInitializer struct {
1716
}
1817

@@ -55,47 +54,23 @@ func (m *azureProvider) GetProviderType() string {
5554
}
5655

5756
func (m *azureProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error) {
58-
_ = util.OverwriteRequestPath(m.serviceUrl.RequestURI())
59-
_ = util.OverwriteRequestHost(m.serviceUrl.Host)
60-
_ = proxywasm.ReplaceHttpRequestHeader("api-key", m.config.apiTokens[0])
61-
if apiName == ApiNameChatCompletion {
62-
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
63-
} else {
64-
ctx.DontReadRequestBody()
57+
if apiName != ApiNameChatCompletion {
58+
return types.ActionContinue, errUnsupportedApiName
6559
}
60+
m.config.handleRequestHeaders(m, ctx, apiName, log)
6661
return types.ActionContinue, nil
6762
}
6863

6964
func (m *azureProvider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error) {
7065
if apiName != ApiNameChatCompletion {
71-
// We don't need to process the request body for other APIs.
72-
return types.ActionContinue, nil
73-
}
74-
request := &chatCompletionRequest{}
75-
if err := decodeChatCompletionRequest(body, request); err != nil {
76-
return types.ActionContinue, err
66+
return types.ActionContinue, errUnsupportedApiName
7767
}
78-
if m.contextCache == nil {
79-
if err := replaceJsonRequestBody(request, log); err != nil {
80-
_ = util.SendResponse(500, "ai-proxy.openai.set_include_usage_failed", util.MimeTypeTextPlain, fmt.Sprintf("failed to replace request body: %v", err))
81-
}
82-
return types.ActionContinue, nil
83-
}
84-
err := m.contextCache.GetContent(func(content string, err error) {
85-
defer func() {
86-
_ = proxywasm.ResumeHttpRequest()
87-
}()
88-
if err != nil {
89-
log.Errorf("failed to load context file: %v", err)
90-
_ = util.SendResponse(500, "ai-proxy.azure.load_ctx_failed", util.MimeTypeTextPlain, fmt.Sprintf("failed to load context file: %v", err))
91-
}
92-
insertContextMessage(request, content)
93-
if err := replaceJsonRequestBody(request, log); err != nil {
94-
_ = util.SendResponse(500, "ai-proxy.azure.insert_ctx_failed", util.MimeTypeTextPlain, fmt.Sprintf("failed to replace request body: %v", err))
95-
}
96-
}, log)
97-
if err == nil {
98-
return types.ActionPause, nil
99-
}
100-
return types.ActionContinue, err
68+
return m.config.handleRequestBody(m, m.contextCache, ctx, apiName, body, log)
69+
}
70+
71+
func (m *azureProvider) TransformRequestHeaders(ctx wrapper.HttpContext, apiName ApiName, headers http.Header, log wrapper.Log) {
72+
util.OverwriteRequestPathHeader(headers, m.serviceUrl.RequestURI())
73+
util.OverwriteRequestHostHeader(headers, m.serviceUrl.Host)
74+
util.OverwriteRequestAuthorizationHeader(headers, "api-key "+m.config.GetApiTokenInUse(ctx))
75+
headers.Del("Content-Length")
10176
}

0 commit comments

Comments
 (0)