
Commit 1fde20c

use new unique jobs implementation

1 parent 54b4549

20 files changed: +285 −1191 lines changed

Gemfile.lock

Lines changed: 1 addition & 0 deletions

@@ -143,6 +143,7 @@ GEM
 
 PLATFORMS
   arm64-darwin-22
+  arm64-darwin-23
   x86_64-linux
 
 DEPENDENCIES

docs/README.md

Lines changed: 3 additions & 3 deletions

@@ -42,8 +42,8 @@ insert_res.job # inserted job row
 
 Job args should:
 
-* Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize.
-* Response to `#to_json` with a JSON serialization that'll be parseable as an object in Go.
+- Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize.
+- Response to `#to_json` with a JSON serialization that'll be parseable as an object in Go.
 
 They may also respond to `#insert_opts` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind.
 

@@ -89,7 +89,7 @@ insert_res.unique_skipped_as_duplicated
 Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:
 
 ```ruby
-client = River::Client.new(mock_driver, advisory_lock_prefix: 123456)
+client = River::Client.new(mock_driver)
 ```
 
 Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other.
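
As a companion to the README excerpt above, here is a minimal caller-side sketch of the unique insertion flow that the `insert_res.unique_skipped_as_duplicated` hunk refers to. The `SendReceiptArgs` class, the `mock_driver` placeholder, and the `unique_opts:`/`by_args:` option names are illustrative assumptions, not something this diff defines.

```ruby
require "json"
require "riverqueue"

# Hypothetical job args class; per the README above, job args must respond to
# `#kind` and `#to_json`.
class SendReceiptArgs
  def initialize(order_id:)
    @order_id = order_id
  end

  def kind = "send_receipt"

  def to_json(*) = JSON.generate({order_id: @order_id})
end

client = River::Client.new(mock_driver) # placeholder driver, as in the README

# Option names here are assumptions for illustration.
insert_opts = River::InsertOpts.new(
  unique_opts: River::UniqueOpts.new(by_args: true)
)

insert_res1 = client.insert(SendReceiptArgs.new(order_id: 123), insert_opts: insert_opts)
insert_res2 = client.insert(SendReceiptArgs.new(order_id: 123), insert_opts: insert_opts)

insert_res1.unique_skipped_as_duplicated # => false, job was inserted
insert_res2.unique_skipped_as_duplicated # => true, deduplicated against the first
```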

driver/riverqueue-activerecord/Gemfile.lock

Lines changed: 1 addition & 0 deletions

@@ -120,6 +120,7 @@ GEM
 
 PLATFORMS
   arm64-darwin-22
+  arm64-darwin-23
   x86_64-linux
 
 DEPENDENCIES

driver/riverqueue-activerecord/lib/driver.rb

Lines changed: 18 additions & 33 deletions

@@ -31,36 +31,19 @@ def errors = {}
       end
     end
 
-    def advisory_lock(key)
-      ::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})")
-      nil
-    end
-
-    def advisory_lock_try(key)
-      ::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"]
-    end
-
     def job_get_by_id(id)
       data_set = RiverJob.where(id: id)
       data_set.first ? to_job_row_from_model(data_set.first) : nil
     end
 
-    def job_get_by_kind_and_unique_properties(get_params)
-      data_set = RiverJob.where(kind: get_params.kind)
-      data_set = data_set.where("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1]) if get_params.created_at
-      data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
-      data_set = data_set.where(queue: get_params.queue) if get_params.queue
-      data_set = data_set.where(state: get_params.state) if get_params.state
-      data_set.first ? to_job_row_from_model(data_set.first) : nil
-    end
-
     def job_insert(insert_params)
-      to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params)))
+      res = job_insert_many([insert_params]).first
+      [to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
     end
 
-    def job_insert_unique(insert_params, unique_key)
-      res = RiverJob.upsert(
-        insert_params_to_hash(insert_params).merge(unique_key: unique_key),
+    def job_insert_many(insert_params_many)
+      RiverJob.upsert_all(
+        insert_params_many.map { |param| insert_params_to_hash(param) },
         on_duplicate: Arel.sql("kind = EXCLUDED.kind"),
         returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"),
 

@@ -69,15 +52,9 @@ def job_insert_unique(insert_params, unique_key)
         # ActiveRecord tries to look up a unique index instead of letting
         # Postgres handle that, and of course it doesn't support a `WHERE`
         # clause. The workaround is to target the index name instead of columns.
-        unique_by: "river_job_kind_unique_key_idx"
+        unique_by: "river_job_unique_idx"
       )
-
-      [to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
-    end
-
-    def job_insert_many(insert_params_many)
-      RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) })
-      insert_params_many.count
+        .map { |row| to_insert_result(row) }
     end
 
     def job_list

@@ -104,10 +81,16 @@ def transaction(&)
         queue: insert_params.queue,
         state: insert_params.state,
         scheduled_at: insert_params.scheduled_at,
-        tags: insert_params.tags
+        tags: insert_params.tags,
+        unique_key: insert_params.unique_key,
+        unique_states: insert_params.unique_states
       }.compact
     end
 
+    private def to_insert_result(result)
+      [to_job_row_from_model(result), result.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
+    end
+
     private def to_job_row_from_model(river_job)
       # needs to be accessed through values because `errors` is shadowed by both
       # ActiveRecord and the patch above

@@ -139,7 +122,8 @@ def transaction(&)
         scheduled_at: river_job.scheduled_at.getutc,
         state: river_job.state,
         tags: river_job.tags,
-        unique_key: river_job.unique_key
+        unique_key: river_job.unique_key,
+        unique_states: river_job.unique_states
       )
     end
 

@@ -182,7 +166,8 @@ def transaction(&)
         scheduled_at: river_job["scheduled_at"].getutc,
         state: river_job["state"],
         tags: river_job["tags"],
-        unique_key: river_job["unique_key"]
+        unique_key: river_job["unique_key"],
+        unique_states: river_job["unique_states"]
       )
     end
   end
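
One practical note on the shape change above: `job_insert_many` now returns one `[job_row, unique_skipped_as_duplicate]` pair per row, with the flag coming from the `(xmax != 0)` expression in the `returning:` clause (a row rewritten by `ON CONFLICT DO UPDATE` carries a non-zero `xmax`, while a freshly inserted row does not). A rough sketch of consuming that shape; `driver` and `insert_params_array` are hypothetical stand-ins for objects built elsewhere in the gem:

```ruby
# Hypothetical consumer of the driver's new return value.
results = driver.job_insert_many(insert_params_array)

results.each do |job_row, unique_skipped_as_duplicate|
  if unique_skipped_as_duplicate
    # `(xmax != 0)` was true: the upsert matched an existing row on the unique
    # index, so no new job was created.
    puts "skipped duplicate of job #{job_row.id}"
  else
    puts "inserted job #{job_row.id}"
  end
end
```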

driver/riverqueue-sequel/Gemfile.lock

Lines changed: 1 addition & 0 deletions

@@ -78,6 +78,7 @@ GEM
 
 PLATFORMS
   arm64-darwin-22
+  arm64-darwin-23
   x86_64-linux
 
 DEPENDENCIES

driver/riverqueue-sequel/lib/driver.rb

Lines changed: 20 additions & 36 deletions

@@ -13,51 +13,27 @@ def initialize(db)
       @db.extension(:pg_json)
     end
 
-    def advisory_lock(key)
-      @db.fetch("SELECT pg_advisory_xact_lock(?)", key).first
-      nil
-    end
-
-    def advisory_lock_try(key)
-      @db.fetch("SELECT pg_try_advisory_xact_lock(?)", key).first[:pg_try_advisory_xact_lock]
-    end
-
     def job_get_by_id(id)
       data_set = @db[:river_job].where(id: id)
      data_set.first ? to_job_row(data_set.first) : nil
     end
 
-    def job_get_by_kind_and_unique_properties(get_params)
-      data_set = @db[:river_job].where(kind: get_params.kind)
-      data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at
-      data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
-      data_set = data_set.where(queue: get_params.queue) if get_params.queue
-      data_set = data_set.where(state: get_params.state) if get_params.state
-      data_set.first ? to_job_row(data_set.first) : nil
-    end
-
     def job_insert(insert_params)
-      to_job_row(@db[:river_job].returning.insert_select(insert_params_to_hash(insert_params)))
+      job_insert_many([insert_params]).first
     end
 
-    def job_insert_unique(insert_params, unique_key)
-      values = @db[:river_job]
+    def job_insert_many(insert_params_array)
+      @db[:river_job]
         .insert_conflict(
-          target: [:kind, :unique_key],
-          conflict_where: ::Sequel.lit("unique_key IS NOT NULL"),
+          target: [:unique_key],
+          conflict_where: ::Sequel.lit(
+            "unique_key IS NOT NULL AND unique_states IS NOT NULL AND river_job_state_in_bitmask(unique_states, state)"
+          ),
           update: {kind: ::Sequel[:excluded][:kind]}
         )
         .returning(::Sequel.lit("*, (xmax != 0) AS unique_skipped_as_duplicate"))
-        .insert_select(
-          insert_params_to_hash(insert_params).merge(unique_key: ::Sequel.blob(unique_key))
-        )
-
-      [to_job_row(values), values[:unique_skipped_as_duplicate]]
-    end
-
-    def job_insert_many(insert_params_many)
-      @db[:river_job].multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) })
-      insert_params_many.count
+        .multi_insert(insert_params_array.map { |p| insert_params_to_hash(p) })
+        .map { |row| to_insert_result(row) }
     end
 
     def job_list

@@ -76,6 +52,7 @@ def transaction(&)
     private def insert_params_to_hash(insert_params)
       # the call to `#compact` is important so that we remove nils and table
       # default values get picked up instead
+      # TODO: but I had to remove it for bulk unique inserts...
       {
         args: insert_params.encoded_args,
         kind: insert_params.kind,

@@ -84,8 +61,14 @@ def transaction(&)
         queue: insert_params.queue,
         state: insert_params.state,
         scheduled_at: insert_params.scheduled_at,
-        tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil
-      }.compact
+        tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags, :text) : nil,
+        unique_key: insert_params.unique_key ? ::Sequel.blob(insert_params.unique_key) : nil,
+        unique_states: insert_params.unique_states
+      }
+    end
+
+    private def to_insert_result(result)
+      [to_job_row(result), result[:unique_skipped_as_duplicate]]
     end
 
     private def to_job_row(river_job)

@@ -113,7 +96,8 @@ def transaction(&)
         scheduled_at: river_job[:scheduled_at].getutc,
         state: river_job[:state],
         tags: river_job[:tags].to_a,
-        unique_key: river_job[:unique_key]&.to_s
+        unique_key: river_job[:unique_key]&.to_s,
+        unique_states: river_job[:unique_states]&.to_i(2)
       )
     end
   end
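
For context on the `unique_states` handling in this driver: values are read back with `&.to_i(2)`, which implies Postgres hands the column over as a bit string (for example a `bit(8)` value like `"11110001"`), while the insert path defers the per-state check to the `river_job_state_in_bitmask` SQL function named in the conflict clause. A small sketch of that bit-string/integer round trip; the mapping of specific job states to bit positions is an assumption for illustration and isn't defined in this diff:

```ruby
# Bit string as Postgres might return it for a bit(8) `unique_states` column.
bit_string = "11110001"

# What `to_job_row` above does: parse the bit string as a base-2 integer mask.
mask = bit_string.to_i(2) # => 241

# Going the other way, back to a fixed-width bit string.
mask.to_s(2).rjust(8, "0") # => "11110001"

# Roughly what `river_job_state_in_bitmask(unique_states, state)` checks in
# SQL: whether the bit for the row's current state is set. The left-to-right
# bit position used here is a hypothetical mapping.
state_position = 3
((mask >> (7 - state_position)) & 1) == 1 # => true
```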

0 commit comments
