Skip to content

Commit

Permalink
one eof per partition
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Mar 25, 2024
1 parent ee9c3ee commit adcf1f5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 25 deletions.
18 changes: 5 additions & 13 deletions pkg/reducer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ func (rtm *reduceTaskManager) CreateTask(ctx context.Context, request *v1.Reduce
// write the output to the output channel, service will forward it to downstream
rtm.responseCh <- task.buildReduceResponse(message)
}
// send EOF
rtm.responseCh <- task.buildEOFResponse()
// close the output channel after the reduce function is done
close(task.outputCh)
// send a done signal
Expand Down Expand Up @@ -133,27 +131,21 @@ func (rtm *reduceTaskManager) OutputChannel() <-chan *v1.ReduceResponse {

// WaitAll waits for all the reduce tasks to complete.
func (rtm *reduceTaskManager) WaitAll() {
tasks := make([]*reduceTask, 0, len(rtm.tasks))
var eofResponse *v1.ReduceResponse
for _, task := range rtm.tasks {
tasks = append(tasks, task)
}

for _, task := range tasks {
<-task.doneCh
if eofResponse == nil {
eofResponse = task.buildEOFResponse()
}
}

rtm.responseCh <- eofResponse
// after all the tasks are completed, close the output channel
close(rtm.responseCh)
}

// CloseAll closes all the reduce tasks.
func (rtm *reduceTaskManager) CloseAll() {
tasks := make([]*reduceTask, 0, len(rtm.tasks))
for _, task := range rtm.tasks {
tasks = append(tasks, task)
}

for _, task := range tasks {
close(task.inputCh)
}
}
Expand Down
18 changes: 6 additions & 12 deletions pkg/reducestreamer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ func (rtm *reduceStreamTaskManager) CreateTask(ctx context.Context, request *v1.
// write the output to the output channel, service will forward it to downstream
rtm.responseCh <- task.buildReduceResponse(message)
}
// send EOF
rtm.responseCh <- task.buildEOFResponse()
}()

reduceStreamerHandle := rtm.creatorHandle.Create()
Expand Down Expand Up @@ -141,26 +139,22 @@ func (rtm *reduceStreamTaskManager) OutputChannel() <-chan *v1.ReduceResponse {

// WaitAll waits for all the reduceStream tasks to complete.
func (rtm *reduceStreamTaskManager) WaitAll() {
tasks := make([]*reduceStreamTask, 0, len(rtm.tasks))
var eofResponse *v1.ReduceResponse
for _, task := range rtm.tasks {
tasks = append(tasks, task)
}

for _, task := range tasks {
<-task.doneCh
if eofResponse == nil {
eofResponse = task.buildEOFResponse()
}
}
rtm.responseCh <- eofResponse

// after all the tasks are completed, close the output channel
close(rtm.responseCh)
}

// CloseAll closes all the reduceStream tasks.
func (rtm *reduceStreamTaskManager) CloseAll() {
tasks := make([]*reduceStreamTask, 0, len(rtm.tasks))
for _, task := range rtm.tasks {
tasks = append(tasks, task)
}

for _, task := range tasks {
close(task.inputCh)
}
}
Expand Down

0 comments on commit adcf1f5

Please sign in to comment.