Skip to content

Commit 79d4922

Browse files
✨ 适配ceobecanteen平台
Co-authored-by: phidiaLam <[email protected]>
1 parent 2a33d51 commit 79d4922

31 files changed

+2198
-68
lines changed

nonebot_bison/admin_page/token_manager.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import random
22
import string
3+
from datetime import timedelta
34

4-
from expiringdict import ExpiringDict
5+
from expiringdictx import ExpiringDict
56

67

78
class TokenManager:
89
def __init__(self):
9-
self.token_manager = ExpiringDict(max_len=100, max_age_seconds=60 * 10)
10+
self.token_manager = ExpiringDict[str, tuple](capacity=100, default_age=timedelta(minutes=10))
1011

1112
def get_user(self, token: str) -> tuple | None:
1213
res = self.token_manager.get(token)

nonebot_bison/platform/arknights.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pydantic import Field, BaseModel
77

88
from ..post import Post
9+
from ..utils import text_fletten
910
from ..types import Target, RawPost, Category
1011
from .platform import NewMessage, StatusChange
1112
from ..utils.scheduler_config import SchedulerConfig
@@ -93,17 +94,14 @@ async def parse(self, raw_post: BulletinListItem) -> Post:
9394
)
9495
data = ArkBulletinResponse.parse_obj(raw_data.json()).data
9596

96-
def title_escape(text: str) -> str:
97-
return text.replace("\n", " - ")
98-
9997
# gen title, content
10098
if data.header:
10199
# header是title的更详细版本
102100
# header会和content一起出现
103-
title = data.header
101+
title = text_fletten(data.header, replace=" - ")
104102
else:
105103
# 只有一张图片
106-
title = title_escape(data.title)
104+
title = text_fletten(data.title, replace=" - ")
107105

