Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bulksubscribe http #478

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions examples/pubsub/sub/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ var importantSubscription = &common.Subscription{
func main() {
s := daprd.NewService(":8080")

// for single event subscribing
if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}

if err := s.AddTopicEventHandler(importantSubscription, importantEventHandler); err != nil {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
log.Fatalf("error adding topic subscription: %v", err)
}
// if err := s.AddBulkTopicEventHandler(defaultSubscription, eventHandler,10,100); err != nil {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
// log.Fatalf("error adding topic subscription: %v", err)
// }
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved

if err := s.Start(); err != nil && err != http.ErrServerClosed {
log.Fatalf("error listenning: %v", err)
Expand All @@ -64,6 +65,13 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er
return false, nil
}

func bulkeventHandler(ctx context.Context, e []common.TopicEvent) (retry bool, err error) {
for _, event := range e {
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", event.PubsubName, event.Topic, event.ID, event.Data)
}
return false, nil
}

sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
func importantEventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("important event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
Expand Down
2 changes: 2 additions & 0 deletions service/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Service interface {
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error
// AddBulkTopicEventHandler appends provided event handler with its topic along with configuring maxMessagesCount, maxAwaitDurationMs for bulk handling and optional metadata to the service.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: break this comment up so it doesn't wrap/exceed a reasonable character count (~140ish)

AddBulkTopicEventHandler(sub *Subscription, fn TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk
Expand Down
28 changes: 24 additions & 4 deletions service/grpc/topic.go
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,25 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
return s.topicRegistrar.AddSubscription(sub, fn)
}

func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As my previous review - the validation of arguments passed to these parameters should be implemented as per the implementation spec. I do think that this is something we need to address both sdk-side and in the runtime explicitly as part of best practice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what default values you suggest if nil values are given?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error-out if either are X <= 0 this would prevent a negative value, a nil value is not possible for the int type.

if sub == nil {
return errors.New("subscription required")
}

return s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs)
}

// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to.
func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*runtimev1pb.ListTopicSubscriptionsResponse, error) {
subs := make([]*runtimev1pb.TopicSubscription, 0)
for _, v := range s.topicRegistrar {
s := v.Subscription
sub := &runtimev1pb.TopicSubscription{
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
BulkSubscribe: convertBulkSubscribe(s.BulkSubscribe),
}
subs = append(subs, sub)
}
Expand All @@ -73,6 +82,17 @@ func convertRoutes(routes *internal.TopicRoutes) *runtimev1pb.TopicRoutes {
}
}

func convertBulkSubscribe(bulkSubscribe *internal.BulkSubscribe) *runtimev1pb.BulkSubscribeConfig {
if bulkSubscribe == nil {
return nil
}
return &runtimev1pb.BulkSubscribeConfig{
Enabled: bulkSubscribe.Enabled,
MaxMessagesCount: bulkSubscribe.MaxMessagesCount,
MaxAwaitDurationMs: bulkSubscribe.MaxAwaitDurationMs,
}
}

// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed.
// Dapr sends published messages in a CloudEvents v1.0 envelope.
func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) {
Expand Down
145 changes: 144 additions & 1 deletion service/http/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ const (
// topicEventJSON is identical to `common.TopicEvent`
// except for it treats `data` as a json.RawMessage so it can
// be used as bytes or interface{}.
// Merge itemMap into topicEventJSON
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
type topicEventJSON struct {
// ID identifies the event.
ID string `json:"id"`
ID string `json:"id"` // y
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
// The version of the CloudEvents specification.
SpecVersion string `json:"specversion"`
// The type of event related to the originating occurrence.
Expand Down Expand Up @@ -113,6 +114,29 @@ func (in topicEventJSON) getData() (data any, rawData []byte) {
return data, rawData
}

type AppResponseStatus string

const (
// Success means the message is received and processed correctly.
Success AppResponseStatus = "SUCCESS"
// Retry means the message is received but could not be processed and must be retried.
Retry AppResponseStatus = "RETRY"
// Drop means the message is received but should not be processed.
Drop AppResponseStatus = "DROP"
)
Comment on lines +117 to +126
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reuse the type provided

const (
// SubscriptionResponseStatusSuccess means message is processed successfully.
SubscriptionResponseStatusSuccess = "SUCCESS"
// SubscriptionResponseStatusRetry means message to be retried by Dapr.
SubscriptionResponseStatusRetry = "RETRY"
// SubscriptionResponseStatusDrop means warning is logged and message is dropped.
SubscriptionResponseStatusDrop = "DROP"
)


type BulkSubscribeResponseEntry struct {
// The id of the bulk subscribe entry
EntryId string `json:"entryId"`

// The response status of the bulk subscribe entry
Status AppResponseStatus `json:"status"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Status AppResponseStatus `json:"status"`
Status string `json:"status"`

A string value type should be fine

}

type BulkSubscribeResponse struct {
Statuses []BulkSubscribeResponseEntry `json:"statuses"`
}

func (s *Server) registerBaseHandler() {
// register subscribe handler
f := func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -301,9 +325,128 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
return nil
}

func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise with the grpc implementation I would like to see validation of the maxMessagesCount and maxAwaitDurationMs params

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exported function should have a comment quickly outlining the use

if sub == nil {
return errors.New("subscription required")
}
// Route is only required for HTTP but should be specified for the
// app protocol to be interchangeable.
if sub.Route == "" {
return errors.New("handler route name")
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
}
if err := s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs); err != nil {
return err
}

s.mux.Handle(sub.Route, optionsHandler(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
// check for post with no data
var (
body []byte
err error
)
if r.Body != nil {
body, err = io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
return
}
}
if len(body) == 0 {
http.Error(w, "nil content", PubSubHandlerDropStatusCode)
return
}

// deserialize the event
var ins internal.BulkSubscribeEnvelope
if err = json.Unmarshal(body, &ins); err != nil {
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
return
}

statuses := make([]BulkSubscribeResponseEntry, 0, len(ins.Entries))

for _, entry := range ins.Entries {
itemJSON, err := json.Marshal(entry.Event)
if err != nil {
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
return
}
var in topicEventJSON

if err := json.Unmarshal(itemJSON, &in); err != nil {
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
return
}
if in.PubsubName == "" {
in.Topic = sub.PubsubName
}
if in.Topic == "" {
in.Topic = sub.Topic
}
data, rawData := in.getData()

te := common.TopicEvent{
ID: in.ID,
SpecVersion: in.SpecVersion,
Type: in.Type,
Source: in.Source,
DataContentType: in.DataContentType,
Data: data,
RawData: rawData,
DataBase64: in.DataBase64,
Subject: in.Subject,
PubsubName: in.PubsubName,
Topic: in.Topic,
}

retry, err := fn(r.Context(), &te)
if err == nil {
statuses = append(statuses, BulkSubscribeResponseEntry{
EntryId: entry.EntryId,
Status: Success,
},
)
} else if retry {
statuses = append(statuses, BulkSubscribeResponseEntry{
EntryId: entry.EntryId,
Status: Retry,
},
)
} else {
statuses = append(statuses, BulkSubscribeResponseEntry{
EntryId: entry.EntryId,
Status: Drop,
},
)
}
}

resp := BulkSubscribeResponse{
Statuses: statuses,
}
if err != nil {
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
return
}
Comment on lines +429 to +432
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no unhandled error at this point, could you clarify that if a single event is dropped it will be replayed/retried at a later date?

w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

writeBulkStatus(w, resp)
})))

return nil
}

func writeStatus(w http.ResponseWriter, s string) {
status := &common.SubscriptionResponse{Status: s}
if err := json.NewEncoder(w).Encode(status); err != nil {
http.Error(w, err.Error(), PubSubHandlerRetryStatusCode)
}
}

func writeBulkStatus(w http.ResponseWriter, s BulkSubscribeResponse) {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
if err := json.NewEncoder(w).Encode(s); err != nil {
http.Error(w, err.Error(), PubSubHandlerRetryStatusCode)
}
}
Comment on lines +459 to +463
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to create a new function as it is manifestly different or is idiomatic, do you think this is better or would it be better to pass the slice to your function as an argument and wrap it within the function?

54 changes: 51 additions & 3 deletions service/internal/topicregistrar.go
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ type TopicRegistrar map[string]*TopicRegistration

// TopicRegistration encapsulates the subscription and handlers.
type TopicRegistration struct {
Subscription *TopicSubscription
DefaultHandler common.TopicEventHandler
RouteHandlers map[string]common.TopicEventHandler
Subscription *TopicSubscription
DefaultHandler common.TopicEventHandler
RouteHandlers map[string]common.TopicEventHandler
}

func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventHandler) error {
Expand Down Expand Up @@ -62,3 +62,51 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi

return nil
}

func (m TopicRegistrar) AddBulkSubscription(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
if sub.Topic == "" {
return errors.New("topic name required")
}
if sub.PubsubName == "" {
return errors.New("pub/sub name required")
}
if fn == nil {
return fmt.Errorf("topic handler required")
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
}

var key string
if !sub.DisableTopicValidation {
key = sub.PubsubName + "-" + sub.Topic
} else {
key = sub.PubsubName
}

ts, ok := m[key]
if !ok {
ts = &TopicRegistration{
Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic),
RouteHandlers: make(map[string]common.TopicEventHandler),
DefaultHandler: nil,
}
ts.Subscription.SetMetadata(sub.Metadata)
m[key] = ts
}

ts.Subscription.SetBulkSubscribe(maxMessagesCount, maxAwaitDurationMs)

if sub.Match != "" {
if err := ts.Subscription.AddRoutingRule(sub.Route, sub.Match, sub.Priority); err != nil {
return err
}
} else {
if err := ts.Subscription.SetDefaultRoute(sub.Route); err != nil {
return err
}

ts.DefaultHandler = fn
}

ts.RouteHandlers[sub.Route] = fn

return nil
}
38 changes: 38 additions & 0 deletions service/internal/topicsubscription.go
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,31 @@ type TopicSubscription struct {
Routes *TopicRoutes `json:"routes,omitempty"`
// Metadata is the subscription metadata.
Metadata map[string]string `json:"metadata,omitempty"`
// bulksubsribe
BulkSubscribe *BulkSubscribe `json:"bulkSubscribe,omitempty"`
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
}

type BulkSubscribeMessageItem struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Event interface{} `json:"event"`
Metadata map[string]string `json:"metadata"`
ContentType string `json:"contentType,omitempty"`
}

type BulkSubscribeEnvelope struct {
ID string
Entries []BulkSubscribeMessageItem
Metadata map[string]string
Topic string
Pubsub string
EventType string
}


type BulkSubscribe struct {
Enabled bool `json:"enabled"`
MaxMessagesCount int32 `json:"maxMessagesCount,omitempty"`
MaxAwaitDurationMs int32 `json:"maxAwaitDurationMs,omitempty"`
}

// TopicRoutes encapsulates the default route and multiple routing rules.
Expand Down Expand Up @@ -60,6 +85,19 @@ func (s *TopicSubscription) SetMetadata(metadata map[string]string) error {
return nil
}

func (s *TopicSubscription) SetBulkSubscribe(maxMessagesCount,maxAwaitDurationMs int32) error {
if s.BulkSubscribe != nil {
return fmt.Errorf("subscription for topic %s on pubsub %s already has bulkSubscribe set", s.Topic, s.PubsubName)
}
s.BulkSubscribe = &BulkSubscribe{
Enabled: true,
MaxMessagesCount: maxMessagesCount,
MaxAwaitDurationMs: maxAwaitDurationMs,
}

return nil
}

// SetDefaultRoute sets the default route if not already set.
// An error is returned if it is already set.
func (s *TopicSubscription) SetDefaultRoute(path string) error {
Expand Down