-
Notifications
You must be signed in to change notification settings - Fork 132
Introduce Client.JobUpdate function that can store output incrementally
#1098
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@bgentry I think I broke some tests here, but wanted to get your thoughts on the rough shape of this approach. I didn't add tests yet. |
ecb6ed1 to
c016eff
Compare
CHANGELOG.md
Outdated
|
|
||
| ### Added | ||
|
|
||
| - Added `Client.JobUpdate` which can be used to persist job output partway through a running work function instead of having to wait until the job is completed. [PR #1093](https://github.com/riverqueue/river/pull/1093). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| - Added `Client.JobUpdate` which can be used to persist job output partway through a running work function instead of having to wait until the job is completed. [PR #1093](https://github.com/riverqueue/river/pull/1093). | |
| - Added `Client.JobUpdate` which can be used to persist job output partway through a running work function instead of having to wait until the job is completed. [PR #1098](https://github.com/riverqueue/river/pull/1098). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, thx.
| -- name: JobUpdateFast :one | ||
| WITH locked_job AS ( | ||
| SELECT id | ||
| FROM /* TEMPLATE: schema */river_job | ||
| WHERE river_job.id = @id | ||
| FOR UPDATE | ||
| ) | ||
| UPDATE /* TEMPLATE: schema */river_job | ||
| SET | ||
| metadata = CASE WHEN @metadata_do_merge::boolean THEN metadata || @metadata::jsonb ELSE metadata END | ||
| FROM | ||
| locked_job | ||
| WHERE river_job.id = locked_job.id | ||
| RETURNING river_job.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name JobUpdateFast doesn't seem right—it's not really faster than an alternative option and in its current form it merely updates metadata. Maybe we will expand its purpose, I'm not sure, but in the mean time we could probably still pick a better suffix.
One idea is to rename the current JobUpdate to JobUpdateFull or something of that sort and let this one take the JobUpdate name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
K, I changed this to JobUpdate + JobUpdateFull.
| // JobUpdate updates the job with the given ID. | ||
| // | ||
| // Currently, no fields are explicitly updatable via this method, but if called | ||
| // inside a work function, it will set output recorded with RecordOutput, if | ||
| // any. | ||
| func (c *Client[TTx]) JobUpdate(ctx context.Context, id int64, params *JobUpdateParams) (*rivertype.JobRow, error) { | ||
| return c.jobUpdate(ctx, c.driver.GetExecutor(), id, params) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fully onboard with providing an API to make it easy to write metadata updates, particularly for output. At the same time I'm wondering if we want to couple this functionality for "store the recorded output immediately instead of waiting for job completion" to something that's named as a general-purpose JobUpdate. People will probably want to use this to make arbitrary updates to jobs, including those that aren't even the job currently being executed. That'd lead to this method having some surprising and potentially undesired behavior just depending on whether it's called for the current running job or another one.
It also feels a little weird/confusing/unnatural to have to first call the river.RecordOutput() function (which buffers up or lazily records the output) followed by a subsequent call to some client method to immediately save the already-buffered output.
Despite all this, I understand why you went down this route. A special-purpose API like Client.JobRecordOutput / Client.JobRecordOutputNow feels quite specific. And if you exposed metadata as a field on a generalized JobUpdateParams it'd be unclear whether it fully replaces or merges with metadata. Additionally, the nicety of river.RecordOutput is that it both marshals for you and enforces length restrictions that would have to be reproduced elsewhere.
I don't really have a strong preference on an approach here, but I do think it's important to raise these points and hear your perspective before we commit to one. I think I have a slight preference for a single client method that'd allow directly passing output or other metadata updates to be merged in (while applying the same rules & restrictions we do in RecordOutput / with generalized metadata updates from middleware), primarily for two reasons:
- Avoid the two-step process of
river.RecordOutput+client.JobUpdatewithout params, in favor of a single explicit call. Letriver.RecordOutputbe clearly documented to do so lazily, and cross-reference w/ the new method documentation to explain when to use each. - Avoid behavioral differences depending on which job you're updating (the currently running one or another, say a dependency).
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I can think of a good half dozen alternatives to this, but I couldn't think of any that are better than the proposed route, just different. For example:
- Specific
Client.JobRecordOutputfunctions: definitely thought about this, but just feels too specific and unnecessary given it wouldn't have a major advantage overJobUpdateand we might want to expandJobUpdate. Client.JobUpdate(orClient.JobRecordOutput) instead ofriver.RecordOutput+Client.JobUpdate: I see this as a possibility, but it definitely felt to me that it'd be surprising if the normalriver.RecordOutputdidn't work as expected when doing a mid-work output update.
I've pushed a new change in which I augment the new JobUpdate so it now has two modes of operation:
- If
river.RecordOutputhas been used, it uses the output out of that to save to the job row. - Adds a
JobUpdateParams.Outputthat can be used to override output directly, thereby allowing a one-step operation.
In case both context output and JobUpdateParams.Output is set, we error as it's ambiguous which one was intended.
This feels like a pretty good compromise to me: you get shorter usage if you want it, but we don't introduce any API footguns.
I still need to make a few augmentation like checking output length and such, but do you want to take a look and see if you're okay with this direction?
08478f0 to
166b496
Compare
bgentry
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comments only, let's ship this! I think this is a good approach generally, and I like that it gives you flexibility.
We should probably also tweak docs on RecordOutput to make its lazy-write behavior clear and could call out JobUpdate / JobUpdateTx for cases where you want an immediate extra write.
client.go
Outdated
| // .JobUpdateFull updates the job with the given ID. | ||
| // | ||
| // Currently, no fields are explicitly updatable via this method, but if called | ||
| // inside a work function, it will set output recorded with RecordOutput, if | ||
| // any. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo at the start and slightly outdated given the new behavior here. Tx variant below also needs updating.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, good call. Fixed.
client.go
Outdated
|
|
||
| var ( | ||
| metadataDoMerge bool | ||
| metadataUpdatesBytes = []byte("{}") // even in the even of no update, still valid jsonb |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| metadataUpdatesBytes = []byte("{}") // even in the even of no update, still valid jsonb | |
| metadataUpdatesBytes = []byte("{}") // even in the event of no update, still valid jsonb |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx, fixed.
client.go
Outdated
| if outputFromWorkContext != nil && params.Output != nil { | ||
| return nil, errors.New("should not set job output both from work context (via RecordOutput) and in JobUpdateParams") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, feels a bit surprising, but then so is the alternative of quietly allowing JobUpdate to take precedence over the previously recorded output. I think I lean slightly in the direction of allowing it with documentation instead of erroring as done here since that would only happen if you're explicitly updating the job with an Output after you've already called RecordOutput, but I could go either way as long as clearly documented.
For reference we already make the last-write-wins behavior clear in RecordOutput so IMO allowing it here too would be consistent with that:
// Only one output can be stored per job. If this function is called more than
// once, the output will be overwritten with the latest value. The output also
// must be recorded _before_ the job finishes executing so that it can be stored
// when the job's row is updated.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, works for me. I tweaked the code so it's no longer an error and that params output always takes precedence.
166b496 to
8a1d016
Compare
…ally Here, take a stab at a solution for #1064. We introduce a new client function `Client.JobUpdate` that takes any output currently recorded in context and stores it to the given job row. `JobUpdate` currently only sets output, but the idea is that it could be expanded in the future in case it's useful to do so. The reason that we don't have a `river.PersistOutput` in line with `river.RecordOutput` is that once we're talking about storing data, we're dealing with the usual persistence semantics. We need a client instance with access to an executor, and we probably want to have a `*Tx` variant like we have for all other functions like this one. Fixes #1064.
8a1d016 to
8365c8a
Compare
|
Great, thx!
Done. |
bgentry
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! ![]()
|
Amazing! 🚢 |
Here, take a stab at a solution for #1064. We introduce a new client
function
Client.JobUpdatethat takes any output currently recorded incontext and stores it to the given job row.
JobUpdatecurrently onlysets output, but the idea is that it could be expanded in the future in
case it's useful to do so.
The reason that we don't have a
river.PersistOutputin line withriver.RecordOutputis that once we're talking about storing data,we're dealing with the usual persistence semantics. We need a client
instance with access to an executor, and we probably want to have a
*Txvariant like we have for all other functions like this one.Fixes #1064.