1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added

 - Basic stuck detection after a job's exceeded its timeout and still not returned after the executor's initiated context cancellation and waited a short margin for the cancellation to take effect. [PR #1097](https://github.com/riverqueue/river/pull/1097).
+- Add a little more error flavor for when encountering a deadline exceeded error on leadership election suggesting that the user may want to try increasing their database pool size. [PR #1101](https://github.com/riverqueue/river/pull/1101).

 ## [0.29.0-rc.1] - 2025-12-04

52 changes: 32 additions & 20 deletions internal/leadership/elector.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"log/slog"
 	"strings"
 	"sync"
@@ -567,27 +568,38 @@ func attemptElectOrReelect(ctx context.Context, exec riverdriver.Executor, alrea
 	ctx, cancel := context.WithTimeout(ctx, deadlineTimeout)
 	defer cancel()

-	return dbutil.WithTxV(ctx, exec, func(ctx context.Context, exec riverdriver.ExecutorTx) (bool, error) {
-		if _, err := exec.LeaderDeleteExpired(ctx, &riverdriver.LeaderDeleteExpiredParams{
-			Now:    params.Now,
-			Schema: params.Schema,
-		}); err != nil {
-			return false, err
+	execTx, err := exec.Begin(ctx)
+	if err != nil {
Comment on lines +571 to +572
Contributor

I spent a moment wondering why we had to ditch dbutil.WithTxV before realizing that it's the Begin that fails, and that happens before our function here gets called, so of course we can't customize the error. But then I wondered: is this extra error detail something we could/should just apply to all Begin calls within River that time out? Are there cases where this would be misleading or inaccurate? There are definitely other places where a constrained pool size could manifest in the same way. Or do you think that'd be too noisy?

Contributor Author

So I think the main reason that the elector is the most important is that it's the most common place in River for any user to start a transaction:

- They may be starting transactions or performing operations in their Work code more than anywhere else, but this isn't part of River.
- Other than that, the elector will be running frequently, and they won't get transactions otherwise unless they're using InsertTx, JobGetTx, etc., or some other feature like scheduled jobs.

I think it might make sense to make this a broad error (although it wouldn't be just on transactions then, it'd be on every driver call), but IMO for now it makes sense to try and improve things incrementally with something like this. We've had a few reports empirically now that this is where people tend to run into the problem, so let's patch this up and see how things go from there.

+		var additionalDetail string
+		if errors.Is(err, context.DeadlineExceeded) {
+			additionalDetail = " (a common cause of this is a database pool that's at its connection limit; you may need to increase maximum connections)"
 		}

-		var (
-			elected bool
-			err     error
-		)
-		if alreadyElected {
-			elected, err = exec.LeaderAttemptReelect(ctx, params)
-		} else {
-			elected, err = exec.LeaderAttemptElect(ctx, params)
-		}
-		if err != nil {
-			return false, err
-		}
+		return false, fmt.Errorf("error beginning transaction: %w%s", err, additionalDetail)
+	}
+	defer dbutil.RollbackWithoutCancel(ctx, execTx)

-		return elected, nil
-	})
+	if _, err := execTx.LeaderDeleteExpired(ctx, &riverdriver.LeaderDeleteExpiredParams{
+		Now:    params.Now,
+		Schema: params.Schema,
+	}); err != nil {
+		return false, err
+	}
+
+	var elected bool
+	if alreadyElected {
+		elected, err = execTx.LeaderAttemptReelect(ctx, params)
+
+	} else {
+		elected, err = execTx.LeaderAttemptElect(ctx, params)
+	}
+	if err != nil {
+		return false, err
+	}
+
+	if err := execTx.Commit(ctx); err != nil {
+		return false, fmt.Errorf("error committing transaction: %w", err)
+	}
+
+	return elected, nil
 }