Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Mar 21, 2024
1 parent 716be4a commit a32faff
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 3 deletions.
4 changes: 4 additions & 0 deletions pkg/mapper/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ import (

// Datum contains methods to get the payload information.
type Datum interface {
// Value returns the payload of the message.
Value() []byte
// EventTime returns the event time of the message.
EventTime() time.Time
// Watermark returns the watermark of the message.
Watermark() time.Time
// Headers returns the headers of the message.
Headers() map[string]string
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/mapstreamer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ import (

// Datum contains methods to get the payload information.
type Datum interface {
// Value returns the payload of the message.
Value() []byte
// EventTime returns the event time of the message.
EventTime() time.Time
// Watermark returns the watermark of the message.
Watermark() time.Time
// Headers returns the headers of the message.
Headers() map[string]string
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/reducer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ import (

// Datum contains methods to get the payload information.
type Datum interface {
// Value returns the payload of the message.
Value() []byte
// EventTime returns the event time of the message.
EventTime() time.Time
// Watermark returns the watermark of the message.
Watermark() time.Time
// Headers returns the headers of the message.
Headers() map[string]string
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/reducer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,10 @@ func generateKey(window *v1.Window, keys []string) string {
}

func buildDatum(request *v1.ReduceRequest) Datum {
return NewHandlerDatum(request.Payload.GetValue(), request.Payload.EventTime.AsTime(), request.Payload.Watermark.AsTime(), request.Payload.Headers)
return NewHandlerDatum(
request.GetPayload().GetValue(),
request.GetPayload().GetEventTime().AsTime(),
request.GetPayload().GetWatermark().AsTime(),
request.GetPayload().GetHeaders(),
)
}
4 changes: 4 additions & 0 deletions pkg/reducestreamer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ import (

// Datum contains methods to get the payload information.
type Datum interface {
// Value returns the payload of the message.
Value() []byte
// EventTime returns the event time of the message.
EventTime() time.Time
// Watermark returns the watermark of the message.
Watermark() time.Time
// Headers returns the headers of the message.
Headers() map[string]string
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/reducestreamer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,10 @@ func generateKey(window *v1.Window, keys []string) string {
}

func buildDatum(request *v1.ReduceRequest) Datum {
return NewHandlerDatum(request.Payload.GetValue(), request.Payload.EventTime.AsTime(), request.Payload.Watermark.AsTime(), request.Payload.Headers)
return NewHandlerDatum(
request.GetPayload().GetValue(),
request.GetPayload().GetEventTime().AsTime(),
request.GetPayload().GetWatermark().AsTime(),
request.GetPayload().GetHeaders(),
)
}
4 changes: 4 additions & 0 deletions pkg/sessionreducer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ import (

// Datum contains methods to get the payload information.
type Datum interface {
// Value returns the payload of the message.
Value() []byte
// EventTime returns the event time of the message.
EventTime() time.Time
// Watermark returns the watermark of the message.
Watermark() time.Time
// Headers returns the headers of the message.
Headers() map[string]string
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/sessionreducer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,5 +320,10 @@ func generateKey(keyedWindow *v1.KeyedWindow) string {
}

func buildDatum(payload *v1.SessionReduceRequest_Payload) Datum {
return NewHandlerDatum(payload.GetValue(), payload.EventTime.AsTime(), payload.Watermark.AsTime(), payload.Headers)
return NewHandlerDatum(
payload.GetValue(),
payload.GetEventTime().AsTime(),
payload.GetWatermark().AsTime(),
payload.GetHeaders(),
)
}
6 changes: 6 additions & 0 deletions pkg/sinker/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@ import (

// Datum is the interface of incoming message payload for sink function.
type Datum interface {
// Keys returns the keys of the message.
Keys() []string
// Value returns the payload of the message.
Value() []byte
// EventTime returns the event time of the message.
EventTime() time.Time
// Watermark returns the watermark of the message.
Watermark() time.Time
// ID returns the ID of the message.
ID() string
// Headers returns the headers of the message.
Headers() map[string]string
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sourcetransformer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ import (

// Datum contains methods to get the payload information.
type Datum interface {
// Value returns the payload of the message.
Value() []byte
// EventTime returns the event time of the message.
EventTime() time.Time
// Watermark returns the watermark of the message.
Watermark() time.Time
// Headers returns the headers of the message.
Headers() map[string]string
}

Expand Down

0 comments on commit a32faff

Please sign in to comment.