Skip to content

Commit

Permalink
Allow pubsub cloudevent override with special metadata (dapr#5783)
Browse files Browse the repository at this point in the history
* allow pubsub cloudevent override with special metadata

Signed-off-by: Bernd Verst <[email protected]>

* simplify cloudevent override unmarshaling

Signed-off-by: Bernd Verst <[email protected]>

* Address linter

Signed-off-by: Bernd Verst <[email protected]>

* Update unit tests

Signed-off-by: Bernd Verst <[email protected]>

* Update E2E test for pubsub metadata override

Signed-off-by: Bernd Verst <[email protected]>

* Adds some test cases

Signed-off-by: Bernd Verst <[email protected]>

* Update Pubsub E2E tests for CE override

Signed-off-by: Bernd Verst <[email protected]>

* Actually override cloudevent for bulk publish

Signed-off-by: Bernd Verst <[email protected]>

* Update e2e test

Signed-off-by: Bernd Verst <[email protected]>

* Undo E2E tests -- not necessary

Signed-off-by: Bernd Verst <[email protected]>

* Verifies cloud event property override

Signed-off-by: Bernd Verst <[email protected]>

* Address linter and add test case

Signed-off-by: Bernd Verst <[email protected]>

* remove comments

Signed-off-by: Bernd Verst <[email protected]>

* fix comments

Signed-off-by: Bernd Verst <[email protected]>

* add more explanation

Signed-off-by: Bernd Verst <[email protected]>

* fix pointer issue

Signed-off-by: Bernd Verst <[email protected]>

---------

Signed-off-by: Bernd Verst <[email protected]>
  • Loading branch information
berndverst authored Jan 27, 2023
1 parent 7d58eb3 commit 75046d6
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 27 deletions.
6 changes: 2 additions & 4 deletions pkg/grpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (a *api) PublishEvent(ctx context.Context, in *runtimev1pb.PublishEventRequ
TraceID: corID,
TraceState: traceState,
Pubsub: in.PubsubName,
})
}, in.Metadata)
if err != nil {
err = status.Errorf(codes.InvalidArgument, messages.ErrPubsubCloudEventCreation, err.Error())
apiServerLogger.Debug(err)
Expand Down Expand Up @@ -555,8 +555,6 @@ func (a *api) BulkPublishEventAlpha1(ctx context.Context, in *runtimev1pb.BulkPu
corID := diag.SpanContextToW3CString(childSpan.SpanContext())
spanMap[i] = childSpan

var envelope map[string]interface{}

envelope, err := runtimePubsub.NewCloudEvent(&runtimePubsub.CloudEvent{
ID: a.id,
Topic: topic,
Expand All @@ -565,7 +563,7 @@ func (a *api) BulkPublishEventAlpha1(ctx context.Context, in *runtimev1pb.BulkPu
TraceID: corID,
TraceState: traceState,
Pubsub: pubsubName,
})
}, entries[i].Metadata)
if err != nil {
err = status.Errorf(codes.InvalidArgument, messages.ErrPubsubCloudEventCreation, err.Error())
apiServerLogger.Debug(err)
Expand Down
29 changes: 29 additions & 0 deletions pkg/grpc/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1953,6 +1953,19 @@ func TestPublishTopic(t *testing.T) {
assert.Nil(t, err)
})

t.Run("no err: publish event request with topic, pubsub and ce metadata override", func(t *testing.T) {
_, err := client.PublishEvent(context.Background(), &runtimev1pb.PublishEventRequest{
PubsubName: "pubsub",
Topic: "topic",
Metadata: map[string]string{
"cloudevent-source": "unit-test",
"cloudevent-topic": "overridetopic", // noop -- if this modified the envelope the test would fail
"cloudevent-pubsub": "overridepubsub", // noop -- if this modified the envelope the test would fail
},
})
assert.Nil(t, err)
})

t.Run("err: publish event request with error-topic and pubsub", func(t *testing.T) {
_, err := client.PublishEvent(context.Background(), &runtimev1pb.PublishEventRequest{
PubsubName: "pubsub",
Expand Down Expand Up @@ -2130,6 +2143,22 @@ func TestBulkPublish(t *testing.T) {
assert.Nil(t, err)
assert.Empty(t, res.FailedEntries)
})

t.Run("no failures with ce metadata override", func(t *testing.T) {
res, err := client.BulkPublishEventAlpha1(context.Background(), &runtimev1pb.BulkPublishRequest{
PubsubName: "pubsub",
Topic: "topic",
Entries: sampleEntries,
Metadata: map[string]string{
"cloudevent-source": "unit-test",
"cloudevent-topic": "overridetopic", // noop -- if this modified the envelope the test would fail
"cloudevent-pubsub": "overridepubsub", // noop -- if this modified the envelope the test would fail
},
})
assert.Nil(t, err)
assert.Empty(t, res.FailedEntries)
})

t.Run("all failures from component", func(t *testing.T) {
res, err := client.BulkPublishEventAlpha1(context.Background(), &runtimev1pb.BulkPublishRequest{
PubsubName: "pubsub",
Expand Down
10 changes: 4 additions & 6 deletions pkg/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2267,7 +2267,7 @@ func (a *api) onPublish(reqCtx *fasthttp.RequestCtx) {
TraceID: corID,
TraceState: traceState,
Pubsub: pubsubName,
})
}, metadata)
if err != nil {
msg := NewErrorResponse("ERR_PUBSUB_CLOUD_EVENTS_SER",
fmt.Sprintf(messages.ErrPubsubCloudEventCreation, err.Error()))
Expand Down Expand Up @@ -2420,18 +2420,16 @@ func (a *api) onBulkPublish(reqCtx *fasthttp.RequestCtx) {
corID := diag.SpanContextToW3CString(childSpan.SpanContext())
spanMap[i] = childSpan

var envelope map[string]interface{}

envelope, err = runtimePubsub.NewCloudEvent(&runtimePubsub.CloudEvent{
envelope, envelopeErr := runtimePubsub.NewCloudEvent(&runtimePubsub.CloudEvent{
ID: a.id,
Topic: topic,
DataContentType: entries[i].ContentType,
Data: entries[i].Event,
TraceID: corID,
TraceState: traceState,
Pubsub: pubsubName,
})
if err != nil {
}, entries[i].Metadata)
if envelopeErr != nil {
msg := NewErrorResponse("ERR_PUBSUB_CLOUD_EVENTS_SER",
fmt.Sprintf(messages.ErrPubsubCloudEventCreation, err.Error()))
respond(reqCtx, withError(fasthttp.StatusInternalServerError, msg), closeChildSpans)
Expand Down
5 changes: 5 additions & 0 deletions pkg/http/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,11 @@ func TestBulkPubSubEndpoints(t *testing.T) {
"value": "third value",
},
ContentType: "application/json",
Metadata: map[string]string{
"cloudevent-source": "unit-test",
"cloudevent-topic": "overridetopic", // noop -- if this modified the envelope the test would fail
"cloudevent-pubsub": "overridepubsub", // noop -- if this modified the envelope the test would fail
},
},
}

Expand Down
36 changes: 26 additions & 10 deletions pkg/runtime/pubsub/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,44 @@ limitations under the License.
package pubsub

import (
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"

contribContenttype "github.com/dapr/components-contrib/contenttype"
contribPubsub "github.com/dapr/components-contrib/pubsub"
)

// CloudEvent is a request object to create a Dapr compliant cloudevent.
// The cloud event properties can manually be overwritten by using metadata beginning with "cloudevent-" as prefix.
type CloudEvent struct {
ID string
Data []byte
Topic string
Pubsub string
DataContentType string
TraceID string
TraceState string
ID string `mapstructure:"cloudevent-id"`
Data []byte `mapstructure:"-"` // cannot be overridden
Topic string `mapstructure:"-"` // cannot be overridden
Pubsub string `mapstructure:"-"` // cannot be overridden
DataContentType string `mapstructure:"-"` // cannot be overridden
TraceID string `mapstructure:"cloudevent-traceid"`
TraceState string `mapstructure:"cloudevent-tracestate"`
Source string `mapstructure:"cloudevent-source"`
Type string `mapstructure:"cloudevent-type"`
TraceParent string `mapstructure:"cloudevent-traceparent"`
}

// NewCloudEvent encapsulates the creation of a Dapr cloudevent from an existing cloudevent or a raw payload.
func NewCloudEvent(req *CloudEvent) (map[string]interface{}, error) {
func NewCloudEvent(req *CloudEvent, metadata map[string]string) (map[string]interface{}, error) {
if contribContenttype.IsCloudEventContentType(req.DataContentType) {
return contribPubsub.FromCloudEvent(req.Data, req.Topic, req.Pubsub, req.TraceID, req.TraceState)
}
return contribPubsub.NewCloudEventsEnvelope(uuid.New().String(), req.ID, contribPubsub.DefaultCloudEventType,

// certain metadata beginning with "cloudevent-" are considered overrides to the cloudevent envelope
// we ignore any error here as the original cloud event envelope is still valid
_ = mapstructure.WeakDecode(metadata, req) // allows ignoring of case

// the final cloud event envelope contains both "traceid" and "traceparent" set to the same value (req.TraceID)
// eventually "traceid" will be deprecated as it was superseded by "traceparent"
// currently "traceparent" is not set by the pubsub component and can only set by the user via metadata override
// therefore, if an override is set for "traceparent", we use it, otherwise we use the original or overridden "traceid" value
if req.TraceParent != "" {
req.TraceID = req.TraceParent
}
return contribPubsub.NewCloudEventsEnvelope(req.ID, req.Source, req.Type,
"", req.Topic, req.Pubsub, req.DataContentType, req.Data, req.TraceID, req.TraceState), nil
}
55 changes: 48 additions & 7 deletions pkg/runtime/pubsub/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,76 @@ import (
func TestNewCloudEvent(t *testing.T) {
t.Run("raw payload", func(t *testing.T) {
ce, err := NewCloudEvent(&CloudEvent{
ID: "a",
ID: "",
Source: "a",
Topic: "b",
Data: []byte("hello"),
Pubsub: "c",
DataContentType: "",
TraceID: "d",
})
Type: "custom-type",
}, map[string]string{})
assert.NoError(t, err)
assert.NotEmpty(t, ce["id"]) // validates that the ID is generated
assert.Equal(t, "a", ce["source"].(string))
assert.Equal(t, "b", ce["topic"].(string))
assert.Equal(t, "hello", ce["data"].(string))
assert.Equal(t, "text/plain", ce["datacontenttype"].(string))
assert.Equal(t, "d", ce["traceid"].(string))
assert.Equal(t, "custom-type", ce["type"].(string))
})

t.Run("raw payload no data", func(t *testing.T) {
ce, err := NewCloudEvent(&CloudEvent{
ID: "a",
ID: "testid",
Source: "", // defaults to "Dapr"
Topic: "b",
Pubsub: "c",
DataContentType: "",
DataContentType: "", // defaults to "text/plain"
TraceID: "d",
})
Type: "", // defaults to "com.dapr.event.sent"
}, map[string]string{})
assert.NoError(t, err)
assert.Equal(t, "a", ce["source"].(string))
assert.Equal(t, "testid", ce["id"].(string))
assert.Equal(t, "Dapr", ce["source"].(string))
assert.Equal(t, "b", ce["topic"].(string))
assert.Empty(t, ce["data"])
assert.Equal(t, "text/plain", ce["datacontenttype"].(string))
assert.Equal(t, "d", ce["traceid"].(string))
assert.Equal(t, "com.dapr.event.sent", ce["type"].(string))
})

t.Run("cloud event metadata override", func(t *testing.T) {
ce, err := NewCloudEvent(&CloudEvent{
Topic: "originaltopic",
Pubsub: "originalpubsub",
DataContentType: "originaldatacontenttype",
Data: []byte("originaldata"),
}, map[string]string{
// these properties should not actually override anything
"cloudevent-topic": "overridetopic",
"cldouevent-pubsub": "overridepubsub",
"cloudevent-data": "overridedata",
"cloudevent-datacontenttype": "overridedatacontenttype",
// these properties should override
"cloudevent-source": "overridesource",
"cloudevent-id": "overrideid",
"cloudevent-type": "overridetype",
"cloudevent-traceparent": "overridetraceparent",
"cloudevent-tracestate": "overridetracestate",
})
assert.NoError(t, err)
assert.Equal(t, "originalpubsub", ce["pubsubname"].(string))
assert.Equal(t, "originaltopic", ce["topic"].(string))
assert.Equal(t, "originaldata", ce["data"].(string))
assert.Equal(t, "originaldatacontenttype", ce["datacontenttype"].(string))
assert.Equal(t, "overridetraceparent", ce["traceid"].(string))
assert.Equal(t, "overridetracestate", ce["tracestate"].(string))
assert.Equal(t, "overridetype", ce["type"].(string))
assert.Equal(t, "overridesource", ce["source"].(string))
assert.Equal(t, "overrideid", ce["id"].(string))
assert.Equal(t, "overridetraceparent", ce["traceparent"].(string))
assert.Equal(t, "overridetracestate", ce["tracestate"].(string))
})

t.Run("custom cloudevent", func(t *testing.T) {
Expand All @@ -69,7 +110,7 @@ func TestNewCloudEvent(t *testing.T) {
Topic: "topic1",
TraceID: "trace1",
Pubsub: "pubsub",
})
}, map[string]string{})
assert.NoError(t, err)
assert.Equal(t, "world", ce["data"].(string))
assert.Equal(t, "text/plain", ce["datacontenttype"].(string))
Expand Down

0 comments on commit 75046d6

Please sign in to comment.