diff --git a/pkg/sessionreducer/service_test.go b/pkg/sessionreducer/service_test.go index bcce002d..b2dcab35 100644 --- a/pkg/sessionreducer/service_test.go +++ b/pkg/sessionreducer/service_test.go @@ -770,6 +770,674 @@ func TestService_SessionReduceFn(t *testing.T) { }, expectedErr: false, }, + { + name: "open_merge_append_close", + handler: &SessionSumCreator{}, + input: []*sessionreducepb.SessionReduceRequest{ + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_APPEND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_APPEND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + }, + expected: []*sessionreducepb.SessionReduceResponse{ + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client1_test"}, + Value: []byte(strconv.Itoa(30)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + EOF: false, + }, + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client2_test"}, + Value: []byte(strconv.Itoa(50)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + EOF: false, + }, + }, + expectedErr: false, + }, + { + name: "open_merge_expand_close", + handler: &SessionSumCreator{}, + input: []*sessionreducepb.SessionReduceRequest{ + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(95000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(98000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(95000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(98000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + }, + expected: []*sessionreducepb.SessionReduceResponse{ + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client1_test"}, + Value: []byte(strconv.Itoa(30)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(95000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + EOF: false, + }, + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client2_test"}, + Value: []byte(strconv.Itoa(50)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(98000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + EOF: false, + }, + }, + expectedErr: false, + }, + { + name: "open_merge_merge_close", + handler: &SessionSumCreator{}, + input: []*sessionreducepb.SessionReduceRequest{ + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + }, + expected: []*sessionreducepb.SessionReduceResponse{ + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client1_test"}, + Value: []byte(strconv.Itoa(30)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + EOF: false, + }, + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client2_test"}, + Value: []byte(strconv.Itoa(50)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + EOF: false, + }, + }, + expectedErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {