Skip to content

Commit 5fce2b7

Browse files
committed
refactor proxy message processor that remove returns type
1 parent 5ed946a commit 5fce2b7

File tree

4 files changed

+98
-113
lines changed

4 files changed

+98
-113
lines changed

contrib/envoyproxy/go-control-plane/envoy.go

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,15 @@ type appsecEnvoyExternalProcessorServer struct {
4848
envoyextproc.ExternalProcessorServer
4949
config AppsecEnvoyConfig
5050
requestCounter atomic.Uint32
51-
messageProcessor proxy.Processor[envoyextproc.ProcessingResponse]
51+
messageProcessor proxy.Processor
5252
}
5353

5454
// AppsecEnvoyExternalProcessorServer creates a new external processor server with AAP enabled
5555
func AppsecEnvoyExternalProcessorServer(userImplementation envoyextproc.ExternalProcessorServer, config AppsecEnvoyConfig) envoyextproc.ExternalProcessorServer {
5656
processor := &appsecEnvoyExternalProcessorServer{
5757
ExternalProcessorServer: userImplementation,
5858
config: config,
59-
messageProcessor: proxy.NewProcessor[envoyextproc.ProcessingResponse](proxy.ProcessorConfig[envoyextproc.ProcessingResponse]{
59+
messageProcessor: proxy.NewProcessor(proxy.ProcessorConfig{
6060
BlockingUnavailable: config.BlockingUnavailable,
6161
BodyParsingSizeLimit: config.BodyParsingSizeLimit,
6262
Framework: "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3",
@@ -76,10 +76,14 @@ func AppsecEnvoyExternalProcessorServer(userImplementation envoyextproc.External
7676
return processor
7777
}
7878

79+
type processServerKeyType struct{}
80+
81+
var processServerKey processServerKeyType
82+
7983
// Process handles the bidirectional stream that Envoy uses to control the filter
8084
func (s *appsecEnvoyExternalProcessorServer) Process(processServer envoyextproc.ExternalProcessor_ProcessServer) error {
8185
var (
82-
ctx = processServer.Context()
86+
ctx = context.WithValue(processServer.Context(), processServerKey, processServer)
8387
currentRequest proxy.RequestState
8488
)
8589

@@ -92,7 +96,7 @@ func (s *appsecEnvoyExternalProcessorServer) Process(processServer envoyextproc.
9296
}()
9397

9498
for {
95-
if err := s.checkContext(ctx); err != nil {
99+
if err := s.checkContext(processServer.Context()); err != nil {
96100
return err
97101
}
98102

@@ -102,18 +106,12 @@ func (s *appsecEnvoyExternalProcessorServer) Process(processServer envoyextproc.
102106
}
103107

104108
// Process the message
105-
processingResponse, err := s.processMessage(ctx, &processingRequest, &currentRequest)
109+
err := s.processMessage(ctx, &processingRequest, &currentRequest)
106110
if err != nil && err != io.EOF {
107111
instr.Logger().Error("external_processing: error processing request: %s\n", err.Error())
108112
return err
109113
}
110114

111-
if processingResponse != nil {
112-
if err := s.sendResponse(processServer, processingResponse); err != nil {
113-
return err
114-
}
115-
}
116-
117115
if err == io.EOF {
118116
return nil
119117
}
@@ -144,11 +142,11 @@ func (s *appsecEnvoyExternalProcessorServer) handleReceiveError(err error) error
144142
}
145143

146144
// processMessage processes a single message based on its type
147-
func (s *appsecEnvoyExternalProcessorServer) processMessage(ctx context.Context, req *envoyextproc.ProcessingRequest, currentRequest *proxy.RequestState) (action *envoyextproc.ProcessingResponse, err error) {
145+
func (s *appsecEnvoyExternalProcessorServer) processMessage(ctx context.Context, req *envoyextproc.ProcessingRequest, currentRequest *proxy.RequestState) (err error) {
148146
switch v := req.Request.(type) {
149147
case *envoyextproc.ProcessingRequest_RequestHeaders:
150-
*currentRequest, action, err = s.messageProcessor.OnRequestHeaders(ctx, &messageRequestHeaders{ProcessingRequest: req, HttpHeaders: req.GetRequestHeaders(), integration: s.config.Integration})
151-
return action, err
148+
*currentRequest, err = s.messageProcessor.OnRequestHeaders(ctx, &messageRequestHeaders{ProcessingRequest: req, HttpHeaders: req.GetRequestHeaders(), integration: s.config.Integration})
149+
return err
152150

153151
case *envoyextproc.ProcessingRequest_RequestBody:
154152
return s.messageProcessor.OnRequestBody(&messageBody{ProcessingRequest: req, HttpBody: req.GetRequestBody()}, currentRequest)
@@ -158,7 +156,7 @@ func (s *appsecEnvoyExternalProcessorServer) processMessage(ctx context.Context,
158156
// Handle case where request headers were never sent
159157
instr.Logger().Warn("external_processing: can't process the response: envoy never sent the beginning of the request, this is a known issue" +
160158
" and can happen when a malformed request is sent to Envoy where the header Host is missing. See link to issue https://github.com/envoyproxy/envoy/issues/38022")
161-
return nil, status.Errorf(codes.InvalidArgument, "Error processing response headers from ext_proc: can't process the response")
159+
return status.Errorf(codes.InvalidArgument, "Error processing response headers from ext_proc: can't process the response")
162160
}
163161
return s.messageProcessor.OnResponseHeaders(&responseHeadersEnvoy{ProcessingRequest: req, HttpHeaders: req.GetResponseHeaders()}, currentRequest)
164162

@@ -172,18 +170,6 @@ func (s *appsecEnvoyExternalProcessorServer) processMessage(ctx context.Context,
172170
return s.messageProcessor.OnResponseTrailers(currentRequest)
173171

174172
default:
175-
return nil, status.Errorf(codes.Unknown, "Unknown request type: %T", v)
173+
return status.Errorf(codes.Unknown, "Unknown request type: %T", v)
176174
}
177175
}
178-
179-
// sendResponse sends a processing response back to Envoy
180-
func (s *appsecEnvoyExternalProcessorServer) sendResponse(processServer envoyextproc.ExternalProcessor_ProcessServer, response *envoyextproc.ProcessingResponse) error {
181-
instr.Logger().Debug("external_processing: sending response: %v\n", response)
182-
183-
if err := processServer.SendMsg(response); err != nil {
184-
instr.Logger().Error("external_processing: error sending response (probably because of an Envoy timeout): %s", err.Error())
185-
return status.Errorf(codes.Unknown, "Error sending response (probably because of an Envoy timeout): %s", err.Error())
186-
}
187-
188-
return nil
189-
}

contrib/envoyproxy/go-control-plane/envoy_http.go

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package gocontrolplane
77

88
import (
9+
"context"
910
"fmt"
1011
"math"
1112
"net/http"
@@ -22,11 +23,20 @@ import (
2223
"google.golang.org/grpc/status"
2324
)
2425

25-
func continueActionFunc(options proxy.ContinueActionOptions) (envoyextproc.ProcessingResponse, error) {
26+
func continueActionFunc(ctx context.Context, options proxy.ContinueActionOptions) error {
2627
if len(options.HeaderMutations) > 0 || options.Body {
27-
return buildHeadersResponse(options), nil
28+
return buildHeadersResponse(ctx, options)
2829
}
2930

31+
emptyResp, err := buildEmptyContinueResponse(options)
32+
if err != nil {
33+
return err
34+
}
35+
36+
return sendResponse(ctx, &emptyResp)
37+
}
38+
39+
func buildEmptyContinueResponse(options proxy.ContinueActionOptions) (envoyextproc.ProcessingResponse, error) {
3040
common := &envoyextproc.CommonResponse{
3141
Status: envoyextproc.CommonResponse_CONTINUE,
3242
}
@@ -49,15 +59,15 @@ func continueActionFunc(options proxy.ContinueActionOptions) (envoyextproc.Proce
4959
}
5060
}
5161

52-
func blockActionFunc(data proxy.BlockActionOptions) (envoyextproc.ProcessingResponse, error) {
62+
func blockActionFunc(ctx context.Context, data proxy.BlockActionOptions) error {
5363
blockedHeaders := convertHeadersToEnvoy(data.Headers)
5464

5565
var statusCode int32
5666
if data.StatusCode > 0 && data.StatusCode <= math.MaxInt32 {
5767
statusCode = int32(data.StatusCode)
5868
}
5969

60-
return envoyextproc.ProcessingResponse{
70+
resp := envoyextproc.ProcessingResponse{
6171
Response: &envoyextproc.ProcessingResponse_ImmediateResponse{
6272
ImmediateResponse: &envoyextproc.ImmediateResponse{
6373
Status: &envoytypes.HttpStatus{
@@ -72,11 +82,13 @@ func blockActionFunc(data proxy.BlockActionOptions) (envoyextproc.ProcessingResp
7282
},
7383
},
7484
},
75-
}, nil
85+
}
86+
87+
return sendResponse(ctx, &resp)
7688
}
7789

7890
// buildHeadersResponse creates an Envoy HeadersResponse from provided data answering a RequestHeaders or ResponseHeaders message
79-
func buildHeadersResponse(data proxy.ContinueActionOptions) envoyextproc.ProcessingResponse {
91+
func buildHeadersResponse(ctx context.Context, data proxy.ContinueActionOptions) error {
8092
var modeOverride *envoyextprocfilter.ProcessingMode
8193
if data.Body {
8294
modeOverride = &envoyextprocfilter.ProcessingMode{RequestBodyMode: envoyextprocfilter.ProcessingMode_STREAMED}
@@ -102,7 +114,24 @@ func buildHeadersResponse(data proxy.ContinueActionOptions) envoyextproc.Process
102114
}
103115
}
104116

105-
return processingResponse
117+
return sendResponse(ctx, &processingResponse)
118+
}
119+
120+
// sendResponse sends a processing response back to Envoy
121+
func sendResponse(ctx context.Context, response *envoyextproc.ProcessingResponse) error {
122+
instr.Logger().Debug("external_processing: sending response: %v\n", response)
123+
124+
processServer, _ := ctx.Value(processServerKey).(envoyextproc.ExternalProcessor_ProcessServer)
125+
if processServer == nil {
126+
return status.Errorf(codes.Unknown, "No gRPC stream available to send the response")
127+
}
128+
129+
if err := processServer.SendMsg(response); err != nil {
130+
instr.Logger().Error("external_processing: error sending response (probably because of an Envoy timeout): %s", err.Error())
131+
return status.Errorf(codes.Unknown, "Error sending response (probably because of an Envoy timeout): %s", err.Error())
132+
}
133+
134+
return nil
106135
}
107136

108137
// convertHeadersToEnvoy converts standard HTTP headers to Envoy HeaderValueOption format

contrib/envoyproxy/go-control-plane/proxy/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ type BlockActionOptions struct {
3131
}
3232

3333
// ProcessorConfig contains configuration for the message processor
34-
type ProcessorConfig[O any] struct {
34+
type ProcessorConfig struct {
3535
context.Context
3636
BlockingUnavailable bool
3737
BodyParsingSizeLimit int
3838
Framework string
3939

4040
// ContinueMessageFunc is a function that generates a continue message of type O based on the provided ContinueActionOptions.
41-
ContinueMessageFunc func(ContinueActionOptions) (O, error)
41+
ContinueMessageFunc func(context.Context, ContinueActionOptions) error
4242

4343
// BlockMessageFunc is a function that generates a block message of type O based on the provided status code, headers, and body.
44-
BlockMessageFunc func(options BlockActionOptions) (O, error)
44+
BlockMessageFunc func(context.Context, BlockActionOptions) error
4545
}

0 commit comments

Comments
 (0)