Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Basic stuck detection after a job's exceeded its timeout and still not returned after the executor's initiated context cancellation and waited a short margin for the cancellation to take effect. [PR #1097](https://github.com/riverqueue/river/pull/1097).
- Added `Client.JobUpdate` which can be used to persist job output partway through a running work function instead of having to wait until the job is completed. [PR #1098](https://github.com/riverqueue/river/pull/1098).
- Add a little more error flavor for when encountering a deadline exceeded error on leadership election suggesting that the user may want to try increasing their database pool size. [PR #1101](https://github.com/riverqueue/river/pull/1101).

## [0.29.0-rc.1] - 2025-12-04

### Added

- Added `HookPeriodicJobsStart` that can be used to run custom logic when a periodic job enqueuer starts up on a new leader. [PR #1084](https://github.com/riverqueue/river/pull/1084).
- Added `Client.Notify().RequestResign` and `Client.Notify().RequestResignTx` functions allowing any client to request that the current leader resign. [PR #1085](https://github.com/riverqueue/river/pull/1085).

Expand Down
92 changes: 92 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/riverqueue/river/internal/dbunique"
"github.com/riverqueue/river/internal/hooklookup"
"github.com/riverqueue/river/internal/jobcompleter"
"github.com/riverqueue/river/internal/jobexecutor"
"github.com/riverqueue/river/internal/leadership"
"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/internal/middlewarelookup"
Expand Down Expand Up @@ -1514,6 +1515,97 @@ func (c *Client[TTx]) jobRetry(ctx context.Context, exec riverdriver.Executor, i
})
}

// JobUpdateParams contains parameters for Client.JobUpdate and Client.JobUpdateTx.
type JobUpdateParams struct {
// Output is a new output value for a job.
//
// If not set, and a job is updated from inside a work function, the job's
// output is set based on output recorded so far using RecordOutput.
Output any
}

// JobUpdate updates the job with the given ID.
//
// If JobUpdateParams.Output is not set, this function may be used inside a job
// work function to set a job's output based on output recorded so far using
// RecordOutput.
func (c *Client[TTx]) JobUpdate(ctx context.Context, id int64, params *JobUpdateParams) (*rivertype.JobRow, error) {
return c.jobUpdate(ctx, c.driver.GetExecutor(), id, params)
}

// JobUpdateTx updates the job with the given ID.
//
// If JobUpdateParams.Output is not set, this function may be used inside a job
// work function to set a job's output based on output recorded so far using
// RecordOutput.
//
// This variant updates the job inside of a transaction.
func (c *Client[TTx]) JobUpdateTx(ctx context.Context, tx TTx, id int64, params *JobUpdateParams) (*rivertype.JobRow, error) {
return c.jobUpdate(ctx, c.driver.UnwrapExecutor(tx), id, params)
}

func (c *Client[TTx]) jobUpdate(ctx context.Context, exec riverdriver.Executor, id int64, params *JobUpdateParams) (*rivertype.JobRow, error) {
if params == nil {
params = &JobUpdateParams{}
}

outputFromWorkContext := func() json.RawMessage {
metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
if !hasMetadataUpdates {
return nil
}

if val, ok := metadataUpdates[rivertype.MetadataKeyOutput]; ok {
return val.(json.RawMessage) //nolint:forcetypeassert
}

return nil
}()

var (
metadataDoMerge bool
metadataUpdatesBytes = []byte("{}") // even in the event of no update, still valid jsonb
)
if outputFromWorkContext != nil || params.Output != nil {
metadataDoMerge = true

var outputBytes json.RawMessage

switch {
// comes first because params takes precedence over context output
case params.Output != nil:
var err error
outputBytes, err = json.Marshal(params.Output)
if err != nil {
return nil, err
}

if err := checkOutputSize(outputBytes); err != nil {
return nil, err
}

case outputFromWorkContext != nil:
// no size check necessary here because it's already been checked in RecordOutput
outputBytes = outputFromWorkContext
}

var err error
metadataUpdatesBytes, err = json.Marshal(map[string]json.RawMessage{
rivertype.MetadataKeyOutput: outputBytes,
})
if err != nil {
return nil, fmt.Errorf("error marshaling metadata updates to JSON: %w", err)
}
}

return exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{
ID: id,
MetadataDoMerge: metadataDoMerge,
Metadata: metadataUpdatesBytes,
Schema: c.config.Schema,
})
}

// ID returns the unique ID of this client as set in its config or
// auto-generated if not specified.
func (c *Client[TTx]) ID() string {
Expand Down
188 changes: 185 additions & 3 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/tidwall/sjson"

"github.com/riverqueue/river/internal/dbunique"
"github.com/riverqueue/river/internal/jobexecutor"
"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/internal/middlewarelookup"
"github.com/riverqueue/river/internal/notifier"
Expand Down Expand Up @@ -4534,6 +4535,187 @@ func Test_Client_JobRetry(t *testing.T) {
})
}

func Test_Client_JobUpdate(t *testing.T) {
t.Parallel()

ctx := context.Background()

type testBundle struct {
dbPool *pgxpool.Pool
}

setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
t.Helper()

var (
dbPool = riversharedtest.DBPool(ctx, t)
driver = riverpgxv5.New(dbPool)
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
config = newTestConfig(t, schema)
client = newTestClient(t, dbPool, config)
)

return client, &testBundle{dbPool: dbPool}
}

t.Run("AllParams", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
require.NoError(t, err)

job, err := client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{
Output: "my job output",
})
require.NoError(t, err)
require.Equal(t, `"my job output"`, string(job.Output()))

updatedJob, err := client.JobGet(ctx, job.ID)
require.NoError(t, err)
require.Equal(t, `"my job output"`, string(updatedJob.Output()))
})

