-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathscheduler.py
176 lines (138 loc) · 5.31 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# Copyright 2020-present Michael Hall
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import asyncio
from functools import total_ordering
from time import time
from . import _typings as t
# Names exported by a star-import of this module.
__all__ = ("CancellationToken", "Scheduler")

# Placeholder sentinel: Scheduler.__init__ stores this in its state attributes,
# and Scheduler.__aenter__ replaces it with the real objects.
MISSING: t.Any = object()
class CancellationToken:
    """Opaque handle that identifies a scheduled task so it can be canceled.

    Instances carry no state of their own; they are only compared and hashed
    by identity.

    Not meant for public construction.
    """

    __slots__ = ()
@total_ordering
class _Task[T]:
__slots__ = ("cancel_token", "canceled", "payload", "timestamp")
def __init__(self, timestamp: float, payload: T, /) -> None:
self.timestamp: float = timestamp
self.payload: T = payload
self.canceled: bool = False
self.cancel_token: CancellationToken = CancellationToken()
def __lt__(self, other: _Task[T]) -> bool:
return (self.timestamp, id(self)) < (other.timestamp, id(self))
class Scheduler[T]:
"""A scheduler.
The scheduler is implemented as an async context manager that it an
async iterator.
Payloads can be scheduled to the context manager, and will be yielded
by the iterator when the time for them has come.
Parameters
----------
granularity: float
The number of seconds to compare schedule events at.
If this is set lower than the precision of time.monotonic
on the host system, this is effectively the same as setting
it to time.monotonic's precision.
"""
def __init_subclass__(cls) -> t.Never:
msg = "Don't subclass this"
raise RuntimeError(msg)
__final__ = True
__tasks: dict[CancellationToken, _Task[T]]
__tqueue: asyncio.PriorityQueue[_Task[T]]
__closed: bool
__l: asyncio.Lock
__granularity: float
__slots__ = ("__closed", "__granularity", "__l", "__tasks", "__tqueue")
def __init__(self, granularity: float, /) -> None:
self.__granularity = granularity
self.__closed = MISSING
self.__tasks = MISSING
self.__tqueue = MISSING
self.__l = MISSING
async def __aenter__(self) -> t.Self:
self.__closed = False
asyncio.get_running_loop()
# lock is only needeed on modifying or removing tasks
# insertion is not guarded and only racy in the order of emitted events
# when inserting a task that is scheduled in the past
# or within 1 full iteration of pending tasks on the event loop
# (generally, ms range (subsecond), depending on application)
self.__l = asyncio.Lock()
self.__tasks = {}
self.__tqueue = asyncio.PriorityQueue()
return self
async def __aexit__(self, *_dont_care: object) -> None:
self.__closed = True
def __aiter__(self) -> t.Self:
return self
async def __anext__(self) -> T:
while await asyncio.sleep(self.__granularity, True):
if self.__closed:
raise StopAsyncIteration
t = await self.__tqueue.get()
try:
if t.canceled:
continue
now = time()
delta = t.timestamp - now
if delta < self.__granularity:
async with self.__l:
self.__tasks.pop(t.cancel_token, None)
return t.payload
else:
await self.__tqueue.put(t)
finally:
self.__tqueue.task_done()
raise StopAsyncIteration
async def create_task(self, timestamp: float, payload: T, /) -> CancellationToken:
"""Create a task.
Parameters
----------
timestamp: float
The utc timestamp for when a payload should be emitted.
payload:
The payload to emit
Returns
-------
CancellationToken:
An opaque object that can be used to cancel a task.
You should not rely on details of this class's type.
"""
t = _Task(timestamp, payload)
self.__tasks[t.cancel_token] = t
await self.__tqueue.put(t)
return t.cancel_token
async def cancel_task(self, cancel_token: CancellationToken, /) -> None:
"""Cancel a task.
Canceling an already canceled task is not an error
Parameters
----------
cancel_token: CancellationToken
The object which the scheduler gave you upon scheduling the task.
"""
async with self.__l:
try:
task = self.__tasks[cancel_token]
task.canceled = True
except KeyError:
pass
def close(self) -> None:
"""Close the scheduler without waiting."""
self.__closed = True
async def join(self) -> None:
"""Wait for the scheduler's internal queue to be empty."""
await self.__tqueue.join()