diff --git a/pkg/sessionreducer/service_test.go b/pkg/sessionreducer/service_test.go index bcce002d..b001bd3a 100644 --- a/pkg/sessionreducer/service_test.go +++ b/pkg/sessionreducer/service_test.go @@ -770,6 +770,674 @@ func TestService_SessionReduceFn(t *testing.T) { }, expectedErr: false, }, + { + name: "open_merge_append_close", + handler: &SessionSumCreator{}, + input: []*sessionreducepb.SessionReduceRequest{ + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_APPEND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_APPEND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + }, + expected: []*sessionreducepb.SessionReduceResponse{ + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client1_test"}, + Value: []byte(strconv.Itoa(30)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + EOF: false, + }, + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client2_test"}, + Value: []byte(strconv.Itoa(50)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + EOF: false, + }, + }, + expectedErr: false, + }, + { + name: "open_merge_expand_close", + handler: &SessionSumCreator{}, + input: []*sessionreducepb.SessionReduceRequest{ + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(95000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(98000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(95000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(98000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + }, + expected: []*sessionreducepb.SessionReduceResponse{ + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client1_test"}, + Value: []byte(strconv.Itoa(30)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(95000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + EOF: false, + }, + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client2_test"}, + Value: []byte(strconv.Itoa(50)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(98000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + EOF: false, + }, + }, + expectedErr: false, + }, + { + name: "open_merge_merge_close", + handler: &SessionSumCreator{}, + input: []*sessionreducepb.SessionReduceRequest{ + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(75000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(70000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(78000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client1"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Payload: &sessionreducepb.SessionReduceRequest_Payload{ + Keys: []string{"client2"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_OPEN, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_MERGE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(60000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(80000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + { + Operation: &sessionreducepb.SessionReduceRequest_WindowOperation{ + Event: sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE, + KeyedWindows: []*sessionreducepb.KeyedWindow{ + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + { + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + }, + }, + }, + }, + expected: []*sessionreducepb.SessionReduceResponse{ + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client1_test"}, + Value: []byte(strconv.Itoa(30)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(85000)), + Slot: "slot-0", + Keys: []string{"client1"}, + }, + EOF: false, + }, + { + Result: &sessionreducepb.SessionReduceResponse_Result{ + Keys: []string{"client2_test"}, + Value: []byte(strconv.Itoa(50)), + }, + KeyedWindow: &sessionreducepb.KeyedWindow{ + Start: timestamppb.New(time.UnixMilli(50000)), + End: timestamppb.New(time.UnixMilli(88000)), + Slot: "slot-0", + Keys: []string{"client2"}, + }, + EOF: false, + }, + }, + expectedErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/sessionreducer/task_manager.go b/pkg/sessionreducer/task_manager.go index e40df8b6..3bfaa050 100644 --- a/pkg/sessionreducer/task_manager.go +++ b/pkg/sessionreducer/task_manager.go @@ -19,11 +19,13 @@ type sessionReduceTask struct { outputCh chan Message doneCh chan struct{} merged *atomic.Bool + lock sync.RWMutex } // buildSessionReduceResponse builds the session reduce response from the messages. func (rt *sessionReduceTask) buildSessionReduceResponse(message Message) *v1.SessionReduceResponse { - + rt.lock.RLock() + defer rt.lock.RUnlock() response := &v1.SessionReduceResponse{ Result: &v1.SessionReduceResponse_Result{ Keys: message.Keys(), @@ -32,22 +34,38 @@ func (rt *sessionReduceTask) buildSessionReduceResponse(message Message) *v1.Ses }, KeyedWindow: rt.keyedWindow, } - return response } // buildEOFResponse builds the EOF response for the session reduce task. func (rt *sessionReduceTask) buildEOFResponse() *v1.SessionReduceResponse { + rt.lock.RLock() + defer rt.lock.RUnlock() response := &v1.SessionReduceResponse{ KeyedWindow: rt.keyedWindow, EOF: true, } - return response } +// assignNewKeyedWindow updates the window for the current session reduce task. +func (rt *sessionReduceTask) assignNewKeyedWindow(kw *v1.KeyedWindow) { + rt.lock.Lock() + defer rt.lock.Unlock() + rt.keyedWindow = kw +} + +// getKeyedWindow returns the window for the current session reduce task. +func (rt *sessionReduceTask) getKeyedWindow() *v1.KeyedWindow { + rt.lock.RLock() + defer rt.lock.RUnlock() + return rt.keyedWindow +} + // uniqueKey returns the unique key for the session reduce task to be used in the task manager to identify the task. func (rt *sessionReduceTask) uniqueKey() string { + rt.lock.RLock() + defer rt.lock.RUnlock() return fmt.Sprintf("%d:%d:%s", rt.keyedWindow.GetStart().AsTime().UnixMilli(), rt.keyedWindow.GetEnd().AsTime().UnixMilli(), @@ -111,7 +129,7 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1 } }() - task.sessionReducer.SessionReduce(ctx, task.keyedWindow.GetKeys(), task.inputCh, task.outputCh) + task.sessionReducer.SessionReduce(ctx, task.getKeyedWindow().GetKeys(), task.inputCh, task.outputCh) // close the output channel and wait for the response to be forwarded close(task.outputCh) wg.Wait() @@ -258,7 +276,7 @@ func (rtm *sessionReduceTaskManager) ExpandTask(request *v1.SessionReduceRequest } // assign the new keyedWindow to the task - task.keyedWindow = request.Operation.KeyedWindows[1] + task.assignNewKeyedWindow(request.Operation.KeyedWindows[1]) // delete the old entry from the tasks map and add the new entry delete(rtm.tasks, key)