t.Run("NoParams", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
require.NoError(t, err)

_, err = client.JobUpdate(ctx, insertRes.Job.ID, nil)
require.NoError(t, err)
})

t.Run("OutputFromContext", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
require.NoError(t, err)

ctx := context.WithValue(ctx, jobexecutor.ContextKeyMetadataUpdates, map[string]any{})
require.NoError(t, RecordOutput(ctx, "my job output from context"))

job, err := client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{})
require.NoError(t, err)
require.Equal(t, `"my job output from context"`, string(job.Output()))

updatedJob, err := client.JobGet(ctx, job.ID)
require.NoError(t, err)
require.Equal(t, `"my job output from context"`, string(updatedJob.Output()))
})

t.Run("ParamOutputTakesPrecedenceOverContextOutput", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
require.NoError(t, err)

ctx := context.WithValue(ctx, jobexecutor.ContextKeyMetadataUpdates, map[string]any{})
require.NoError(t, RecordOutput(ctx, "my job output from context"))

job, err := client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{
Output: "my job output from params",
})
require.NoError(t, err)
require.Equal(t, `"my job output from params"`, string(job.Output()))

updatedJob, err := client.JobGet(ctx, job.ID)
require.NoError(t, err)
require.Equal(t, `"my job output from params"`, string(updatedJob.Output()))
})

t.Run("ParamOutputTooLarge", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
require.NoError(t, err)

_, err = client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{
Output: strings.Repeat("x", maxOutputSizeBytes+1),
})
require.ErrorContains(t, err, "output is too large")
})
}

func Test_Client_JobUpdateTx(t *testing.T) {
t.Parallel()

ctx := context.Background()

type testBundle struct {
dbPool *pgxpool.Pool
executorTx riverdriver.ExecutorTx
tx pgx.Tx
}

setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
t.Helper()

var (
dbPool = riversharedtest.DBPool(ctx, t)
driver = riverpgxv5.New(dbPool)
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
config = newTestConfig(t, schema)
client = newTestClient(t, dbPool, config)
)

tx, err := dbPool.Begin(ctx)
require.NoError(t, err)
t.Cleanup(func() { tx.Rollback(ctx) })

return client, &testBundle{
dbPool: dbPool,
executorTx: client.driver.UnwrapExecutor(tx),
tx: tx,
}
}

t.Run("AllParams", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
require.NoError(t, err)

job, err := client.JobUpdateTx(ctx, bundle.tx, insertRes.Job.ID, &JobUpdateParams{
Output: "my job output",
})
require.NoError(t, err)
require.Equal(t, `"my job output"`, string(job.Output()))

updatedJob, err := client.JobGetTx(ctx, bundle.tx, job.ID)
require.NoError(t, err)
require.Equal(t, `"my job output"`, string(updatedJob.Output()))

// Outside of transaction shows original
updatedJob, err = client.JobGet(ctx, job.ID)
require.NoError(t, err)
require.Empty(t, string(updatedJob.Output()))
})

t.Run("NoParams", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
require.NoError(t, err)

_, err = client.JobUpdateTx(ctx, bundle.tx, insertRes.Job.ID, nil)
require.NoError(t, err)
})
}

func Test_Client_ErrorHandler(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -5859,7 +6041,7 @@ func Test_Client_RetryPolicy(t *testing.T) {
// regression protection to ensure we're testing the right number of jobs:
require.Equal(t, rivercommon.MaxAttemptsDefault, insertRes.Job.MaxAttempts)

updatedJob, err := client.driver.GetExecutor().JobUpdate(ctx, &riverdriver.JobUpdateParams{
updatedJob, err := client.driver.GetExecutor().JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
ID: insertRes.Job.ID,
AttemptedAtDoUpdate: true,
AttemptedAt: &now, // we want a value here, but it'll be overwritten as jobs are locked by the producer
Expand Down Expand Up @@ -6647,7 +6829,7 @@ func Test_Client_JobCompletion(t *testing.T) {
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
_, err := client.driver.GetExecutor().JobUpdate(ctx, &riverdriver.JobUpdateParams{
_, err := client.driver.GetExecutor().JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
ID: job.ID,
FinalizedAtDoUpdate: true,
FinalizedAt: &now,
Expand Down Expand Up @@ -6753,7 +6935,7 @@ func Test_Client_JobCompletion(t *testing.T) {
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
_, err := client.driver.GetExecutor().JobUpdate(ctx, &riverdriver.JobUpdateParams{
_, err := client.driver.GetExecutor().JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
ID: job.ID,
ErrorsDoUpdate: true,
Errors: [][]byte{[]byte("{\"error\": \"oops\"}")},
Expand Down
4 changes: 2 additions & 2 deletions internal/jobexecutor/job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func TestJobExecutor_Execute(t *testing.T) {
require.Equal(t, rivertype.JobStateAvailable, job.State)
}

_, err := bundle.exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{
_, err := bundle.exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
ID: bundle.jobRow.ID,
StateDoUpdate: true,
State: rivertype.JobStateRunning,
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestJobExecutor_Execute(t *testing.T) {

// add a unique key so we can verify it's cleared
var err error
bundle.jobRow, err = bundle.exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{
bundle.jobRow, err = bundle.exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
ID: bundle.jobRow.ID,
State: rivertype.JobStateAvailable, // required for encoding but ignored
})
Expand Down
Loading
Loading