From cf708aeb9cac38c054d404e42f55e12f6b069761 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 14 Feb 2024 13:47:02 -0500 Subject: [PATCH] chore: unit test merge operation as a blocking call This change adds some unit tests covering cases when a merge operation is followed by non-close operations, verifying the blocking behavior of merge. Signed-off-by: Keran Yang --- pkg/sessionreducer/service_test.go | 668 +++++++++++++++++++++++++++++ 1 file changed, 668 insertions(+) diff --git a/pkg/sessionreducer/service_test.go b/pkg/sessionreducer/service_test.go index bcce002d..b2dcab35 100644 --- a/pkg/sessionreducer/service_test.go +++ b/pkg/sessionreducer/service_test.go @@ -770,6 +770,674 @@ func TestService_SessionReduceFn(t *testing.T) { }, expectedErr: false, }, + { + name: "open_merge_append_close", + handler: &SessionSumCreator{}, + input: []*sessionreducepb.SessionReduceRequest{ + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_APPEND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_APPEND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + }, + expected: []*sessionreducepb.SessionReduceResponse{ + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client1_test"}, + Value: []byte(strconv.Itoa(30)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + EOF: false, + }, + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client2_test"}, + Value: []byte(strconv.Itoa(50)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + EOF: false, + }, + }, + expectedErr: false, + }, + { + name: "open_merge_expand_close", + handler: &SessionSumCreator{}, + input: []*sessionreducepb.SessionReduceRequest{ + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(95000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(98000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(95000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(98000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + }, + expected: []*sessionreducepb.SessionReduceResponse{ + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client1_test"}, + Value: []byte(strconv.Itoa(30)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(95000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + EOF: false, + }, + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client2_test"}, + Value: []byte(strconv.Itoa(50)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(98000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + EOF: false, + }, + }, + expectedErr: false, + }, + { + name: "open_merge_merge_close", + handler: &SessionSumCreator{}, + input: []*sessionreducepb.SessionReduceRequest{ + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + }, + expected: []*sessionreducepb.SessionReduceResponse{ + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client1_test"}, + Value: []byte(strconv.Itoa(30)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + EOF: false, + }, + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client2_test"}, + Value: []byte(strconv.Itoa(50)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + EOF: false, + }, + }, + expectedErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {