From adcf1f5c1e339d2468b2b3e8ad12ab7eebf86a14 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Mon, 25 Mar 2024 16:33:56 +0530 Subject: [PATCH] one eof per partition Signed-off-by: Yashash H L --- pkg/reducer/task_manager.go | 18 +++++------------- pkg/reducestreamer/task_manager.go | 18 ++++++------------ 2 files changed, 11 insertions(+), 25 deletions(-) diff --git a/pkg/reducer/task_manager.go b/pkg/reducer/task_manager.go index a722eba3..537f6b59 100644 --- a/pkg/reducer/task_manager.go +++ b/pkg/reducer/task_manager.go @@ -95,8 +95,6 @@ func (rtm *reduceTaskManager) CreateTask(ctx context.Context, request *v1.Reduce // write the output to the output channel, service will forward it to downstream rtm.responseCh <- task.buildReduceResponse(message) } - // send EOF - rtm.responseCh <- task.buildEOFResponse() // close the output channel after the reduce function is done close(task.outputCh) // send a done signal @@ -133,27 +131,21 @@ func (rtm *reduceTaskManager) OutputChannel() <-chan *v1.ReduceResponse { // WaitAll waits for all the reduce tasks to complete. func (rtm *reduceTaskManager) WaitAll() { - tasks := make([]*reduceTask, 0, len(rtm.tasks)) + var eofResponse *v1.ReduceResponse for _, task := range rtm.tasks { - tasks = append(tasks, task) - } - - for _, task := range tasks { <-task.doneCh + if eofResponse == nil { + eofResponse = task.buildEOFResponse() + } } - + rtm.responseCh <- eofResponse // after all the tasks are completed, close the output channel close(rtm.responseCh) } // CloseAll closes all the reduce tasks. func (rtm *reduceTaskManager) CloseAll() { - tasks := make([]*reduceTask, 0, len(rtm.tasks)) for _, task := range rtm.tasks { - tasks = append(tasks, task) - } - - for _, task := range tasks { close(task.inputCh) } } diff --git a/pkg/reducestreamer/task_manager.go b/pkg/reducestreamer/task_manager.go index ae04f096..15c2cc62 100644 --- a/pkg/reducestreamer/task_manager.go +++ b/pkg/reducestreamer/task_manager.go @@ -95,8 +95,6 @@ func (rtm *reduceStreamTaskManager) CreateTask(ctx context.Context, request *v1. // write the output to the output channel, service will forward it to downstream rtm.responseCh <- task.buildReduceResponse(message) } - // send EOF - rtm.responseCh <- task.buildEOFResponse() }() reduceStreamerHandle := rtm.creatorHandle.Create() @@ -141,26 +139,22 @@ func (rtm *reduceStreamTaskManager) OutputChannel() <-chan *v1.ReduceResponse { // WaitAll waits for all the reduceStream tasks to complete. func (rtm *reduceStreamTaskManager) WaitAll() { - tasks := make([]*reduceStreamTask, 0, len(rtm.tasks)) + var eofResponse *v1.ReduceResponse for _, task := range rtm.tasks { - tasks = append(tasks, task) - } - - for _, task := range tasks { <-task.doneCh + if eofResponse == nil { + eofResponse = task.buildEOFResponse() + } } + rtm.responseCh <- eofResponse + // after all the tasks are completed, close the output channel close(rtm.responseCh) } // CloseAll closes all the reduceStream tasks. func (rtm *reduceStreamTaskManager) CloseAll() { - tasks := make([]*reduceStreamTask, 0, len(rtm.tasks)) for _, task := range rtm.tasks { - tasks = append(tasks, task) - } - - for _, task := range tasks { close(task.inputCh) } }