Skip to content

Commit

Permalink
fix: race condition
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Feb 14, 2024
1 parent cf708ae commit 717aade
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions pkg/sessionreducer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ type sessionReduceTask struct {
outputCh chan Message
doneCh chan struct{}
merged *atomic.Bool
lock sync.RWMutex
}

// buildSessionReduceResponse builds the session reduce response from the messages.
func (rt *sessionReduceTask) buildSessionReduceResponse(message Message) *v1.SessionReduceResponse {

rt.lock.RLock()
defer rt.lock.RUnlock()
response := &v1.SessionReduceResponse{
Result: &v1.SessionReduceResponse_Result{
Keys: message.Keys(),
Expand All @@ -32,22 +34,38 @@ func (rt *sessionReduceTask) buildSessionReduceResponse(message Message) *v1.Ses
},
KeyedWindow: rt.keyedWindow,
}

return response
}

// buildEOFResponse builds the EOF response for the session reduce task.
func (rt *sessionReduceTask) buildEOFResponse() *v1.SessionReduceResponse {
rt.lock.RLock()
defer rt.lock.RUnlock()
response := &v1.SessionReduceResponse{
KeyedWindow: rt.keyedWindow,
EOF: true,
}

return response
}

// assignNewKeyedWindow updates the window for the current session reduce task.
func (rt *sessionReduceTask) assignNewKeyedWindow(kw *v1.KeyedWindow) {
rt.lock.Lock()
defer rt.lock.Unlock()
rt.keyedWindow = kw
}

// getKeyedWindow returns the window for the current session reduce task.
func (rt *sessionReduceTask) getKeyedWindow() *v1.KeyedWindow {
rt.lock.RLock()
defer rt.lock.RUnlock()
return rt.keyedWindow
}

// uniqueKey returns the unique key for the session reduce task to be used in the task manager to identify the task.
func (rt *sessionReduceTask) uniqueKey() string {
rt.lock.RLock()
defer rt.lock.RUnlock()
return fmt.Sprintf("%d:%d:%s",
rt.keyedWindow.GetStart().AsTime().UnixMilli(),
rt.keyedWindow.GetEnd().AsTime().UnixMilli(),
Expand Down Expand Up @@ -111,7 +129,7 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1
}
}()

task.sessionReducer.SessionReduce(ctx, task.keyedWindow.GetKeys(), task.inputCh, task.outputCh)
task.sessionReducer.SessionReduce(ctx, task.getKeyedWindow().GetKeys(), task.inputCh, task.outputCh)
// close the output channel and wait for the response to be forwarded
close(task.outputCh)
wg.Wait()
Expand Down Expand Up @@ -258,7 +276,7 @@ func (rtm *sessionReduceTaskManager) ExpandTask(request *v1.SessionReduceRequest
}

// assign the new keyedWindow to the task
task.keyedWindow = request.Operation.KeyedWindows[1]
task.assignNewKeyedWindow(request.Operation.KeyedWindows[1])

// delete the old entry from the tasks map and add the new entry
delete(rtm.tasks, key)
Expand Down

0 comments on commit 717aade

Please sign in to comment.