Skip to content

Commit

Permalink
source transformer
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Mar 21, 2024
1 parent bf841dd commit b8d8374
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 2 deletions.
1 change: 1 addition & 0 deletions pkg/sourcetransformer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Datum interface {
Value() []byte
EventTime() time.Time
Watermark() time.Time
Headers() map[string]string
}

// SourceTransformer is the interface of SourceTransformer function implementation.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse,
// In addition to map function, SourceTransformFn also supports assigning a new event time to response.
// SourceTransformFn can be used only at source vertex by source data transformer.
func (fs *Service) SourceTransformFn(ctx context.Context, d *v1.SourceTransformRequest) (*v1.SourceTransformResponse, error) {
var hd = NewHandlerDatum(d.GetValue(), d.EventTime.AsTime(), d.Watermark.AsTime())
var hd = NewHandlerDatum(d.GetValue(), d.EventTime.AsTime(), d.Watermark.AsTime(), d.Headers)
messageTs := fs.Transformer.Transform(ctx, d.GetKeys(), hd)
var results []*v1.SourceTransformResponse_Result
for _, m := range messageTs.Items() {
Expand Down
8 changes: 7 additions & 1 deletion pkg/sourcetransformer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ type handlerDatum struct {
value []byte
eventTime time.Time
watermark time.Time
headers map[string]string
}

func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time) Datum {
func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string) Datum {
return &handlerDatum{
value: value,
eventTime: eventTime,
watermark: watermark,
headers: headers,
}
}

Expand All @@ -30,3 +32,7 @@ func (h *handlerDatum) EventTime() time.Time {
func (h *handlerDatum) Watermark() time.Time {
return h.watermark
}

func (h *handlerDatum) Headers() map[string]string {
return h.headers
}

0 comments on commit b8d8374

Please sign in to comment.