✨ Add ScheduledTaskTool with global scheduler and frontend polling #3216
gjh-afk wants to merge 9 commits
    except (ValueError, IndexError):
        return None

    @staticmethod

The cron parser supports only exact numbers and `*`; it does not support standard cron syntax such as `*/15`, `1-5`, or `1,3,5`. Users who try these get a ValueError. Suggest documenting the supported syntax subset in the description, or using the croniter library.
        if month_match and dom_match and hour_match and minute_match:
            return candidate

Performance issue: the next fire time is computed by stepping minute by minute, up to 525,960 loop iterations. Suggest computing the next matching time directly with datetime arithmetic.
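One way to avoid the minute-by-minute scan is to jump straight past any field that cannot match. The sketch below is only an illustration for the restricted syntax discussed in this review (each field is an exact number or `*`); the function name `next_fire`, its parameters, and the `max_years` cutoff are assumptions, not the PR's `_compute_next_fire`.

```python
from datetime import datetime, timedelta
from typing import Optional


def next_fire(after: datetime,
              minute: Optional[int] = None,
              hour: Optional[int] = None,
              dom: Optional[int] = None,
              month: Optional[int] = None,
              max_years: int = 5) -> Optional[datetime]:
    """Return the first matching minute strictly after `after`.

    None stands for '*'. Hypothetical helper: day-of-week is left out
    to keep the sketch short.
    """
    # Start at the next whole minute.
    t = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    limit = after.year + max_years
    while t.year <= limit:
        if month is not None and t.month != month:
            # Wrong month: jump to midnight on the 1st of the next month.
            y, m = (t.year + 1, 1) if t.month == 12 else (t.year, t.month + 1)
            t = datetime(y, m, 1)
        elif dom is not None and t.day != dom:
            # Wrong day: jump to midnight of the next day.
            t = datetime(t.year, t.month, t.day) + timedelta(days=1)
        elif hour is not None and t.hour != hour:
            # Wrong hour: jump to the top of the next hour.
            t = t.replace(minute=0) + timedelta(hours=1)
        elif minute is not None and t.minute != minute:
            t += timedelta(minutes=1)
        else:
            return t
    return None  # e.g. "Feb 30" never matches


fire = next_fire(datetime(2024, 1, 1, 10, 30), minute=0, hour=9)
```

Each mismatch skips a whole month, day, or hour at a time, so an impossible date is rejected after a few thousand steps rather than hundreds of thousands.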
    t = threading.Thread(
        target=_run_scheduled_task_from_db,
        args=(task_dict,),
        daemon=True,

Serial blocking: t.join(timeout=300) makes the scheduler block for up to 5 minutes per task. When 10 tasks come due at the same time, the last one has to wait 50 minutes. Suggest running tasks in parallel with ThreadPoolExecutor.
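A minimal sketch of the ThreadPoolExecutor suggestion, assuming a stand-in task function. `run_task`, `dispatch_due_tasks`, and the pool size are hypothetical; in the PR the worker would be `_run_scheduled_task_from_db`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_task(task: dict) -> str:
    """Hypothetical stand-in for _run_scheduled_task_from_db."""
    time.sleep(0.05)  # simulate work
    return task["task_uuid"]


def dispatch_due_tasks(executor: ThreadPoolExecutor, due_tasks: list) -> list:
    # submit() returns immediately, so the scheduler loop never blocks
    # on a single slow task the way thread.join(timeout=300) does.
    return [executor.submit(run_task, task) for task in due_tasks]


executor = ThreadPoolExecutor(max_workers=4)  # pool size is an assumption
futures = dispatch_due_tasks(executor, [{"task_uuid": f"t{i}"} for i in range(8)])
results = [f.result(timeout=5) for f in futures]  # only the demo waits here
executor.shutdown()
```

In a real scheduler the loop would not call `result()` at all; a done-callback or the task function itself would record the outcome.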
    query=user_content,
    history=[],
    tenant_id=tenant_id,
    user_id=user_id,

Mixing asyncio.run() with a manually created new_event_loop() leads to event-loop conflicts and resource leaks. Suggest running everything inside a single async function:

    async def _execute():
        agent_run_info = await create_agent_run_info(...)
        async for chunk in agent_run(agent_run_info):
            chunks.append(chunk)
    asyncio.run(_execute())

        user_id, tenant_id = get_current_user_id(authorization)
        result = get_new_messages_service(conversation_id, user_id, since_index)
        return JSONResponse(status_code=HTTPStatus.OK, content=result)
    except Exception as e:

[Code style] `except Exception:` is too broad. Suggest catching more specific exception types so that latent errors are not masked.
            if cid is not None:
                results[str(cid)] = get_new_messages_service(cid, user_id, since)
        return JSONResponse(status_code=HTTPStatus.OK, content={"results": results})
    except Exception as e:

[Code style] `except Exception:` is too broad. Suggest catching more specific exception types so that latent errors are not masked.
        logger.info(f"Scheduled task {task_uuid} executed successfully")

    except Exception as e:

[Code style] `except Exception:` is too broad. Suggest catching more specific exception types so that latent errors are not masked.
        if t.class_name != "ScheduledTaskTool"
    ]

    # Run agent and collect response chunks

[Logic flaw] _run_scheduled_task_from_db first creates an event loop with asyncio.run() and then creates a second one with asyncio.new_event_loop(). asyncio.run() creates and manages its own event loop, so mixing the two approaches in one function can cause event-loop conflicts. Suggest consistently using asyncio.new_event_loop() + loop.run_until_complete(), or consistently using asyncio.run().
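The "use asyncio.run() consistently" option can be sketched as follows. The coroutines `create_agent_run_info` and `agent_run` here are simplified placeholders with assumed signatures, not the Nexent implementations, and `run_task_sync` is a hypothetical name for the synchronous entry point.

```python
import asyncio


async def create_agent_run_info(query: str) -> dict:
    """Placeholder for the real coroutine; the signature is assumed."""
    await asyncio.sleep(0)
    return {"query": query}


async def agent_run(info: dict):
    """Placeholder async generator that yields response chunks."""
    for part in ("chunk-1", "chunk-2"):
        await asyncio.sleep(0)
        yield f"{info['query']}:{part}"


async def _execute(query: str) -> list:
    # Setup and streaming share one coroutine, hence one event loop.
    info = await create_agent_run_info(query)
    return [chunk async for chunk in agent_run(info)]


def run_task_sync(query: str) -> list:
    # Single asyncio.run() call: the loop is created and closed once.
    return asyncio.run(_execute(query))


chunks = run_task_sync("daily-report")
```

Because the loop is created and torn down in one place, there is no second loop left open to leak.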
    app.include_router(skill_creator_router)


    @app.on_event("startup")

[Code style] @app.on_event("startup") and @app.on_event("shutdown") are marked as deprecated in FastAPI. Suggest using a lifespan context manager instead. See: https://fastapi.tiangolo.com/advanced/events/
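The lifespan pattern boils down to one async context manager that starts the scheduler before the `yield` and stops it afterwards. The sketch below uses only the standard library so it runs on its own; `DemoScheduler` is a hypothetical stand-in for the PR's global scheduler, and with FastAPI the function would be passed as `FastAPI(lifespan=lifespan)`.

```python
import asyncio
from contextlib import asynccontextmanager


class DemoScheduler:
    """Hypothetical stand-in for the global scheduler singleton."""

    def __init__(self) -> None:
        self.running = False

    def start(self) -> None:
        self.running = True

    def stop(self) -> None:
        self.running = False


scheduler = DemoScheduler()


@asynccontextmanager
async def lifespan(app):
    scheduler.start()        # replaces the "startup" handler
    try:
        yield
    finally:
        scheduler.stop()     # replaces the "shutdown" handler


async def _demo() -> bool:
    # Drive the context manager by hand to show the ordering.
    async with lifespan(app=None):
        return scheduler.running


was_running = asyncio.run(_demo())
```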
    @router.post("/batch_new_messages", response_model=Dict[str, Any])
    async def batch_check_new_messages_endpoint(request: Dict[str, Any], authorization: Optional[str] = Header(None)):
        """
        Batch check for new messages across multiple conversations.

[Code style] The request body of batch_check_new_messages_endpoint is typed as Dict[str, Any], which provides no type constraints or input validation. Suggest defining the request structure with a Pydantic model (for example BatchMessageCheckRequest) so that every element of the checks list carries the required fields and is type-checked.
        self,
        action: str,
        task_name: Optional[str] = None,
        task_prompt: Optional[str] = None,

ScheduledTaskTool uses asyncio.run() inside the synchronous forward method to run async work, but if forward is already being called from an async context (for example via asyncio.to_thread), a RuntimeError is raised. Suggest checking the calling context, or using asyncio.get_event_loop().run_until_complete() instead.
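One way to "check the calling context" is to ask asyncio whether a loop is already running in the current thread and pick a strategy accordingly. This is a sketch under assumptions: `run_coro_sync` and `_do_work` are hypothetical names, and handing the work to a helper thread is just one possible fallback (it still blocks the caller until the result is ready).

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def _do_work(x: int) -> int:
    """Hypothetical async operation the tool needs to run."""
    await asyncio.sleep(0)
    return x * 2


def run_coro_sync(coro_factory, *args):
    """Run a coroutine from sync code whether or not a loop is running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run() is safe.
        return asyncio.run(coro_factory(*args))
    # A loop is already running here, so asyncio.run() would raise.
    # Run the coroutine on a fresh loop in a helper thread instead.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro_factory(*args)).result()


plain = run_coro_sync(_do_work, 21)


async def _caller() -> int:
    # Simulates forward() being invoked from inside an event loop.
    return run_coro_sync(_do_work, 5)


nested = asyncio.run(_caller())
```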
Add a scheduled task tool that enables agents to create, list, and cancel cron-based or one-shot scheduled tasks. The architecture uses a global scheduler singleton (independent of agent lifecycle), DB persistence, and frontend polling for real-time message delivery.

Changes:
- SDK: ScheduledTaskTool as thin CRUD wrapper with cron parser
- Backend: global ScheduledTaskScheduler (10s DB poll), scheduled_task_db
- Backend: new_messages polling API + batch endpoint
- Backend: conversation_id passthrough for task-to-session binding
- Backend: scheduler strips ScheduledTaskTool from triggered agent to prevent recursive task creation
- Frontend: two-layer polling (5s active, 10s background conversations)
- DB: scheduled_tasks_t table with multi-tenant fields

No existing Nexent logic is modified; all changes are additive.
Good addition. The ScheduledTaskTool integration with cron support looks reasonable. Please ensure the cron expression validation is covered by tests before merging.
Thanks @WMC001! Added unit tests covering cron expression validation and next-fire-time computation in Coverage includes:
The tests load the tool module via
Bug 1:

    for check in request.checks[:50]:
        results[str(check.conversation_id)] = get_new_messages_service(  # <-- no try/except
            check.conversation_id, user_id, check.since_index
        )

Fix: wrap the call in a try/except and record an error indicator per conversation instead of propagating the exception.
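The per-conversation isolation described in the fix could look roughly like this. Everything here is illustrative: the stand-in `get_new_messages_service`, the `LookupFailed` exception, the tuple-based `checks`, and the shape of the error marker are assumptions rather than the code that was merged.

```python
class LookupFailed(Exception):
    """Hypothetical error type raised by the stand-in service."""


def get_new_messages_service(conversation_id: int, user_id: str, since_index: int) -> dict:
    """Stand-in for the real service; fails for negative ids."""
    if conversation_id < 0:
        raise LookupFailed(f"conversation {conversation_id} not found")
    return {"has_new": False, "messages": []}


def batch_check(checks: list, user_id: str, limit: int = 50) -> dict:
    results = {}
    for conversation_id, since_index in checks[:limit]:
        try:
            results[str(conversation_id)] = get_new_messages_service(
                conversation_id, user_id, since_index
            )
        except LookupFailed as exc:
            # Record a marker for this conversation only; the rest of
            # the batch is still processed and returned.
            results[str(conversation_id)] = {"error": str(exc)}
    return results


results = batch_check([(1, 0), (-1, 0), (2, 5)], user_id="u1")
```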
Bug 2: cron
Note: Python's
Fix: when both
…batch endpoint errors

Address WMC001 review:
- _parse_cron now tracks day_of_month/day_of_week restriction flags
- _compute_next_fire uses standard cron OR semantics: when both day fields are restricted, a day matches if it satisfies either field (previously used AND, which is non-standard)
- batch_check_new_messages_endpoint isolates per-conversation failures so one error returns an error marker instead of failing the whole batch
- add tests covering OR semantics and the restriction flags
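The OR rule from this commit message can be illustrated with a small day-matching helper. This is a sketch, not the PR's `_compute_next_fire`: the name `day_matches`, the use of sets, and the cron convention 0 = Sunday are assumptions made for the example.

```python
from datetime import date
from typing import Optional, Set


def day_matches(d: date,
                dom: Optional[Set[int]] = None,
                dow: Optional[Set[int]] = None) -> bool:
    """None means the field is '*' (unrestricted).

    dow uses the cron convention 0=Sunday .. 6=Saturday (assumed here).
    """
    # date.weekday() is Monday=0 .. Sunday=6, so shift to cron numbering.
    cron_dow = (d.weekday() + 1) % 7
    if dom is not None and dow is not None:
        # Both fields restricted: standard cron fires if EITHER matches.
        return d.day in dom or cron_dow in dow
    dom_ok = dom is None or d.day in dom
    dow_ok = dow is None or cron_dow in dow
    return dom_ok and dow_ok


# 2024-03-15 is a Friday (cron weekday 5).
friday_15th = day_matches(date(2024, 3, 15), dom={1}, dow={5})
```

With day-of-month 1 and day-of-week 5 both set, the job fires on the 1st of the month and on every Friday, which is why the earlier AND logic ran far less often than users would expect.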
Thanks @WMC001, both issues fixed in the latest commit:
Bug 1 (batch error isolation):
Bug 2 (day-of-month / day-of-week OR semantics): good catch, the previous AND logic was non-standard. Now
Added 5 new tests covering this:
All 32 tests passing locally.
Address several issues that blocked the scheduled-task feature from working in practice, found while driving the full flow:
- task_type auto-inference (_resolve_task_type): passing cron_expression alone was silently treated as oneshot and demanded delay_seconds. Now inferred from whichever time field is present; schema rewritten to drop examples and state the mutual-exclusion clearly.
- cancel accepts task_uuid OR task_name: callers often pass the task id via task_name; now it cancels by uuid first, then falls back to a name lookup, so the first attempt succeeds.
- assistant message rendering: _save_assistant_chunks parses/merges agent_run chunks (drops metadata, folds same-type units) instead of dumping raw JSON strings; also fixed a NameError (MESSAGE_ROLE import).
- trigger message wording: split the display text (short 'task fired' line) from the agent query (full instruction), so internal prompts are never shown in the chat bubble.
- frontend polling races: skip refresh while a conversation is actively streaming, and reuse the standard message extractors in refresh so tool-call steps survive (previously wiped mid-stream or on finish).
- tool descriptions simplified: pure contract, no scenario examples, so complex tasks (e.g. scheduled briefings) aren't biased to 'reminder'.

Add unit tests for _resolve_task_type and _handle_cancel (44 total).
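The task_type inference described in the first bullet might be sketched like this. The function below is a hypothetical reconstruction: the `"cron"` value, the error messages, and the handling of an explicit but conflicting task_type are assumptions, since the commit message only states that the type is inferred from whichever time field is present and that the two fields are mutually exclusive.

```python
from typing import Optional


def resolve_task_type(task_type: Optional[str] = None,
                      cron_expression: Optional[str] = None,
                      delay_seconds: Optional[int] = None) -> str:
    """Infer 'cron' or 'oneshot' from whichever time field is set."""
    has_cron = bool(cron_expression)
    has_delay = delay_seconds is not None
    if has_cron and has_delay:
        raise ValueError("cron_expression and delay_seconds are mutually exclusive")
    if has_cron:
        inferred = "cron"
    elif has_delay:
        inferred = "oneshot"
    else:
        raise ValueError("provide either cron_expression or delay_seconds")
    # Assumed behaviour: reject an explicit type that contradicts the fields.
    if task_type is not None and task_type != inferred:
        raise ValueError(f"task_type={task_type!r} conflicts with the time field given")
    return inferred


kind = resolve_task_type(cron_expression="0 9 * * *")
```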
Pushed a follow-up commit ( 1. 2. 3. Assistant message rendering ( 4. Trigger message split into display vs. query 5. Frontend polling races Tests: added coverage for
…escription

- cancel_task now cancels tasks in 'pending' OR 'fired' state, so a task can be cancelled even while it is executing mid-run.
- Add reschedule_if_active: cron tasks are re-armed only while still 'fired', so a task cancelled mid-run is never resurrected on its next cycle.
- _run_and_reschedule uses the atomic reschedule guard instead of unconditionally resetting to 'pending'.
- Trim the tool description to a concise, codebase-consistent length and make the cancel flow explicit (list -> cancel by uuid -> verify).
- Add unit tests for the cancel_task and reschedule_if_active DB functions.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
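The cancel/reschedule guard can be pictured as two conditional UPDATE statements. The sketch below uses an in-memory SQLite database purely so it runs on its own; the `status` and `next_fire_at` column names, the `'cancelled'` status value, and the function bodies are assumptions, not the PR's scheduled_task_db code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE scheduled_tasks_t ("
    "task_uuid TEXT PRIMARY KEY, status TEXT, next_fire_at TEXT)"
)
conn.executemany(
    "INSERT INTO scheduled_tasks_t VALUES (?, 'fired', NULL)", [("a",), ("b",)]
)


def cancel_task(conn: sqlite3.Connection, task_uuid: str) -> bool:
    # Cancels tasks that are waiting OR currently executing.
    cur = conn.execute(
        "UPDATE scheduled_tasks_t SET status = 'cancelled' "
        "WHERE task_uuid = ? AND status IN ('pending', 'fired')",
        (task_uuid,),
    )
    conn.commit()
    return cur.rowcount == 1


def reschedule_if_active(conn: sqlite3.Connection, task_uuid: str, next_fire_at: str) -> bool:
    # The status check lives in the WHERE clause, so a task cancelled
    # mid-run cannot be flipped back to 'pending' by the worker.
    cur = conn.execute(
        "UPDATE scheduled_tasks_t SET status = 'pending', next_fire_at = ? "
        "WHERE task_uuid = ? AND status = 'fired'",
        (next_fire_at, task_uuid),
    )
    conn.commit()
    return cur.rowcount == 1


cancelled_b = cancel_task(conn, "b")            # cancelled while 'fired'
rearmed_a = reschedule_if_active(conn, "a", "2030-01-01T09:00:00")
rearmed_b = reschedule_if_active(conn, "b", "2030-01-01T09:00:00")
```

Putting the condition in the UPDATE itself, rather than reading the status first and writing afterwards, is what makes the check and the write a single step.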
Heads-up for reviewers on the cancel reliability of this tool, based on end-to-end testing:

This feature's reliability depends on the backing LLM's tool-use discipline. The

Mitigations already in this PR (code-side, model-independent):

In testing, switching the agent from

If a deployment uses a weaker model and needs a hard guarantee, the next step would be a verification hook in the agent run: intercept a
Address review feedback (YehongPan) on overly broad except clauses:
- Polling endpoints (check_new_messages / batch_check_new_messages): catch SQLAlchemyError instead of Exception, so non-DB errors are no longer masked and can surface to FastAPI's default handler.
- Scheduler: the inner best-effort status updates now catch SQLAlchemyError. The daemon-loop and per-task handlers stay broad on purpose (a worker must survive any single failure), but each is now commented to state that intent and relies on exc_info so the real cause is logged, not hidden.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
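The "broad on purpose, but logged with exc_info" handler mentioned for the scheduler can be sketched as below. The loop, the task tuples, and the logger name are hypothetical; the point is only that the except clause states its intent in a comment and logs the full traceback.

```python
import logging

logger = logging.getLogger("scheduler_demo")  # hypothetical logger name


def run_due_tasks(tasks: list) -> list:
    """Run each (name, callable) pair; one failure must not stop the rest."""
    completed = []
    for name, fn in tasks:
        try:
            fn()
            completed.append(name)
        except Exception:
            # Intentionally broad: the worker must survive any single
            # task failure. exc_info=True keeps the traceback in the log.
            logger.error("Scheduled task %s failed", name, exc_info=True)
    return completed


def _boom() -> None:
    raise RuntimeError("simulated task failure")


completed = run_due_tasks([("ok-1", lambda: None), ("bad", _boom), ("ok-2", lambda: None)])
```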
…task-tool # Conflicts: # backend/agents/create_agent_info.py
…task-tool # Conflicts: # backend/agents/create_agent_info.py
…task-tool # Conflicts: # docker/init.sql
Summary
Add a ScheduledTaskTool that enables Nexent agents to create, list, and cancel cron-based or one-shot scheduled tasks.
Architecture
Why not just a tool module?
Nexent creates a new agent instance per message: tools, scheduler threads, and all state are garbage-collected after each run. A scheduled task tool needs to survive beyond a single agent run, which requires:
Key Design Decisions
- message_idx from DB max(): save_messages computes idx from history=[] → always 0, causing collisions
- get_new_messages_service validates conversation.created_by == user_id
Multi-Tenant Compatibility
- ScheduledTaskRecord includes tenant_id and user_id fields
- query_tasks_by_agent and cancel_task filter by tenant_id
- tenant_id/user_id for agent runs
Files Changed (15 files, +877 lines, all additive)
New files (3):
- backend/database/scheduled_task_db.py: 5 CRUD functions
- backend/services/scheduled_task_scheduler.py: global scheduler singleton
- sdk/nexent/core/tools/scheduled_task_tool.py: tool with cron parser
Modified files (12, no existing logic changed):
- backend/database/db_models.py: ScheduledTaskRecord model
- backend/database/conversation_db.py: get_max_message_index()
- backend/services/conversation_management_service.py: get_new_messages_service()
- backend/apps/conversation_management_app.py: polling endpoints
- backend/apps/runtime_app.py: scheduler lifecycle
- backend/agents/create_agent_info.py: conversation_id passthrough
- sdk/nexent/core/agents/nexent_agent.py: ScheduledTaskTool metadata injection
- sdk/nexent/core/tools/__init__.py: export ScheduledTaskTool
- docker/init.sql: scheduled_tasks_t table
- frontend/services/api.ts: new endpoints
- frontend/services/conversationService.ts: polling methods
- frontend/app/[locale]/chat/internal/chatInterface.tsx: polling useEffect
Testing
Tested with Docker (v2.2.0 image + bind mounts):
ScheduledTaskTool not found errors ✅