1
1
from functools import partial
2
+ from datetime import timedelta
2
3
from types import MappingProxyType
3
4
from collections .abc import Callable
4
- from typing import Any , TypeVar , cast
5
- from datetime import datetime , timedelta
6
5
7
- from expiringdict import ExpiringDict
6
+ from expiringdictx import ExpiringDict
8
7
from hishel import Controller , AsyncCacheClient , AsyncFileStorage
9
8
10
9
from .const import DATASOURCE_URL
18
17
storage = AsyncFileStorage (ttl = 300 ),
19
18
)
20
19
21
- StoreType = TypeVar ("StoreType" )
22
-
23
-
24
- class SimpleCache :
25
- """简单缓存"""
26
-
27
- def __init__ (self , default_lifetime : timedelta = timedelta (minutes = 5 )):
28
- self .default_lifetime = default_lifetime
29
- self ._cache : dict [str , tuple [Any , datetime , timedelta ]] = {}
30
-
31
- def __setitem__ (self , key : str , value : tuple [Any , timedelta | None ]):
32
- value , lifetime = value
33
- self ._cache [key ] = (value , datetime .now (), lifetime or self .default_lifetime )
34
-
35
- def __getitem__ (self , key : str ):
36
- if item := self ._cache .get (key ):
37
- value , create_time , lifetime = item
38
- if datetime .now () - create_time <= lifetime :
39
- return value
40
- else :
41
- del self ._cache [key ]
42
- return None
43
-
44
- def set (self , key : str , value : Any , lifetime : timedelta | None = None ):
45
- self [key ] = (value , lifetime )
46
-
47
- def get (
48
- self , key : str , default : Any = None , value_convert_func : Callable [[Any ], StoreType | None ] | None = None
49
- ) -> StoreType | None :
50
- if value := self [key ]:
51
- if value_convert_func :
52
- return value_convert_func (value )
53
- return value
54
- return default
55
-
56
20
57
21
class CeobeDataSourceCache :
58
22
"""数据源缓存"""
59
23
60
24
def __init__ (self ):
61
- self ._cache = ExpiringDict ( max_len = 100 , max_age_seconds = 60 * 60 * 24 * 7 )
25
+ self ._cache = ExpiringDict [ str , CeobeTarget ]( capacity = 100 , default_age = timedelta ( days = 7 ) )
62
26
self .client = CeobeClient ()
63
27
self .url = DATASOURCE_URL
64
28
self .init_requested = False
@@ -86,7 +50,7 @@ def select_one(self, cond_func: Callable[[CeobeTarget], bool]) -> CeobeTarget |
86
50
87
51
不会刷新缓存
88
52
"""
89
- cache = cast ( list [ CeobeTarget ], self ._cache .values () )
53
+ cache = self ._cache .values ()
90
54
return next (filter (cond_func , cache ), None )
91
55
92
56
async def get_by_unique_id (self , unique_id : str ) -> CeobeTarget | None :
@@ -95,9 +59,9 @@ async def get_by_unique_id(self, unique_id: str) -> CeobeTarget | None:
95
59
如果在缓存中找不到,会刷新缓存
96
60
"""
97
61
if target := self ._cache .get (unique_id ):
98
- return cast ( CeobeTarget , target )
62
+ return target
99
63
await self .refresh_data_sources ()
100
- return cast ( CeobeTarget | None , self ._cache .get (unique_id ) )
64
+ return self ._cache .get (unique_id )
101
65
102
66
async def get_by_nickname (self , nickname : str ) -> CeobeTarget | None :
103
67
"""根据nickname获取数据源
0 commit comments