From 717aade1b0ad5e58e1d53ff90c566e8050ff59df Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 14 Feb 2024 14:27:48 -0500 Subject: [PATCH] fix: race condition Signed-off-by: Keran Yang --- pkg/sessionreducer/task_manager.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/pkg/sessionreducer/task_manager.go b/pkg/sessionreducer/task_manager.go index e40df8b6..3bfaa050 100644 --- a/pkg/sessionreducer/task_manager.go +++ b/pkg/sessionreducer/task_manager.go @@ -19,11 +19,13 @@ type sessionReduceTask struct { outputCh chan Message doneCh chan struct{} merged *atomic.Bool + lock sync.RWMutex } // buildSessionReduceResponse builds the session reduce response from the messages. func (rt *sessionReduceTask) buildSessionReduceResponse(message Message) *v1.SessionReduceResponse { - + rt.lock.RLock() + defer rt.lock.RUnlock() response := &v1.SessionReduceResponse{ Result: &v1.SessionReduceResponse_Result{ Keys: message.Keys(), @@ -32,22 +34,38 @@ func (rt *sessionReduceTask) buildSessionReduceResponse(message Message) *v1.Ses }, KeyedWindow: rt.keyedWindow, } - return response } // buildEOFResponse builds the EOF response for the session reduce task. func (rt *sessionReduceTask) buildEOFResponse() *v1.SessionReduceResponse { + rt.lock.RLock() + defer rt.lock.RUnlock() response := &v1.SessionReduceResponse{ KeyedWindow: rt.keyedWindow, EOF: true, } - return response } +// assignNewKeyedWindow updates the window for the current session reduce task. +func (rt *sessionReduceTask) assignNewKeyedWindow(kw *v1.KeyedWindow) { + rt.lock.Lock() + defer rt.lock.Unlock() + rt.keyedWindow = kw +} + +// getKeyedWindow returns the window for the current session reduce task. +func (rt *sessionReduceTask) getKeyedWindow() *v1.KeyedWindow { + rt.lock.RLock() + defer rt.lock.RUnlock() + return rt.keyedWindow +} + // uniqueKey returns the unique key for the session reduce task to be used in the task manager to identify the task. func (rt *sessionReduceTask) uniqueKey() string { + rt.lock.RLock() + defer rt.lock.RUnlock() return fmt.Sprintf("%d:%d:%s", rt.keyedWindow.GetStart().AsTime().UnixMilli(), rt.keyedWindow.GetEnd().AsTime().UnixMilli(), @@ -111,7 +129,7 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1 } }() - task.sessionReducer.SessionReduce(ctx, task.keyedWindow.GetKeys(), task.inputCh, task.outputCh) + task.sessionReducer.SessionReduce(ctx, task.getKeyedWindow().GetKeys(), task.inputCh, task.outputCh) // close the output channel and wait for the response to be forwarded close(task.outputCh) wg.Wait() @@ -258,7 +276,7 @@ func (rtm *sessionReduceTaskManager) ExpandTask(request *v1.SessionReduceRequest } // assign the new keyedWindow to the task - task.keyedWindow = request.Operation.KeyedWindows[1] + task.assignNewKeyedWindow(request.Operation.KeyedWindows[1]) // delete the old entry from the tasks map and add the new entry delete(rtm.tasks, key)