Skip to content

Commit cacb1a0

Browse files
authored
Merge pull request #38 from lrascao/backport-get-work-items-send-timeout
Merge pull request #24 from famarting/get-work-items-send-timeout
2 parents 1315145 + 98ebb91 commit cacb1a0

File tree

1 file changed

+29
-1
lines changed

1 file changed

+29
-1
lines changed

backend/executor.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type grpcExecutor struct {
5656
onWorkItemConnection func(context.Context) error
5757
onWorkItemDisconnect func(context.Context) error
5858
streamShutdownChan <-chan any
59+
streamSendTimeout *time.Duration
5960
}
6061

6162
type grpcExecutorOptions func(g *grpcExecutor)
@@ -90,6 +91,12 @@ func WithStreamShutdownChannel(c <-chan any) grpcExecutorOptions {
9091
}
9192
}
9293

94+
func WithStreamSendTimeout(d time.Duration) grpcExecutorOptions {
95+
return func(g *grpcExecutor) {
96+
g.streamSendTimeout = &d
97+
}
98+
}
99+
93100
// NewGrpcExecutor returns the Executor object and a method to invoke to register the gRPC server in the executor.
94101
func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (executor Executor, registerServerFn func(grpcServer grpc.ServiceRegistrar)) {
95102
grpcExecutor := &grpcExecutor{
@@ -322,7 +329,7 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
322329
}
323330
}
324331

325-
if err := stream.Send(wi); err != nil {
332+
if err := g.sendWorkItem(stream, wi); err != nil {
326333
g.logger.Errorf("encountered an error while sending work item: %v", err)
327334
return err
328335
}
@@ -336,6 +343,27 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
336343
}
337344
}
338345

346+
func (g *grpcExecutor) sendWorkItem(stream protos.TaskHubSidecarService_GetWorkItemsServer, wi *protos.WorkItem) error {
347+
ctx := stream.Context()
348+
if g.streamSendTimeout != nil {
349+
var cancel context.CancelFunc
350+
ctx, cancel = context.WithTimeout(ctx, *g.streamSendTimeout)
351+
defer cancel()
352+
}
353+
354+
errCh := make(chan error, 2)
355+
go func() {
356+
select {
357+
case errCh <- stream.Send(wi):
358+
case <-ctx.Done():
359+
g.logger.Errorf("timed out while sending work item")
360+
errCh <- ctx.Err()
361+
}
362+
}()
363+
364+
return <-errCh
365+
}
366+
339367
// CompleteOrchestratorTask implements protos.TaskHubSidecarServiceServer
340368
func (g *grpcExecutor) CompleteOrchestratorTask(ctx context.Context, res *protos.OrchestratorResponse) (*protos.CompleteTaskResponse, error) {
341369
iid := api.InstanceID(res.InstanceId)

0 commit comments

Comments
 (0)