diff --git a/queue/controller/message.go b/queue/controller/message.go index ae178da..7412d75 100644 --- a/queue/controller/message.go +++ b/queue/controller/message.go @@ -92,7 +92,7 @@ func (mc *MessageController) updateWatchChannel(messageID int64, mr MessageRespo for _, ch := range mc.watchChannels[messageID] { select { case ch <- mr: - default: + case <-time.After(time.Second / 10): } } } @@ -122,7 +122,7 @@ func (mc *MessageController) updateWatchListChannels(mr MessageResponse) { for _, ch := range mc.watchListChannels { select { case ch <- mr: - default: + case <-time.After(time.Second / 10): } } } diff --git a/runner/runner.go b/runner/runner.go index 1ef4935..58501a7 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -173,6 +173,16 @@ func (r *Runner) runOnceSync(ctx context.Context, id string, logger *slog.Logger return errors.Join(errs...) } + err = r.client.Heartbeat(ctx, resp.MessageID, client.HeartbeatRequest{ + Lease: id, + }) + if err != nil { + _ = r.client.Cancel(ctx, resp.MessageID, client.CancelRequest{ + Lease: id, + }) + return err + } + var bmMut sync.Mutex var bm []model.Blob var updated bool