Description
Was skimming through this to debug a customer issue. Realized that we explicitly remove all cancellation from the context. That's intentional: the context is derived from the work context, and we don't want to halt an in-progress fetch during shutdown, because we may have already locked/updated those jobs and would then need to cleanly release them so they don't have to be rescued. With the pro pilot, we have a retry loop with individualized per-attempt timeouts applied. For the standard pilot, however, no additional timeouts are applied to this method at all:
Lines 735 to 764 in ed11967
```go
func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) {
	// This intentionally removes any deadlines or cancellation from the parent
	// context because we don't want it to get cancelled if the producer is asked
	// to shut down. In that situation, we want to finish fetching any jobs we are
	// in the midst of fetching, work them, and then stop. Otherwise we'd have a
	// risk of shutting down when we had already fetched jobs in the database,
	// leaving those jobs stranded. We'd then potentially have to release them
	// back to the queue.
	ctx := context.WithoutCancel(workCtx)

	// Maximum size of the `attempted_by` array on each job row. This maximum is
	// rarely hit, but exists to protect against degenerate cases.
	const maxAttemptedBy = 100

	jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
		ClientID:       p.config.ClientID,
		MaxAttemptedBy: maxAttemptedBy,
		MaxToLock:      count,
		Now:            p.Time.NowUTCOrNil(),
		Queue:          p.config.Queue,
		ProducerID:     p.id.Load(),
		Schema:         p.config.Schema,
	})
	if err != nil {
		fetchResultCh <- producerFetchResult{err: err}
		return
	}

	fetchResultCh <- producerFetchResult{jobs: jobs}
}
```
What are your thoughts, @brandur? Should we apply a fixed timeout to these fetches in the standard pilot so they can't hang indefinitely (freezing the producer and/or blocking shutdown), or do something else?