Skip to content

Commit

Permalink
Fix queue and runner
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Jan 21, 2025
1 parent 72f7c2f commit f2f0b28
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
4 changes: 2 additions & 2 deletions queue/controller/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (mc *MessageController) updateWatchChannel(messageID int64, mr MessageRespo
for _, ch := range mc.watchChannels[messageID] {
select {
case ch <- mr:
default:
case <-time.After(time.Second / 10):
}
}
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func (mc *MessageController) updateWatchListChannels(mr MessageResponse) {
for _, ch := range mc.watchListChannels {
select {
case ch <- mr:
default:
case <-time.After(time.Second / 10):
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ func (r *Runner) runOnceSync(ctx context.Context, id string, logger *slog.Logger
return errors.Join(errs...)
}

err = r.client.Heartbeat(ctx, resp.MessageID, client.HeartbeatRequest{
Lease: id,
})
if err != nil {
_ = r.client.Cancel(ctx, resp.MessageID, client.CancelRequest{
Lease: id,
})
return err
}

var bmMut sync.Mutex
var bm []model.Blob
var updated bool
Expand Down

0 comments on commit f2f0b28

Please sign in to comment.