108106
return Post(
109107
self,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .platform import CeobeCanteen as CeobeCanteen
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
from functools import partial
2+
from datetime import timedelta
3+
from types import MappingProxyType
4+
from collections.abc import Callable
5+
6+
from expiringdictx import ExpiringDict
7+
from httpx import AsyncClient, AsyncHTTPTransport
8+
from hishel import Controller, AsyncCacheTransport, AsyncInMemoryStorage
9+
10+
from .const import DATASOURCE_URL
11+
from .utils import process_response
12+
from .models import CeobeSource, CeobeTarget, DataSourceResponse
13+
14+
cache_transport = AsyncCacheTransport(
15+
AsyncHTTPTransport(),
16+
storage=AsyncInMemoryStorage(),
17+
controller=Controller(
18+
always_revalidate=True,
19+
),
20+
)
21+
22+
CeobeClient = partial(
23+
AsyncClient,
24+
transport=cache_transport,
25+
)
26+
27+
28+
class CeobeDataSourceCache:
29+
"""数据源缓存"""
30+
31+
def __init__(self):
32+
self._cache = ExpiringDict[str, CeobeTarget](capacity=100, default_age=timedelta(days=7))
33+
self.client = CeobeClient()
34+
self.url = DATASOURCE_URL
35+
self.init_requested = False
36+
37+
@property
38+
def cache(self) -> MappingProxyType[str, CeobeTarget]:
39+
return MappingProxyType(self._cache)
40+
41+
async def refresh_data_sources(self):
42+
"""请求数据源API刷新缓存"""
43+
data_sources_resp = await self.client.get(self.url)
44+
data_sources = process_response(data_sources_resp, DataSourceResponse).data
45+
for ds in data_sources:
46+
self._cache[ds.unique_id] = ds
47+
return self.cache
48+
49+
async def get_all(self):
50+
if not self.init_requested:
51+
await self.refresh_data_sources()
52+
self.init_requested = True
53+
return self.cache
54+
55+
def select_one(self, cond_func: Callable[[CeobeTarget], bool]) -> CeobeTarget | None:
56+
"""根据条件获取数据源
57+
58+
不会刷新缓存
59+
"""
60+
cache = self._cache.values()
61+
return next(filter(cond_func, cache), None)
62+
63+
async def get_by_unique_id(self, unique_id: str) -> CeobeTarget | None:
64+
"""根据unique_id获取数据源
65+
66+
如果在缓存中找不到,会刷新缓存
67+
"""
68+
if target := self._cache.get(unique_id):
69+
return target
70+
await self.refresh_data_sources()
71+
return self._cache.get(unique_id)
72+
73+
async def get_by_nickname(self, nickname: str) -> CeobeTarget | None:
74+
"""根据nickname获取数据源
75+
76+
如果在缓存中找不到,会刷新缓存
77+
"""
78+
79+
def cond_func(target: CeobeTarget):
80+
return target.nickname == nickname
81+
82+
if target := self.select_one(cond_func):
83+
return target
84+
await self.refresh_data_sources()
85+
return self.select_one(cond_func)
86+
87+
async def get_by_source(self, source: CeobeSource) -> CeobeTarget | None:
88+
"""根据source获取数据源
89+
90+
如果在缓存中找不到,会刷新缓存
91+
"""
92+
93+
def cond_func(target: CeobeTarget):
94+
return target.db_unique_key == source.data and target.datasource == source.type
95+
96+
if target := self.select_one(cond_func):
97+
return target
98+
await self.refresh_data_sources()
99+
return self.select_one(cond_func)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
DATASOURCE_URL = "https://server.ceobecanteen.top/api/v1/canteen/config/datasource/list"
2+
COMB_ID_URL = "https://server.ceobecanteen.top/api/v1/canteen/user/getDatasourceComb"
3+
COOKIE_ID_URL = "http://cdn.ceobecanteen.top/datasource-comb"
4+
COOKIES_URL = "https://server-cdn.ceobecanteen.top/api/v1/cdn/cookie/mainList/cookieList"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
class CeobeResponseError(Exception): ...
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
from typing import Literal, TypeVar
2+
3+
from pydantic import BaseModel
4+
5+
6+
class CeobeTarget(BaseModel):
7+
"""账户结构"""
8+
9+
avatar: str
10+
"""数据源头像"""
11+
datasource: str
12+
"""数据源类型"""
13+
db_unique_key: str
14+
"""数据源相关唯一id"""
15+
nickname: str
16+
"""数据源昵称"""
17+
platform: str
18+
"""平台代码"""
19+
unique_id: str
20+
"""数据源唯一标识(用于前后端交互标识)"""
21+
jump_url: str | None = None
22+
"""跳转url(null就是没办法跳转)"""
23+
24+
25+
class DataSourceResponse(BaseModel):
26+
code: int
27+
message: str
28+
data: list[CeobeTarget]
29+
30+
31+
class CeobeImage(BaseModel):
32+
origin_url: str
33+
"""原图"""
34+
compress_url: str | None = None
35+
"""压缩图,为null就是没有原图对应压缩图"""
36+
37+
38+
class CeobeDefaultCookie(BaseModel):
39+
text: str
40+
images: list[CeobeImage] | None
41+
42+
43+
class CeobeRetweeted(BaseModel):
44+
author_name: str
45+
author_avatar: str
46+
text: str
47+
images: list[CeobeImage] | None = None
48+
49+
50+
class CeobeItem(BaseModel):
51+
id: str
52+
"""单条id"""
53+
url: str
54+
"""跳转链接"""
55+
type: str | None = None
56+
"""类型"""
57+
is_long_text: bool | None = None
58+
"""是否长文"""
59+
is_retweeted: bool = False
60+
"""是否转发"""
61+
retweeted: CeobeRetweeted | None = None
62+
63+
class Config:
64+
extra = "allow"
65+
66+
67+
class CeobeSource(BaseModel):
68+
data: str
69+
"""数据源id"""
70+
type: str
71+
"""数据源类型"""
72+
73+
74+
class CeobeTimestamp(BaseModel):
75+
fetcher: int
76+
"""蹲饼时间,毫秒"""
77+
platform_precision: Literal["none", "day", "hour", "minute", "second", "ms"]
78+
"""平台时间精度"""
79+
platform: int | None = None
80+
"""平台时间戳,毫秒"""
81+
82+
83+
class CeobeCookie(BaseModel):
84+
datasource: str
85+
"""数据源名字"""
86+
icon: str
87+
"""数据源头像"""
88+
timestamp: CeobeTimestamp
89+
"""时间戳"""
90+
default_cookie: CeobeDefaultCookie
91+
"""原始饼"""
92+
item: CeobeItem
93+
"""数据源信息,有平台的特殊字段"""
94+
source: CeobeSource
95+
"""数据源"""
96+
97+
98+
class CeobeData(BaseModel):
99+
cookies: list[CeobeCookie]
100+
next_page_id: str | None = None
101+
102+
103+
class CookiesResponse(BaseModel):
104+
code: int
105+
message: str
106+
data: CeobeData
107+
108+
109+
class CombIdResponse(BaseModel):
110+
code: int
111+
message: str
112+
data: dict[Literal["datasource_comb_id"], str]
113+
114+
115+
class CookieIdResponse(BaseModel):
116+
cookie_id: str
117+
update_cookie_id: str
118+
119+
120+
ResponseModel = TypeVar("ResponseModel", bound=CookiesResponse | CombIdResponse | CookieIdResponse | DataSourceResponse)

0 commit comments

Comments
 (0)