1
1
from __future__ import annotations
2
2
3
+ import sys
4
+ import asyncio
3
5
import functools
4
- from typing import TypeVar , Callable , Awaitable
6
+ import contextvars
7
+ from typing import Any , TypeVar , Callable , Awaitable
5
8
from typing_extensions import ParamSpec
6
9
7
- import anyio
8
- import anyio .to_thread
9
-
10
- from ._reflection import function_has_argument
11
-
12
10
T_Retval = TypeVar ("T_Retval" )
13
11
T_ParamSpec = ParamSpec ("T_ParamSpec" )
14
12
15
13
16
- # copied from `asyncer`, https://github.com/tiangolo/asyncer
17
- def asyncify (
18
- function : Callable [T_ParamSpec , T_Retval ],
19
- * ,
20
- cancellable : bool = False ,
21
- limiter : anyio .CapacityLimiter | None = None ,
22
- ) -> Callable [T_ParamSpec , Awaitable [T_Retval ]]:
14
+ if sys .version_info >= (3 , 9 ):
15
+ to_thread = asyncio .to_thread
16
+ else :
17
+ # backport of https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread
18
+ # for Python 3.8 support
19
+ async def to_thread (
20
+ func : Callable [T_ParamSpec , T_Retval ], / , * args : T_ParamSpec .args , ** kwargs : T_ParamSpec .kwargs
21
+ ) -> Any :
22
+ """Asynchronously run function *func* in a separate thread.
23
+
24
+ Any *args and **kwargs supplied for this function are directly passed
25
+ to *func*. Also, the current :class:`contextvars.Context` is propagated,
26
+ allowing context variables from the main thread to be accessed in the
27
+ separate thread.
28
+
29
+ Returns a coroutine that can be awaited to get the eventual result of *func*.
30
+ """
31
+ loop = asyncio .events .get_running_loop ()
32
+ ctx = contextvars .copy_context ()
33
+ func_call = functools .partial (ctx .run , func , * args , ** kwargs )
34
+ return await loop .run_in_executor (None , func_call )
35
+
36
+
37
+ # inspired by `asyncer`, https://github.com/tiangolo/asyncer
38
+ def asyncify (function : Callable [T_ParamSpec , T_Retval ]) -> Callable [T_ParamSpec , Awaitable [T_Retval ]]:
23
39
"""
24
40
Take a blocking function and create an async one that receives the same
25
- positional and keyword arguments, and that when called, calls the original function
26
- in a worker thread using `anyio.to_thread.run_sync()`. Internally,
27
- `asyncer.asyncify()` uses the same `anyio.to_thread.run_sync()`, but it supports
28
- keyword arguments additional to positional arguments and it adds better support for
29
- autocompletion and inline errors for the arguments of the function called and the
30
- return value.
31
-
32
- If the `cancellable` option is enabled and the task waiting for its completion is
33
- cancelled, the thread will still run its course but its return value (or any raised
34
- exception) will be ignored.
41
+ positional and keyword arguments. For python version 3.9 and above, it uses
42
+ asyncio.to_thread to run the function in a separate thread. For python version
43
+ 3.8, it uses locally defined copy of the asyncio.to_thread function which was
44
+ introduced in python 3.9.
35
45
36
- Use it like this :
46
+ Usage :
37
47
38
- ```Python
39
- def do_work (arg1, arg2, kwarg1="", kwarg2="") -> str :
40
- # Do work
41
- return "Some result"
48
+ ```python
49
+ def blocking_func (arg1, arg2, kwarg1=None) :
50
+ # blocking code
51
+ return result
42
52
43
53
44
- result = await to_thread.asyncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
45
- print(result)
54
+ result = asyncify(blocking_function)(arg1, arg2, kwarg1=value1)
46
55
```
47
56
48
57
## Arguments
49
58
50
59
`function`: a blocking regular callable (e.g. a function)
51
- `cancellable`: `True` to allow cancellation of the operation
52
- `limiter`: capacity limiter to use to limit the total amount of threads running
53
- (if omitted, the default limiter is used)
54
60
55
61
## Return
56
62
@@ -60,22 +66,6 @@ def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
60
66
"""
61
67
62
68
async def wrapper (* args : T_ParamSpec .args , ** kwargs : T_ParamSpec .kwargs ) -> T_Retval :
63
- partial_f = functools .partial (function , * args , ** kwargs )
64
-
65
- # In `v4.1.0` anyio added the `abandon_on_cancel` argument and deprecated the old
66
- # `cancellable` argument, so we need to use the new `abandon_on_cancel` to avoid
67
- # surfacing deprecation warnings.
68
- if function_has_argument (anyio .to_thread .run_sync , "abandon_on_cancel" ):
69
- return await anyio .to_thread .run_sync (
70
- partial_f ,
71
- abandon_on_cancel = cancellable ,
72
- limiter = limiter ,
73
- )
74
-
75
- return await anyio .to_thread .run_sync (
76
- partial_f ,
77
- cancellable = cancellable ,
78
- limiter = limiter ,
79
- )
69
+ return await to_thread (function , * args , ** kwargs )
80
70
81
71
return wrapper
0 commit comments