From b8d83744b3f8702f28293789ab10bad1a87dddde Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 21 Mar 2024 15:19:54 +0530 Subject: [PATCH] source transformer Signed-off-by: Yashash H L --- pkg/sourcetransformer/interface.go | 1 + pkg/sourcetransformer/service.go | 2 +- pkg/sourcetransformer/types.go | 8 +++++++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/sourcetransformer/interface.go b/pkg/sourcetransformer/interface.go index 4b1fd609..69135903 100644 --- a/pkg/sourcetransformer/interface.go +++ b/pkg/sourcetransformer/interface.go @@ -10,6 +10,7 @@ type Datum interface { Value() []byte EventTime() time.Time Watermark() time.Time + Headers() map[string]string } // SourceTransformer is the interface of SourceTransformer function implementation. diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 327c7fdd..004e13dc 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -32,7 +32,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse, // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. func (fs *Service) SourceTransformFn(ctx context.Context, d *v1.SourceTransformRequest) (*v1.SourceTransformResponse, error) { - var hd = NewHandlerDatum(d.GetValue(), d.EventTime.AsTime(), d.Watermark.AsTime()) + var hd = NewHandlerDatum(d.GetValue(), d.EventTime.AsTime(), d.Watermark.AsTime(), d.Headers) messageTs := fs.Transformer.Transform(ctx, d.GetKeys(), hd) var results []*v1.SourceTransformResponse_Result for _, m := range messageTs.Items() { diff --git a/pkg/sourcetransformer/types.go b/pkg/sourcetransformer/types.go index 472d0f26..ad1a2290 100644 --- a/pkg/sourcetransformer/types.go +++ b/pkg/sourcetransformer/types.go @@ -9,13 +9,15 @@ type handlerDatum struct { value []byte eventTime time.Time watermark time.Time + headers map[string]string } -func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time) Datum { +func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string) Datum { return &handlerDatum{ value: value, eventTime: eventTime, watermark: watermark, + headers: headers, } } @@ -30,3 +32,7 @@ func (h *handlerDatum) EventTime() time.Time { func (h *handlerDatum) Watermark() time.Time { return h.watermark } + +func (h *handlerDatum) Headers() map[string]string { + return h.headers +}