diff --git a/pkg/mapper/interface.go b/pkg/mapper/interface.go index f68fd5e1..e4f8355f 100644 --- a/pkg/mapper/interface.go +++ b/pkg/mapper/interface.go @@ -7,9 +7,13 @@ import ( // Datum contains methods to get the payload information. type Datum interface { + // Value returns the payload of the message. Value() []byte + // EventTime returns the event time of the message. EventTime() time.Time + // Watermark returns the watermark of the message. Watermark() time.Time + // Headers returns the headers of the message. Headers() map[string]string } diff --git a/pkg/mapstreamer/interface.go b/pkg/mapstreamer/interface.go index 07de8773..57487914 100644 --- a/pkg/mapstreamer/interface.go +++ b/pkg/mapstreamer/interface.go @@ -7,9 +7,13 @@ import ( // Datum contains methods to get the payload information. type Datum interface { + // Value returns the payload of the message. Value() []byte + // EventTime returns the event time of the message. EventTime() time.Time + // Watermark returns the watermark of the message. Watermark() time.Time + // Headers returns the headers of the message. Headers() map[string]string } diff --git a/pkg/reducer/interface.go b/pkg/reducer/interface.go index 665d07ae..08d6dd31 100644 --- a/pkg/reducer/interface.go +++ b/pkg/reducer/interface.go @@ -7,9 +7,13 @@ import ( // Datum contains methods to get the payload information. type Datum interface { + // Value returns the payload of the message. Value() []byte + // EventTime returns the event time of the message. EventTime() time.Time + // Watermark returns the watermark of the message. Watermark() time.Time + // Headers returns the headers of the message. Headers() map[string]string } diff --git a/pkg/reducer/task_manager.go b/pkg/reducer/task_manager.go index b5f4656a..a722eba3 100644 --- a/pkg/reducer/task_manager.go +++ b/pkg/reducer/task_manager.go @@ -166,5 +166,10 @@ func generateKey(window *v1.Window, keys []string) string { } func buildDatum(request *v1.ReduceRequest) Datum { - return NewHandlerDatum(request.Payload.GetValue(), request.Payload.EventTime.AsTime(), request.Payload.Watermark.AsTime(), request.Payload.Headers) + return NewHandlerDatum( + request.GetPayload().GetValue(), + request.GetPayload().GetEventTime().AsTime(), + request.GetPayload().GetWatermark().AsTime(), + request.GetPayload().GetHeaders(), + ) } diff --git a/pkg/reducestreamer/interface.go b/pkg/reducestreamer/interface.go index 79c920b7..c0730570 100644 --- a/pkg/reducestreamer/interface.go +++ b/pkg/reducestreamer/interface.go @@ -7,9 +7,13 @@ import ( // Datum contains methods to get the payload information. type Datum interface { + // Value returns the payload of the message. Value() []byte + // EventTime returns the event time of the message. EventTime() time.Time + // Watermark returns the watermark of the message. Watermark() time.Time + // Headers returns the headers of the message. Headers() map[string]string } diff --git a/pkg/reducestreamer/task_manager.go b/pkg/reducestreamer/task_manager.go index e32242f0..ae04f096 100644 --- a/pkg/reducestreamer/task_manager.go +++ b/pkg/reducestreamer/task_manager.go @@ -173,5 +173,10 @@ func generateKey(window *v1.Window, keys []string) string { } func buildDatum(request *v1.ReduceRequest) Datum { - return NewHandlerDatum(request.Payload.GetValue(), request.Payload.EventTime.AsTime(), request.Payload.Watermark.AsTime(), request.Payload.Headers) + return NewHandlerDatum( + request.GetPayload().GetValue(), + request.GetPayload().GetEventTime().AsTime(), + request.GetPayload().GetWatermark().AsTime(), + request.GetPayload().GetHeaders(), + ) } diff --git a/pkg/sessionreducer/interface.go b/pkg/sessionreducer/interface.go index 2b7573e3..7dac50cd 100644 --- a/pkg/sessionreducer/interface.go +++ b/pkg/sessionreducer/interface.go @@ -7,9 +7,13 @@ import ( // Datum contains methods to get the payload information. type Datum interface { + // Value returns the payload of the message. Value() []byte + // EventTime returns the event time of the message. EventTime() time.Time + // Watermark returns the watermark of the message. Watermark() time.Time + // Headers returns the headers of the message. Headers() map[string]string } diff --git a/pkg/sessionreducer/task_manager.go b/pkg/sessionreducer/task_manager.go index fcc78a46..49500eef 100644 --- a/pkg/sessionreducer/task_manager.go +++ b/pkg/sessionreducer/task_manager.go @@ -320,5 +320,10 @@ func generateKey(keyedWindow *v1.KeyedWindow) string { } func buildDatum(payload *v1.SessionReduceRequest_Payload) Datum { - return NewHandlerDatum(payload.GetValue(), payload.EventTime.AsTime(), payload.Watermark.AsTime(), payload.Headers) + return NewHandlerDatum( + payload.GetValue(), + payload.GetEventTime().AsTime(), + payload.GetWatermark().AsTime(), + payload.GetHeaders(), + ) } diff --git a/pkg/sinker/interface.go b/pkg/sinker/interface.go index c5ce3100..9d00a933 100644 --- a/pkg/sinker/interface.go +++ b/pkg/sinker/interface.go @@ -7,11 +7,17 @@ import ( // Datum is the interface of incoming message payload for sink function. type Datum interface { + // Keys returns the keys of the message. Keys() []string + // Value returns the payload of the message. Value() []byte + // EventTime returns the event time of the message. EventTime() time.Time + // Watermark returns the watermark of the message. Watermark() time.Time + // ID returns the ID of the message. ID() string + // Headers returns the headers of the message. Headers() map[string]string } diff --git a/pkg/sourcetransformer/interface.go b/pkg/sourcetransformer/interface.go index 69135903..131e100c 100644 --- a/pkg/sourcetransformer/interface.go +++ b/pkg/sourcetransformer/interface.go @@ -7,9 +7,13 @@ import ( // Datum contains methods to get the payload information. type Datum interface { + // Value returns the payload of the message. Value() []byte + // EventTime returns the event time of the message. EventTime() time.Time + // Watermark returns the watermark of the message. Watermark() time.Time + // Headers returns the headers of the message. Headers() map[string]string }