Skip to content

Commit

Permalink
fix: require specifying event time when dropping a message using sour…
Browse files Browse the repository at this point in the history
…ce transformer (#90)

Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Dec 4, 2023
1 parent ab523a9 commit c6d81fd
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/numaproj/numaflow-go/pkg/sourcetransformer"
)

func FilterEventTime(keys []string, d sourcetransformer.Datum) sourcetransformer.Messages {
func FilterEventTime(_ []string, d sourcetransformer.Datum) sourcetransformer.Messages {
janFirst2022 := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
janFirst2023 := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
if d.EventTime().Before(janFirst2022) {
Expand Down
15 changes: 6 additions & 9 deletions pkg/sourcetransformer/messaget.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@ import (
"time"
)

var (
DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
// Watermark are at millisecond granularity, hence we use epoch(0) - 1 to indicate watermark is not available.
// eventTimeForDrop is used to indicate that the message is dropped hence, excluded from watermark calculation
eventTimeForDrop = time.Unix(0, -int64(time.Millisecond))
)
var DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__

// Message is used to wrap the data return by SourceTransformer functions.
// Compared with Message of other UDFs, source transformer Message contains one more field,
Expand Down Expand Up @@ -60,9 +55,11 @@ func (m Message) Tags() []string {
return m.tags
}

// MessageToDrop creates a Message to be dropped
func MessageToDrop() Message {
return Message{eventTime: eventTimeForDrop, value: []byte{}, tags: []string{DROP}}
// MessageToDrop creates a Message to be dropped with eventTime.
// eventTime is required because, even though a message is dropped, it is still considered as being processed,
// hence the watermark should be updated accordingly using the provided event time.
func MessageToDrop(eventTime time.Time) Message {
return Message{eventTime: eventTime, value: []byte{}, tags: []string{DROP}}
}

type Messages []Message
Expand Down
4 changes: 2 additions & 2 deletions pkg/sourcetransformer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestService_sourceTransformFn(t *testing.T) {
{
name: "sourceTransform_fn_forward_msg_drop_msg",
handler: SourceTransformFunc(func(ctx context.Context, keys []string, datum Datum) Messages {
return MessagesBuilder().Append(MessageToDrop())
return MessagesBuilder().Append(MessageToDrop(testTime))
}),
args: args{
ctx: context.Background(),
Expand All @@ -93,7 +93,7 @@ func TestService_sourceTransformFn(t *testing.T) {
want: &stpb.SourceTransformResponse{
Results: []*stpb.SourceTransformResponse_Result{
{
EventTime: timestamppb.New(eventTimeForDrop),
EventTime: timestamppb.New(testTime),
Tags: []string{DROP},
Value: []byte{},
},
Expand Down

0 comments on commit c6d81fd

Please sign in to comment.