-
Notifications
You must be signed in to change notification settings - Fork 492
refactor(contrib/envoy): implement message_processor for logic re-use #3860
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
062d617 to
fa1be8a
Compare
BenchmarksBenchmark execution time: 2025-08-25 13:41:13 Comparing candidate commit 6db4230 in PR branch Found 0 performance improvements and 0 performance regressions! Performance is the same for 24 metrics, 0 unstable metrics. |
048876a to
049d56e
Compare
…ment needed interfaces to work with the mp
9483abb to
6db4230
Compare
| "sync/atomic" | ||
| "time" | ||
|
|
||
| "github.com/DataDog/dd-trace-go/contrib/envoyproxy/go-control-plane/v2/message_processor" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually underscores in package names are banned by the geneva convention. Something like msgproc should be enough. Usually I encourage longer names but with package names this create a bunch of wierd issues so no
| } | ||
|
|
||
| // Handle the action returned by the message processor | ||
| processingResponse, err := s.handleAction(action, &processingRequest) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you really need a pointer to the processing request here or a copy would be fine?
| var ( | ||
| response *envoyextproc.ProcessingResponse | ||
| action message_processor.Action | ||
| err error | ||
| reqState *message_processor.RequestState | ||
| ) | ||
| response, *currentRequest, err = s.messageProcessor.ProcessRequestHeaders(ctx, v) | ||
| return response, err | ||
| reqState, action, err = s.messageProcessor.OnRequestHeaders(ctx, &requestHeadersEnvoy{v, s.config.Integration}) | ||
| if err == nil { | ||
| *currentRequest = *reqState | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| var ( | |
| response *envoyextproc.ProcessingResponse | |
| action message_processor.Action | |
| err error | |
| reqState *message_processor.RequestState | |
| ) | |
| response, *currentRequest, err = s.messageProcessor.ProcessRequestHeaders(ctx, v) | |
| return response, err | |
| reqState, action, err = s.messageProcessor.OnRequestHeaders(ctx, &requestHeadersEnvoy{v, s.config.Integration}) | |
| if err == nil { | |
| *currentRequest = *reqState | |
| } | |
| var ( | |
| action message_processor.Action | |
| err error | |
| ) | |
| *currentRequest , action, err = s.messageProcessor.OnRequestHeaders(ctx, &requestHeadersEnvoy{v, s.config.Integration}) |
| func (a *requestHeadersEnvoy) EndOfStream() bool { | ||
| return a.req.RequestHeaders.GetEndOfStream() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This kind of boilerplate can be avoided by using embedding on the right object
| func (a *requestBodyEnvoy) Body() []byte { | ||
| return a.req.RequestBody.GetBody() | ||
| } | ||
|
|
||
| func (a *requestBodyEnvoy) EndOfStream() bool { | ||
| return a.req.RequestBody.GetEndOfStream() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here about embedding
| headerMutation, err := reqState.PropagationHeaders() | ||
| if err != nil { | ||
| reqState.Close() | ||
| return nil, Action{}, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same for the action{}
| if !req.EndOfStream() && mp.isBodySupported(httpReq.Header.Get("Content-Type"), mp.config) { | ||
| requestBody = true | ||
| reqState.AwaitingRequestBody = true | ||
| // Todo: Set telemetry body size (using content-length) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we do it here ?
| if mp.config.BodyParsingSizeLimit <= 0 || !reqState.AwaitingRequestBody { | ||
| mp.instr.Logger().Error("message_processor: the body parsing has been wrongly configured. " + | ||
| "Please refer to the official documentation for guidance on the proper settings or contact support.") | ||
| return newContinueAction(), nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do you mean by wrongly configured. I don't see how it could be actionable for a user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am trying to understand why the types and funcs in this file are here if the interfaces from this package are delegating this job
| } | ||
|
|
||
| if currentRequest.Blocked { | ||
| if _, ok := processingResponse.Response.(*envoyextproc.ProcessingResponse_ImmediateResponse); ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wasn't the ActionType made for this kind of cases ?
|
supersed by #3945 |
What does this PR do?
This PR refactor how the envoy contrib is handling messages. Now the processing of analysis messages (request headers, request body, response headers, response body) is now implemented agnostic to the framework its instrumenting. This is implemented in the package
message_processor.Motivation
The motivation to create a new package
message_processoris the ability to re-use the message processing logic as an external processor. This package will be used in part of #3912 that implement the support of messages processing for HAProxy.The package
message_processoris implemented inside the envoy contrib to keep the agility of deploying releases out of the normal releases process of the tracer (using the-docker.xtag).Reviewer's Checklist
./scripts/lint.shlocally.Unsure? Have a question? Request a review!