fix: segmented reply not applied to send_message_to_user messages#9111
fix: segmented reply not applied to send_message_to_user messages#9111tangtaizong666 wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the segmented reply logic by extracting it into a shared utility module (segmented_reply.py) and integrates it into SendMessageToUserTool to ensure consistent message-splitting behavior across both pipeline and tool-sent messages. It also adds corresponding unit tests. The review feedback highlights several robustness issues in the newly introduced utility functions, specifically pointing out potential runtime crashes (such as TypeError, ZeroDivisionError, or negative sleep intervals in asyncio.sleep) if user configurations contain invalid types or out-of-bounds numeric values.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def split_text_by_regex(text: str, regex: str) -> list[str]: | ||
| """使用正则表达式分段文本,正则非法时回退到默认分段正则。""" | ||
| try: | ||
| return re.findall(regex, text, re.DOTALL | re.MULTILINE) | ||
| except re.error: | ||
| logger.error( | ||
| f"分段回复正则表达式错误,使用默认分段方式: {traceback.format_exc()}", | ||
| ) | ||
| return re.findall(DEFAULT_SPLIT_REGEX, text, re.DOTALL | re.MULTILINE) |
There was a problem hiding this comment.
If regex is not a string (e.g., None from user configuration), re.findall will raise a TypeError which is not caught by except re.error:. This can crash the message sending process. We should validate that regex is a string and catch all exceptions to fallback safely.
| def split_text_by_regex(text: str, regex: str) -> list[str]: | |
| """使用正则表达式分段文本,正则非法时回退到默认分段正则。""" | |
| try: | |
| return re.findall(regex, text, re.DOTALL | re.MULTILINE) | |
| except re.error: | |
| logger.error( | |
| f"分段回复正则表达式错误,使用默认分段方式: {traceback.format_exc()}", | |
| ) | |
| return re.findall(DEFAULT_SPLIT_REGEX, text, re.DOTALL | re.MULTILINE) | |
| def split_text_by_regex(text: str, regex: str) -> list[str]: | |
| """使用正则表达式分段文本,正则非法时回退到默认分段正则。""" | |
| if not isinstance(regex, str): | |
| logger.error(f"分段回复正则表达式类型错误: {type(regex)},使用默认分段方式") | |
| regex = DEFAULT_SPLIT_REGEX | |
| try: | |
| return re.findall(regex, text, re.DOTALL | re.MULTILINE) | |
| except Exception: | |
| logger.error( | |
| f"分段回复正则表达式错误,使用默认分段方式: {traceback.format_exc()}", | |
| ) | |
| return re.findall(DEFAULT_SPLIT_REGEX, text, re.DOTALL | re.MULTILINE) |
| def calc_segment_interval( | ||
| text: str | None, | ||
| interval_method: str, | ||
| interval_range: tuple[float, float], | ||
| log_base: float, | ||
| ) -> float: | ||
| """计算一段消息发送前的间隔时间。text 为 None 表示非纯文本消息段。""" | ||
| if interval_method == "log": | ||
| if text is not None: | ||
| wc = count_words(text) | ||
| i = math.log(wc + 1, log_base) | ||
| return random.uniform(i, i + 0.5) | ||
| return random.uniform(1, 1.75) | ||
| # random | ||
| return random.uniform(interval_range[0], interval_range[1]) |
There was a problem hiding this comment.
Several potential issues exist here:
- If
log_baseis <= 1.0 (e.g., 1.0 or 0.5),math.logwill raise aZeroDivisionErroror return negative values, leading to negative sleep intervals which crashasyncio.sleepwith aValueError. - If
interval_rangecontains negative values,random.uniformcan return a negative value, also crashingasyncio.sleep.
We should ensure log_base is strictly greater than 1.0, clamp the calculated intervals to be non-negative, and wrap the log calculation in a try-except block.
def calc_segment_interval(
text: str | None,
interval_method: str,
interval_range: tuple[float, float],
log_base: float,
) -> float:
"""计算一段消息发送前的间隔时间。text 为 None 表示非纯文本消息段。"""
if interval_method == "log":
if text is not None:
wc = count_words(text)
safe_log_base = log_base if log_base > 1.0 else 2.6
try:
i = math.log(wc + 1, safe_log_base)
val = random.uniform(i, i + 0.5)
except Exception:
val = random.uniform(1, 1.75)
return max(0.0, val)
return random.uniform(1, 1.75)
# random
low = max(0.0, interval_range[0])
high = max(0.0, interval_range[1])
return random.uniform(low, high)| def compile_split_words_pattern(split_words: list[str]) -> re.Pattern | None: | ||
| """编译分段词列表对应的正则。split_words 为空时返回 None。""" | ||
| if not split_words: | ||
| return None | ||
| escaped_words = sorted( | ||
| [re.escape(word) for word in split_words], key=len, reverse=True | ||
| ) | ||
| return re.compile(f"(.*?({'|'.join(escaped_words)})|.+$)", re.DOTALL) |
There was a problem hiding this comment.
If split_words is not a list/tuple or contains non-string elements (e.g., due to user configuration), it can cause runtime errors during pattern compilation. Adding a type check and converting elements to strings ensures robustness.
| def compile_split_words_pattern(split_words: list[str]) -> re.Pattern | None: | |
| """编译分段词列表对应的正则。split_words 为空时返回 None。""" | |
| if not split_words: | |
| return None | |
| escaped_words = sorted( | |
| [re.escape(word) for word in split_words], key=len, reverse=True | |
| ) | |
| return re.compile(f"(.*?({'|'.join(escaped_words)})|.+$)", re.DOTALL) | |
| def compile_split_words_pattern(split_words: list[str]) -> re.Pattern | None: | |
| """编译分段词列表对应的正则。split_words 为空时返回 None。""" | |
| if not isinstance(split_words, (list, tuple)) or not split_words: | |
| return None | |
| escaped_words = sorted( | |
| [re.escape(str(word)) for word in split_words if word], key=len, reverse=True | |
| ) | |
| if not escaped_words: | |
| return None | |
| return re.compile(f"(.*?({'|'.join(escaped_words)})|.+$)", re.DOTALL) |
There was a problem hiding this comment.
Hey - I've found 2 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/utils/segmented_reply.py" line_range="64-73" />
<code_context>
+ return result if result else [text]
+
+
+def split_text_by_regex(text: str, regex: str) -> list[str]:
+ """使用正则表达式分段文本,正则非法时回退到默认分段正则。"""
+ try:
+ return re.findall(regex, text, re.DOTALL | re.MULTILINE)
+ except re.error:
+ logger.error(
+ f"分段回复正则表达式错误,使用默认分段方式: {traceback.format_exc()}",
+ )
+ return re.findall(DEFAULT_SPLIT_REGEX, text, re.DOTALL | re.MULTILINE)
+
+
+def cleanup_segments(segments: list[str], content_cleanup_rule: str) -> list[str]:
+ """对分段结果应用内容过滤正则并去除空白段。"""
+ result = []
</code_context>
<issue_to_address>
**issue (bug_risk):** split_text_by_regex/cleanup_segments can break when user regex contains capture groups
When the user regex includes capture groups, `re.findall` returns tuples instead of strings. `cleanup_segments` will then pass these tuples to `re.sub`, causing a runtime error because `re.sub` expects a string. This was already a latent issue, but the new shared logic makes it more central. To harden this, normalize all `raw_segments` to strings before returning from `split_text_by_regex` (e.g., take `seg[0]` or `"".join(seg` for tuple results) so that `cleanup_segments` always receives a `list[str]`, or explicitly reject patterns with capture groups.
</issue_to_address>
### Comment 2
<location path="astrbot/core/tools/message_tools.py" line_range="202" />
<code_context>
raise FileNotFoundError(f"{component_type} path does not exist: {path}")
+ def _get_segmented_reply_conf(
+ self,
+ context: ContextWrapper[AstrAgentContext],
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the platform support checks, chain-building logic, and async send loop into shared `segmented_reply` utilities so this tool remains thin and mirrors the pipeline’s behavior.
You can keep the new behavior but move most of the orchestration into `segmented_reply` utilities so this tool stays thin and aligned with the pipeline.
### 1. Centralize platform support logic
Right now `_get_segmented_reply_conf` knows too much about platform details. Move that logic into `segmented_reply.py`:
```python
# astrbot/core/utils/segmented_reply.py
SEGMENTED_REPLY_UNSUPPORTED_PLATFORMS = {...}
def is_segmented_reply_supported(
platform_id: str,
platform_manager: Any | None,
) -> bool:
if platform_id in SEGMENTED_REPLY_UNSUPPORTED_PLATFORMS:
return False
if not platform_manager:
return True
try:
for inst in platform_manager.platform_insts:
meta = inst.meta()
if (
meta.id == platform_id
and meta.name in SEGMENTED_REPLY_UNSUPPORTED_PLATFORMS
):
return False
except Exception:
# be conservative, don’t break sending
return True
return True
```
Then `_get_segmented_reply_conf` becomes flatter and only does config plumbing:
```python
# in the tool
from astrbot.core.utils.segmented_reply import is_segmented_reply_supported
def _get_segmented_reply_conf(
self,
context: ContextWrapper[AstrAgentContext],
target_session: MessageSession,
) -> dict | None:
try:
cfg = context.context.context.get_config(umo=str(target_session))
except Exception:
return None
if not isinstance(cfg, dict):
return None
seg_conf = cfg.get("platform_settings", {}).get("segmented_reply", {})
if not seg_conf.get("enable", False):
return None
platform_manager = getattr(context.context.context, "platform_manager", None)
if not is_segmented_reply_supported(target_session.platform_id, platform_manager):
return None
return seg_conf
```
### 2. Extract generic chain-building into a shared utility
`_build_segmented_chains` is generic (“components → chains”) and mostly mirrors pipeline behavior. Move it into `segmented_reply.py` and keep it pure:
```python
# astrbot/core/utils/segmented_reply.py
import astrbot.core.message.components as Comp
def build_segmented_chains(
components: list[Comp.BaseMessageComponent],
seg_conf: dict,
) -> list[list[Comp.BaseMessageComponent]] | None:
try:
words_count_threshold = int(seg_conf.get("words_count_threshold", 150))
except (TypeError, ValueError):
words_count_threshold = 150
split_mode = seg_conf.get("split_mode", "regex")
regex = seg_conf.get("regex", DEFAULT_SPLIT_REGEX)
split_words = seg_conf.get("split_words", DEFAULT_SPLIT_WORDS)
content_cleanup_rule = seg_conf.get("content_cleanup_rule", "")
split_words_pattern = (
compile_split_words_pattern(split_words) if split_mode == "words" else None
)
header_comps = [
comp for comp in components if isinstance(comp, (Comp.At, Comp.Reply))
]
body_comps = [
comp for comp in components if not isinstance(comp, (Comp.At, Comp.Reply))
]
if not body_comps:
return None
units: list[Comp.BaseMessageComponent] = []
for comp in body_comps:
if not isinstance(comp, Comp.Plain) or len(comp.text) > words_count_threshold:
units.append(comp)
continue
if split_mode == "words":
split_response = split_text_by_words(
comp.text,
split_words,
split_words_pattern,
)
else:
split_response = split_text_by_regex(comp.text, regex)
if not split_response:
units.append(comp)
continue
segments = cleanup_segments(split_response, content_cleanup_rule)
if segments:
units.extend(Comp.Plain(text=seg) for seg in segments)
else:
units.append(comp)
chains: list[list[Comp.BaseMessageComponent]] = []
pending_header = list(header_comps)
for unit in units:
if isinstance(unit, Comp.Record):
chains.append([unit])
else:
chains.append([*pending_header, unit])
pending_header = []
if pending_header:
chains.insert(0, pending_header)
return chains
```
Then the tool doesn’t need `_build_segmented_chains` at all:
```python
from astrbot.core.utils.segmented_reply import build_segmented_chains
seg_conf = self._get_segmented_reply_conf(context, target_session)
seg_chains = build_segmented_chains(components, seg_conf) if seg_conf else None
```
### 3. Generalize the async sending loop
The sleep/scheduling logic in `_send_segmented_chains` can also be a shared helper, since it already uses shared primitives (`parse_interval_range`, `calc_segment_interval`):
```python
# astrbot/core/utils/segmented_reply.py
from astrbot.core.message import MessageChain
async def send_segmented_chains(
context: Any,
target_session: Any,
chains: list[list[Comp.BaseMessageComponent]],
seg_conf: dict,
) -> None:
interval_method = seg_conf.get("interval_method", "random")
interval_range = parse_interval_range(str(seg_conf.get("interval", "1.5,3.5")))
try:
log_base = float(seg_conf.get("log_base", 2.6))
except (TypeError, ValueError):
log_base = 2.6
for chain in chains:
main_comp = chain[-1]
interval = calc_segment_interval(
main_comp.text if isinstance(main_comp, Comp.Plain) else None,
interval_method,
interval_range,
log_base,
)
await asyncio.sleep(interval)
await context.send_message(target_session, MessageChain(chain=chain))
```
And in the tool you only adapt the context type:
```python
from astrbot.core.utils.segmented_reply import (
build_segmented_chains,
send_segmented_chains,
)
seg_conf = self._get_segmented_reply_conf(context, target_session)
seg_chains = build_segmented_chains(components, seg_conf) if seg_conf else None
if seg_chains:
await send_segmented_chains(
context.context.context, # existing ContextImpl
target_session,
seg_chains,
seg_conf,
)
else:
await context.context.context.send_message(target_session, message_chain)
```
This keeps all current behavior (including header/body handling, `Record` special casing, and interval logic), but:
- removes three tightly-coupled private helpers from the tool,
- pushes orchestration and platform rules into `segmented_reply.py`,
- leaves the tool’s `call()` flow close to “get config → maybe segment → send”.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| def split_text_by_regex(text: str, regex: str) -> list[str]: | ||
| """使用正则表达式分段文本,正则非法时回退到默认分段正则。""" | ||
| try: | ||
| return re.findall(regex, text, re.DOTALL | re.MULTILINE) | ||
| except re.error: | ||
| logger.error( | ||
| f"分段回复正则表达式错误,使用默认分段方式: {traceback.format_exc()}", | ||
| ) | ||
| return re.findall(DEFAULT_SPLIT_REGEX, text, re.DOTALL | re.MULTILINE) | ||
|
|
There was a problem hiding this comment.
issue (bug_risk): split_text_by_regex/cleanup_segments can break when user regex contains capture groups
When the user regex includes capture groups, re.findall returns tuples instead of strings. cleanup_segments will then pass these tuples to re.sub, causing a runtime error because re.sub expects a string. This was already a latent issue, but the new shared logic makes it more central. To harden this, normalize all raw_segments to strings before returning from split_text_by_regex (e.g., take seg[0] or "".join(seg for tuple results) so that cleanup_segments always receives a list[str], or explicitly reject patterns with capture groups.
|
|
||
| raise FileNotFoundError(f"{component_type} path does not exist: {path}") | ||
|
|
||
| def _get_segmented_reply_conf( |
There was a problem hiding this comment.
issue (complexity): Consider extracting the platform support checks, chain-building logic, and async send loop into shared segmented_reply utilities so this tool remains thin and mirrors the pipeline’s behavior.
You can keep the new behavior but move most of the orchestration into segmented_reply utilities so this tool stays thin and aligned with the pipeline.
1. Centralize platform support logic
Right now _get_segmented_reply_conf knows too much about platform details. Move that logic into segmented_reply.py:
# astrbot/core/utils/segmented_reply.py
SEGMENTED_REPLY_UNSUPPORTED_PLATFORMS = {...}
def is_segmented_reply_supported(
platform_id: str,
platform_manager: Any | None,
) -> bool:
if platform_id in SEGMENTED_REPLY_UNSUPPORTED_PLATFORMS:
return False
if not platform_manager:
return True
try:
for inst in platform_manager.platform_insts:
meta = inst.meta()
if (
meta.id == platform_id
and meta.name in SEGMENTED_REPLY_UNSUPPORTED_PLATFORMS
):
return False
except Exception:
# be conservative, don’t break sending
return True
return TrueThen _get_segmented_reply_conf becomes flatter and only does config plumbing:
# in the tool
from astrbot.core.utils.segmented_reply import is_segmented_reply_supported
def _get_segmented_reply_conf(
self,
context: ContextWrapper[AstrAgentContext],
target_session: MessageSession,
) -> dict | None:
try:
cfg = context.context.context.get_config(umo=str(target_session))
except Exception:
return None
if not isinstance(cfg, dict):
return None
seg_conf = cfg.get("platform_settings", {}).get("segmented_reply", {})
if not seg_conf.get("enable", False):
return None
platform_manager = getattr(context.context.context, "platform_manager", None)
if not is_segmented_reply_supported(target_session.platform_id, platform_manager):
return None
return seg_conf2. Extract generic chain-building into a shared utility
_build_segmented_chains is generic (“components → chains”) and mostly mirrors pipeline behavior. Move it into segmented_reply.py and keep it pure:
# astrbot/core/utils/segmented_reply.py
import astrbot.core.message.components as Comp
def build_segmented_chains(
components: list[Comp.BaseMessageComponent],
seg_conf: dict,
) -> list[list[Comp.BaseMessageComponent]] | None:
try:
words_count_threshold = int(seg_conf.get("words_count_threshold", 150))
except (TypeError, ValueError):
words_count_threshold = 150
split_mode = seg_conf.get("split_mode", "regex")
regex = seg_conf.get("regex", DEFAULT_SPLIT_REGEX)
split_words = seg_conf.get("split_words", DEFAULT_SPLIT_WORDS)
content_cleanup_rule = seg_conf.get("content_cleanup_rule", "")
split_words_pattern = (
compile_split_words_pattern(split_words) if split_mode == "words" else None
)
header_comps = [
comp for comp in components if isinstance(comp, (Comp.At, Comp.Reply))
]
body_comps = [
comp for comp in components if not isinstance(comp, (Comp.At, Comp.Reply))
]
if not body_comps:
return None
units: list[Comp.BaseMessageComponent] = []
for comp in body_comps:
if not isinstance(comp, Comp.Plain) or len(comp.text) > words_count_threshold:
units.append(comp)
continue
if split_mode == "words":
split_response = split_text_by_words(
comp.text,
split_words,
split_words_pattern,
)
else:
split_response = split_text_by_regex(comp.text, regex)
if not split_response:
units.append(comp)
continue
segments = cleanup_segments(split_response, content_cleanup_rule)
if segments:
units.extend(Comp.Plain(text=seg) for seg in segments)
else:
units.append(comp)
chains: list[list[Comp.BaseMessageComponent]] = []
pending_header = list(header_comps)
for unit in units:
if isinstance(unit, Comp.Record):
chains.append([unit])
else:
chains.append([*pending_header, unit])
pending_header = []
if pending_header:
chains.insert(0, pending_header)
return chainsThen the tool doesn’t need _build_segmented_chains at all:
from astrbot.core.utils.segmented_reply import build_segmented_chains
seg_conf = self._get_segmented_reply_conf(context, target_session)
seg_chains = build_segmented_chains(components, seg_conf) if seg_conf else None3. Generalize the async sending loop
The sleep/scheduling logic in _send_segmented_chains can also be a shared helper, since it already uses shared primitives (parse_interval_range, calc_segment_interval):
# astrbot/core/utils/segmented_reply.py
from astrbot.core.message import MessageChain
async def send_segmented_chains(
context: Any,
target_session: Any,
chains: list[list[Comp.BaseMessageComponent]],
seg_conf: dict,
) -> None:
interval_method = seg_conf.get("interval_method", "random")
interval_range = parse_interval_range(str(seg_conf.get("interval", "1.5,3.5")))
try:
log_base = float(seg_conf.get("log_base", 2.6))
except (TypeError, ValueError):
log_base = 2.6
for chain in chains:
main_comp = chain[-1]
interval = calc_segment_interval(
main_comp.text if isinstance(main_comp, Comp.Plain) else None,
interval_method,
interval_range,
log_base,
)
await asyncio.sleep(interval)
await context.send_message(target_session, MessageChain(chain=chain))And in the tool you only adapt the context type:
from astrbot.core.utils.segmented_reply import (
build_segmented_chains,
send_segmented_chains,
)
seg_conf = self._get_segmented_reply_conf(context, target_session)
seg_chains = build_segmented_chains(components, seg_conf) if seg_conf else None
if seg_chains:
await send_segmented_chains(
context.context.context, # existing ContextImpl
target_session,
seg_chains,
seg_conf,
)
else:
await context.context.context.send_message(target_session, message_chain)This keeps all current behavior (including header/body handling, Record special casing, and interval logic), but:
- removes three tightly-coupled private helpers from the tool,
- pushes orchestration and platform rules into
segmented_reply.py, - leaves the tool’s
call()flow close to “get config → maybe segment → send”.
Fixes #8325.
When the LLM mentions a group member, it must go through the
send_message_to_usertool (amention_usercomponent cannot be produced by the plain-text reply path). The tool delivers messages viacontext.send_message()→platform.send_by_session()directly, which bypasses bothResultDecorateStage(text splitting / content cleanup) andRespondStage(interval sending). As a result, any message containing a mention is sent as one whole message and the user's segmented reply (分段回复) settings are silently ignored, while normal replies are segmented correctly — exactly the inconsistency reported in #8325.Modifications / 改动点
astrbot/core/utils/segmented_reply.py(new): shared segmented-reply helpers (split-words pattern compilation, words/regex splitting, content cleanup, interval parsing/calculation), extracted verbatim from the two pipeline stages so both paths behave identically.astrbot/core/pipeline/result_decorate/stage.pyandastrbot/core/pipeline/respond/stage.py: refactored to use the shared helpers (behavior-preserving; the hardcoded unsupported-platform lists are also unified into one constant).astrbot/core/tools/message_tools.py:SendMessageToUserToolnow applies the target session's segmented reply config before sending:Plaintext is split with the samesplit_mode/regex/split_words/content_cleanup_rule/words_count_thresholdsemantics as the pipeline;At/Replycomponents ride on the first segment (matchingRespondStage),Recordis still sent separately;random/log), matching pipeline pacing;qq_official_webhook,weixin_official_account,dingtalk) keep the single-send behavior;_send_message_to_user_current_session_plain_texts) still records the original full text, so the existing duplicate-reply suppression inRespondStageis unaffected.only_llm_resultis intentionally not checked here: tool messages are always LLM-initiated, so they qualify as LLM results.tests/unit/test_message_tools.py: 5 new regression tests covering the issue scenario (mention + custom split word), disabled config, words-count threshold, regex mode + cleanup rule, and unsupported platforms.This is NOT a breaking change. / 这不是一个破坏性变更。
Screenshots or Test Results / 运行截图或测试结果
New regression test reproducing the exact scenario from #8325 (mention + custom split word
|) fails on master and passes with this fix:Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Apply segmented reply behavior to tool-sent messages and centralize segmented-reply logic for consistent behavior across pipelines and direct sends.
Bug Fixes:
Enhancements:
Tests: