-
Notifications
You must be signed in to change notification settings - Fork 186
feat: bulksubscribe http #478
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 7 commits
3f5bcd8
1c45f4e
2193f47
5d4e498
f19a505
fdd425a
cb26d03
597c099
10d2c67
e218130
5131387
81b72ef
203d31c
068ef1c
36b78f5
38bc154
35c37bc
fb4138a
0b41a28
d05856f
92e7eef
f315c9e
0f2d9bb
f4a3b65
3577081
5e1f64d
de720d9
05f3ad7
300e5ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,8 @@ type Service interface { | |
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service. | ||
// Note, retries are only considered when there is an error. Lack of error is considered as a success | ||
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error | ||
// AddBulkTopicEventHandler appends provided event handler with its topic along with configuring maxMessagesCount, maxAwaitDurationMs for bulk handling and optional metadata to the service. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: break this comment up so it doesn't wrap/exceed a reasonable character count (~140ish) |
||
AddBulkTopicEventHandler(sub *Subscription, fn TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error | ||
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service. | ||
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error | ||
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk | ||
|
sadath-12 marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,16 +37,25 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE | |
return s.topicRegistrar.AddSubscription(sub, fn) | ||
} | ||
|
||
func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error { | ||
sadath-12 marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As my previous review - the validation of arguments passed to these parameters should be implemented as per the implementation spec. I do think that this is something we need to address both sdk-side and in the runtime explicitly as part of best practice. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what default values you suggest if nil values are given? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error-out if either are |
||
if sub == nil { | ||
return errors.New("subscription required") | ||
} | ||
|
||
return s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs) | ||
} | ||
|
||
// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to. | ||
func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*runtimev1pb.ListTopicSubscriptionsResponse, error) { | ||
subs := make([]*runtimev1pb.TopicSubscription, 0) | ||
for _, v := range s.topicRegistrar { | ||
s := v.Subscription | ||
sub := &runtimev1pb.TopicSubscription{ | ||
PubsubName: s.PubsubName, | ||
Topic: s.Topic, | ||
Metadata: s.Metadata, | ||
Routes: convertRoutes(s.Routes), | ||
PubsubName: s.PubsubName, | ||
Topic: s.Topic, | ||
Metadata: s.Metadata, | ||
Routes: convertRoutes(s.Routes), | ||
BulkSubscribe: convertBulkSubscribe(s.BulkSubscribe), | ||
} | ||
subs = append(subs, sub) | ||
} | ||
|
@@ -73,6 +82,17 @@ func convertRoutes(routes *internal.TopicRoutes) *runtimev1pb.TopicRoutes { | |
} | ||
} | ||
|
||
func convertBulkSubscribe(bulkSubscribe *internal.BulkSubscribe) *runtimev1pb.BulkSubscribeConfig { | ||
if bulkSubscribe == nil { | ||
return nil | ||
} | ||
return &runtimev1pb.BulkSubscribeConfig{ | ||
Enabled: bulkSubscribe.Enabled, | ||
MaxMessagesCount: bulkSubscribe.MaxMessagesCount, | ||
MaxAwaitDurationMs: bulkSubscribe.MaxAwaitDurationMs, | ||
} | ||
} | ||
|
||
// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed. | ||
// Dapr sends published messages in a CloudEvents v1.0 envelope. | ||
func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) { | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -42,9 +42,10 @@ const ( | |||||||||||||||||
// topicEventJSON is identical to `common.TopicEvent` | ||||||||||||||||||
// except for it treats `data` as a json.RawMessage so it can | ||||||||||||||||||
// be used as bytes or interface{}. | ||||||||||||||||||
// Merge itemMap into topicEventJSON | ||||||||||||||||||
sadath-12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
type topicEventJSON struct { | ||||||||||||||||||
// ID identifies the event. | ||||||||||||||||||
ID string `json:"id"` | ||||||||||||||||||
ID string `json:"id"` // y | ||||||||||||||||||
sadath-12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
// The version of the CloudEvents specification. | ||||||||||||||||||
SpecVersion string `json:"specversion"` | ||||||||||||||||||
// The type of event related to the originating occurrence. | ||||||||||||||||||
|
@@ -113,6 +114,29 @@ func (in topicEventJSON) getData() (data any, rawData []byte) { | |||||||||||||||||
return data, rawData | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
type AppResponseStatus string | ||||||||||||||||||
|
||||||||||||||||||
const ( | ||||||||||||||||||
// Success means the message is received and processed correctly. | ||||||||||||||||||
Success AppResponseStatus = "SUCCESS" | ||||||||||||||||||
// Retry means the message is received but could not be processed and must be retried. | ||||||||||||||||||
Retry AppResponseStatus = "RETRY" | ||||||||||||||||||
// Drop means the message is received but should not be processed. | ||||||||||||||||||
Drop AppResponseStatus = "DROP" | ||||||||||||||||||
) | ||||||||||||||||||
Comment on lines
+117
to
+126
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please reuse the type provided Lines 106 to 113 in de720d9
|
||||||||||||||||||
|
||||||||||||||||||
type BulkSubscribeResponseEntry struct { | ||||||||||||||||||
// The id of the bulk subscribe entry | ||||||||||||||||||
EntryId string `json:"entryId"` | ||||||||||||||||||
|
||||||||||||||||||
// The response status of the bulk subscribe entry | ||||||||||||||||||
Status AppResponseStatus `json:"status"` | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
A string value type should be fine |
||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
type BulkSubscribeResponse struct { | ||||||||||||||||||
Statuses []BulkSubscribeResponseEntry `json:"statuses"` | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
func (s *Server) registerBaseHandler() { | ||||||||||||||||||
// register subscribe handler | ||||||||||||||||||
f := func(w http.ResponseWriter, r *http.Request) { | ||||||||||||||||||
|
@@ -301,9 +325,128 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE | |||||||||||||||||
return nil | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Likewise with the grpc implementation I would like to see validation of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This exported function should have a comment quickly outlining the use |
||||||||||||||||||
if sub == nil { | ||||||||||||||||||
return errors.New("subscription required") | ||||||||||||||||||
} | ||||||||||||||||||
// Route is only required for HTTP but should be specified for the | ||||||||||||||||||
// app protocol to be interchangeable. | ||||||||||||||||||
if sub.Route == "" { | ||||||||||||||||||
return errors.New("handler route name") | ||||||||||||||||||
sadath-12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
} | ||||||||||||||||||
if err := s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs); err != nil { | ||||||||||||||||||
return err | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
s.mux.Handle(sub.Route, optionsHandler(http.HandlerFunc( | ||||||||||||||||||
func(w http.ResponseWriter, r *http.Request) { | ||||||||||||||||||
// check for post with no data | ||||||||||||||||||
var ( | ||||||||||||||||||
body []byte | ||||||||||||||||||
err error | ||||||||||||||||||
) | ||||||||||||||||||
if r.Body != nil { | ||||||||||||||||||
body, err = io.ReadAll(r.Body) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
http.Error(w, err.Error(), PubSubHandlerDropStatusCode) | ||||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
if len(body) == 0 { | ||||||||||||||||||
http.Error(w, "nil content", PubSubHandlerDropStatusCode) | ||||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// deserialize the event | ||||||||||||||||||
var ins internal.BulkSubscribeEnvelope | ||||||||||||||||||
if err = json.Unmarshal(body, &ins); err != nil { | ||||||||||||||||||
http.Error(w, err.Error(), PubSubHandlerDropStatusCode) | ||||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
statuses := make([]BulkSubscribeResponseEntry, 0, len(ins.Entries)) | ||||||||||||||||||
|
||||||||||||||||||
for _, entry := range ins.Entries { | ||||||||||||||||||
itemJSON, err := json.Marshal(entry.Event) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
http.Error(w, err.Error(), PubSubHandlerDropStatusCode) | ||||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
var in topicEventJSON | ||||||||||||||||||
|
||||||||||||||||||
if err := json.Unmarshal(itemJSON, &in); err != nil { | ||||||||||||||||||
http.Error(w, err.Error(), PubSubHandlerDropStatusCode) | ||||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
if in.PubsubName == "" { | ||||||||||||||||||
in.Topic = sub.PubsubName | ||||||||||||||||||
} | ||||||||||||||||||
if in.Topic == "" { | ||||||||||||||||||
in.Topic = sub.Topic | ||||||||||||||||||
} | ||||||||||||||||||
data, rawData := in.getData() | ||||||||||||||||||
|
||||||||||||||||||
te := common.TopicEvent{ | ||||||||||||||||||
ID: in.ID, | ||||||||||||||||||
SpecVersion: in.SpecVersion, | ||||||||||||||||||
Type: in.Type, | ||||||||||||||||||
Source: in.Source, | ||||||||||||||||||
DataContentType: in.DataContentType, | ||||||||||||||||||
Data: data, | ||||||||||||||||||
RawData: rawData, | ||||||||||||||||||
DataBase64: in.DataBase64, | ||||||||||||||||||
Subject: in.Subject, | ||||||||||||||||||
PubsubName: in.PubsubName, | ||||||||||||||||||
Topic: in.Topic, | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
retry, err := fn(r.Context(), &te) | ||||||||||||||||||
if err == nil { | ||||||||||||||||||
statuses = append(statuses, BulkSubscribeResponseEntry{ | ||||||||||||||||||
EntryId: entry.EntryId, | ||||||||||||||||||
Status: Success, | ||||||||||||||||||
}, | ||||||||||||||||||
) | ||||||||||||||||||
} else if retry { | ||||||||||||||||||
statuses = append(statuses, BulkSubscribeResponseEntry{ | ||||||||||||||||||
EntryId: entry.EntryId, | ||||||||||||||||||
Status: Retry, | ||||||||||||||||||
}, | ||||||||||||||||||
) | ||||||||||||||||||
} else { | ||||||||||||||||||
statuses = append(statuses, BulkSubscribeResponseEntry{ | ||||||||||||||||||
EntryId: entry.EntryId, | ||||||||||||||||||
Status: Drop, | ||||||||||||||||||
}, | ||||||||||||||||||
) | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
resp := BulkSubscribeResponse{ | ||||||||||||||||||
Statuses: statuses, | ||||||||||||||||||
} | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
http.Error(w, err.Error(), PubSubHandlerDropStatusCode) | ||||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
Comment on lines
+429
to
+432
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no unhandled error at this point, could you clarify that if a single event is dropped it will be replayed/retried at a later date? |
||||||||||||||||||
w.Header().Add("Content-Type", "application/json") | ||||||||||||||||||
w.WriteHeader(http.StatusOK) | ||||||||||||||||||
|
||||||||||||||||||
writeBulkStatus(w, resp) | ||||||||||||||||||
}))) | ||||||||||||||||||
|
||||||||||||||||||
return nil | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
func writeStatus(w http.ResponseWriter, s string) { | ||||||||||||||||||
status := &common.SubscriptionResponse{Status: s} | ||||||||||||||||||
if err := json.NewEncoder(w).Encode(status); err != nil { | ||||||||||||||||||
http.Error(w, err.Error(), PubSubHandlerRetryStatusCode) | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
func writeBulkStatus(w http.ResponseWriter, s BulkSubscribeResponse) { | ||||||||||||||||||
sadath-12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
if err := json.NewEncoder(w).Encode(s); err != nil { | ||||||||||||||||||
http.Error(w, err.Error(), PubSubHandlerRetryStatusCode) | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
Comment on lines
+459
to
+463
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's fine to create a new function as it is manifestly different or is idiomatic, do you think this is better or would it be better to pass the slice to your function as an argument and wrap it within the function? |
sadath-12 marked this conversation as resolved.
Show resolved
Hide resolved
|
sadath-12 marked this conversation as resolved.
Show resolved
Hide resolved
|
Uh oh!
There was an error while loading. Please reload this page.