Skip to content

Commit f9095c0

Browse files
authored
Rename Args to JobArgs + modify args insert opts check + more tests (#20)
* Rename `Args` to `JobArgs` for better consistency with other River projects. * Use a protocol (`JobArgsWithInsertOpts`) for the check on an args for `insert_opts()` support. (Feels a little more type friendly and also how other River projects do it.) * Add tests that verify that insert opts are respected correctly and various ones that check errors that are returned on invalid insert conditions.
1 parent 00e9a30 commit f9095c0

File tree

5 files changed

+181
-40
lines changed

5 files changed

+181
-40
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Changed
11+
12+
- Rename `Args` to `JobArgs` and add `JobArgsWithInsertOpts` protocol. [PR #20](https://github.com/riverqueue/river/pull/20).
13+
1014
## [0.1.2] - 2024-07-04
1115

1216
### Changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ insert_res = client.insert(
3838
insert_res.job # inserted job row
3939
```
4040

41-
Job args should comply with the following [protocol](https://peps.python.org/pep-0544/):
41+
Job args should comply with the `riverqueue.JobArgs` [protocol](https://peps.python.org/pep-0544/):
4242

4343
```python
44-
class Args(Protocol):
44+
class JobArgs(Protocol):
4545
kind: str
4646

4747
def to_json(self) -> str:
@@ -137,7 +137,7 @@ with engine.begin() as session:
137137
)
138138
```
139139

140-
## Asynchronous I/O (`asyncio`)
140+
## Asynchronous I/O (asyncio)
141141

142142
The package supports River's [`asyncio` (asynchronous I/O)](https://docs.python.org/3/library/asyncio.html) through an alternate `AsyncClient` and `riversqlalchemy.AsyncDriver`. You'll need to make sure to use SQLAlchemy's alternative async engine and an asynchronous Postgres driver like [`asyncpg`](https://github.com/MagicStack/asyncpg), but otherwise usage looks very similar to use without async:
143143

src/riverqueue/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
JOB_STATE_RUNNING as JOB_STATE_RUNNING,
99
JOB_STATE_SCHEDULED as JOB_STATE_SCHEDULED,
1010
AsyncClient as AsyncClient,
11-
Args as Args,
11+
JobArgs as JobArgs,
12+
JobArgsWithInsertOpts as JobArgsWithInsertOpts,
1213
Client as Client,
1314
InsertManyParams as InsertManyParams,
1415
InsertOpts as InsertOpts,

src/riverqueue/client.py

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
from dataclasses import dataclass
22
from datetime import datetime, timezone, timedelta
3-
from typing import Any, Awaitable, Literal, Optional, Protocol, Tuple, List, Callable
3+
from typing import (
4+
Any,
5+
Awaitable,
6+
Literal,
7+
Optional,
8+
Protocol,
9+
Tuple,
10+
List,
11+
Callable,
12+
runtime_checkable,
13+
)
414

515
from .driver import GetParams, JobInsertParams, DriverProtocol, ExecutorProtocol
616
from .driver.driver_protocol import AsyncDriverProtocol, AsyncExecutorProtocol
@@ -27,27 +37,44 @@
2737
]
2838

2939

30-
class Args(Protocol):
40+
@dataclass
41+
class InsertOpts:
42+
max_attempts: Optional[int] = None
43+
priority: Optional[int] = None
44+
queue: Optional[str] = None
45+
scheduled_at: Optional[datetime] = None
46+
tags: Optional[List[Any]] = None
47+
unique_opts: Optional["UniqueOpts"] = None
48+
49+
50+
class JobArgs(Protocol):
51+
"""
52+
Protocol that should be implemented by all job args.
53+
"""
54+
3155
kind: str
3256

3357
def to_json(self) -> str:
3458
pass
3559

3660

37-
@dataclass
38-
class InsertManyParams:
39-
args: Args
40-
insert_opts: Optional["InsertOpts"] = None
61+
@runtime_checkable
62+
class JobArgsWithInsertOpts(Protocol):
63+
"""
64+
Protocol that's optionally implemented by a JobArgs implementation so that
65+
every inserted instance of them provides the same custom `InsertOpts`.
66+
`InsertOpts` passed to insert functions will take precedence of one returned
67+
by `JobArgsWithInsertOpts`.
68+
"""
69+
70+
def insert_opts(self) -> InsertOpts:
71+
pass
4172

4273

4374
@dataclass
44-
class InsertOpts:
45-
max_attempts: Optional[int] = None
46-
priority: Optional[int] = None
47-
queue: Optional[str] = None
48-
scheduled_at: Optional[datetime] = None
49-
tags: Optional[List[Any]] = None
50-
unique_opts: Optional["UniqueOpts"] = None
75+
class InsertManyParams:
76+
args: JobArgs
77+
insert_opts: Optional[InsertOpts] = None
5178

5279

5380
@dataclass
@@ -68,7 +95,7 @@ def __init__(
6895
)
6996

7097
async def insert(
71-
self, args: Args, insert_opts: Optional[InsertOpts] = None
98+
self, args: JobArgs, insert_opts: Optional[InsertOpts] = None
7299
) -> InsertResult:
73100
async with self.driver.executor() as exec:
74101
if not insert_opts:
@@ -83,7 +110,7 @@ async def insert():
83110
)
84111

85112
async def insert_tx(
86-
self, tx, args: Args, insert_opts: Optional[InsertOpts] = None
113+
self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None
87114
) -> InsertResult:
88115
exec = self.driver.unwrap_executor(tx)
89116
if not insert_opts:
@@ -95,11 +122,11 @@ async def insert():
95122

96123
return await self.__check_unique_job(exec, insert_params, unique_opts, insert)
97124

98-
async def insert_many(self, args: List[Args | InsertManyParams]) -> int:
125+
async def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int:
99126
async with self.driver.executor() as exec:
100127
return await exec.job_insert_many(_make_insert_params_many(args))
101128

102-
async def insert_many_tx(self, tx, args: List[Args | InsertManyParams]) -> int:
129+
async def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int:
103130
exec = self.driver.unwrap_executor(tx)
104131
return await exec.job_insert_many(_make_insert_params_many(args))
105132

@@ -137,7 +164,7 @@ def __init__(
137164
)
138165

139166
def insert(
140-
self, args: Args, insert_opts: Optional[InsertOpts] = None
167+
self, args: JobArgs, insert_opts: Optional[InsertOpts] = None
141168
) -> InsertResult:
142169
with self.driver.executor() as exec:
143170
if not insert_opts:
@@ -150,7 +177,7 @@ def insert():
150177
return self.__check_unique_job(exec, insert_params, unique_opts, insert)
151178

152179
def insert_tx(
153-
self, tx, args: Args, insert_opts: Optional[InsertOpts] = None
180+
self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None
154181
) -> InsertResult:
155182
exec = self.driver.unwrap_executor(tx)
156183
if not insert_opts:
@@ -162,11 +189,11 @@ def insert():
162189

163190
return self.__check_unique_job(exec, insert_params, unique_opts, insert)
164191

165-
def insert_many(self, args: List[Args | InsertManyParams]) -> int:
192+
def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int:
166193
with self.driver.executor() as exec:
167194
return exec.job_insert_many(_make_insert_params_many(args))
168195

169-
def insert_many_tx(self, tx, args: List[Args | InsertManyParams]) -> int:
196+
def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int:
170197
exec = self.driver.unwrap_executor(tx)
171198
return exec.job_insert_many(_make_insert_params_many(args))
172199

@@ -257,26 +284,25 @@ def _check_advisory_lock_prefix_bounds(
257284
advisory_lock_prefix: Optional[int],
258285
) -> Optional[int]:
259286
if advisory_lock_prefix:
260-
print("in_bytes", advisory_lock_prefix.to_bytes(4))
261287
# We only reserve 4 bytes for the prefix, so make sure the given one
262288
# properly fits. This will error in case that's not the case.
263289
advisory_lock_prefix.to_bytes(4)
264290
return advisory_lock_prefix
265291

266292

267293
def _make_insert_params(
268-
args: Args,
294+
args: JobArgs,
269295
insert_opts: InsertOpts,
270296
is_insert_many: bool = False,
271297
) -> Tuple[JobInsertParams, Optional[UniqueOpts]]:
272-
if not hasattr(args, "kind"):
273-
raise Exception("args should respond to `kind`")
298+
args.kind # fail fast in case args don't respond to kind
274299

275300
args_json = args.to_json()
276-
if args_json is None:
277-
raise Exception("args should return non-nil from `to_json`")
301+
assert args_json is not None, "args should return non-nil from `to_json`"
278302

279-
args_insert_opts = getattr(args, "insert_opts", InsertOpts())
303+
args_insert_opts = InsertOpts()
304+
if isinstance(args, JobArgsWithInsertOpts):
305+
args_insert_opts = args.insert_opts()
280306

281307
scheduled_at = insert_opts.scheduled_at or args_insert_opts.scheduled_at
282308
unique_opts = insert_opts.unique_opts or args_insert_opts.unique_opts
@@ -301,7 +327,7 @@ def _make_insert_params(
301327

302328

303329
def _make_insert_params_many(
304-
args: List[Args | InsertManyParams],
330+
args: List[JobArgs | InsertManyParams],
305331
) -> List[JobInsertParams]:
306332
return [
307333
_make_insert_params(

tests/client_test.py

Lines changed: 117 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from dataclasses import dataclass
12
from datetime import datetime, timezone
23
from unittest.mock import MagicMock, Mock, patch
34

@@ -66,20 +67,95 @@ def mock_unwrap_executor(tx: sqlalchemy.Transaction):
6667
assert insert_res.job == "job_row"
6768

6869

69-
def test_insert_with_opts(client, mock_exec):
70-
insert_opts = InsertOpts(queue="high_priority")
70+
def test_insert_with_insert_opts_from_args(client, mock_exec):
71+
mock_exec.job_insert.return_value = "job_row"
72+
73+
insert_res = client.insert(
74+
SimpleArgs(),
75+
insert_opts=InsertOpts(
76+
max_attempts=23, priority=2, queue="job_custom_queue", tags=["job_custom"]
77+
),
78+
)
79+
80+
mock_exec.job_insert.assert_called_once()
81+
assert insert_res.job == "job_row"
82+
83+
insert_args = mock_exec.job_insert.call_args[0][0]
84+
assert insert_args.max_attempts == 23
85+
assert insert_args.priority == 2
86+
assert insert_args.queue == "job_custom_queue"
87+
assert insert_args.tags == ["job_custom"]
88+
89+
90+
def test_insert_with_insert_opts_from_job(client, mock_exec):
91+
@dataclass
92+
class MyArgs:
93+
kind = "my_args"
94+
95+
@staticmethod
96+
def insert_opts() -> InsertOpts:
97+
return InsertOpts(
98+
max_attempts=23,
99+
priority=2,
100+
queue="job_custom_queue",
101+
tags=["job_custom"],
102+
)
103+
104+
@staticmethod
105+
def to_json() -> str:
106+
return "{}"
71107

72-
mock_exec.job_get_by_kind_and_unique_properties.return_value = None
73108
mock_exec.job_insert.return_value = "job_row"
74109

75-
insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts)
110+
insert_res = client.insert(
111+
MyArgs(),
112+
)
76113

77114
mock_exec.job_insert.assert_called_once()
78115
assert insert_res.job == "job_row"
79116

80-
# Check that the InsertOpts were correctly passed to make_insert_params
81-
call_args = mock_exec.job_insert.call_args[0][0]
82-
assert call_args.queue == "high_priority"
117+
insert_args = mock_exec.job_insert.call_args[0][0]
118+
assert insert_args.max_attempts == 23
119+
assert insert_args.priority == 2
120+
assert insert_args.queue == "job_custom_queue"
121+
assert insert_args.tags == ["job_custom"]
122+
123+
124+
def test_insert_with_insert_opts_precedence(client, mock_exec):
125+
@dataclass
126+
class MyArgs:
127+
kind = "my_args"
128+
129+
@staticmethod
130+
def insert_opts() -> InsertOpts:
131+
return InsertOpts(
132+
max_attempts=23,
133+
priority=2,
134+
queue="job_custom_queue",
135+
tags=["job_custom"],
136+
)
137+
138+
@staticmethod
139+
def to_json() -> str:
140+
return "{}"
141+
142+
mock_exec.job_insert.return_value = "job_row"
143+
144+
insert_res = client.insert(
145+
SimpleArgs(),
146+
insert_opts=InsertOpts(
147+
max_attempts=17, priority=3, queue="my_queue", tags=["custom"]
148+
),
149+
)
150+
151+
mock_exec.job_insert.assert_called_once()
152+
assert insert_res.job == "job_row"
153+
154+
insert_args = mock_exec.job_insert.call_args[0][0]
155+
assert insert_args.max_attempts == 17
156+
assert insert_args.priority == 3
157+
assert insert_args.queue == "my_queue"
158+
assert insert_args.tags == ["custom"]
83159

84160

85161
def test_insert_with_unique_opts_by_args(client, mock_exec):
@@ -149,6 +225,40 @@ def test_insert_with_unique_opts_by_state(client, mock_exec):
149225
assert call_args.kind == "simple"
150226

151227

228+
def test_insert_kind_error(client):
229+
@dataclass
230+
class MyArgs:
231+
pass
232+
233+
with pytest.raises(AttributeError) as ex:
234+
client.insert(MyArgs())
235+
assert "'MyArgs' object has no attribute 'kind'" == str(ex.value)
236+
237+
238+
def test_insert_to_json_attribute_error(client):
239+
@dataclass
240+
class MyArgs:
241+
kind = "my"
242+
243+
with pytest.raises(AttributeError) as ex:
244+
client.insert(MyArgs())
245+
assert "'MyArgs' object has no attribute 'to_json'" == str(ex.value)
246+
247+
248+
def test_insert_to_json_none_error(client):
249+
@dataclass
250+
class MyArgs:
251+
kind = "my"
252+
253+
@staticmethod
254+
def to_json() -> None:
255+
return None
256+
257+
with pytest.raises(AssertionError) as ex:
258+
client.insert(MyArgs())
259+
assert "args should return non-nil from `to_json`" == str(ex.value)
260+
261+
152262
def test_check_advisory_lock_prefix_bounds():
153263
Client(mock_driver, advisory_lock_prefix=123)
154264

0 commit comments

Comments
 (0)