-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathratelimiter.py
80 lines (63 loc) · 2.47 KB
/
ratelimiter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# Copyright 2020-present Michael Hall
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import asyncio
import time
from collections import deque
from threading import RLock
from . import _typings as t
__all__ = ("RateLimiter",)
class RateLimiter:
"""Asyncio-specific internal application ratelimiter.
This is an asyncio specific ratelimit implementation which does not
account for various networking effects / responses and
should only be used for internal limiting.
This is thread-safe and re-entrant.
Parameters
----------
ratelimit: int
The number of things to allow (see period)
period: float
The amount of time in seconds for which the ratelimit is allowed
(ratelimit per period seconds)
granularity: float
The amount of time in seconds to wake waiting tasks if the period has
expired.
"""
def __init_subclass__(cls) -> t.Never:
msg = "Don't subclass this"
raise RuntimeError(msg)
__final__ = True
def __init__(self, rate_limit: int, period: float, granularity: float) -> None:
self.rate_limit: int = rate_limit
self.period: float = period
self.granularity: float = granularity
self._monotonics: deque[float] = deque()
self._lock = RLock()
async def __aenter__(self) -> None:
with self._lock:
while len(self._monotonics) >= self.rate_limit:
try:
self._lock.release()
await asyncio.sleep(self.granularity, True)
finally:
self._lock.acquire()
now = time.monotonic()
with self._lock:
while self._monotonics and (now - self._monotonics[0] > self.period):
self._monotonics.popleft()
with self._lock:
self._monotonics.append(time.monotonic())
async def __aexit__(self, *_dont_care: object) -> None:
pass