Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 81 additions & 9 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,7 @@ def handle_tool_call(self, tool_name: str, args: dict[str, Any], **_kwargs: Any)
}
if bool(args.get("sessionScope", False)):
params["sessionId"] = self._session_id
resp = self._bridge.request(
resp = self._bridge_request_with_retry(
"memory.search",
params,
timeout=_LONG_RPC_TIMEOUT,
Expand All @@ -1366,7 +1366,7 @@ def handle_tool_call(self, tool_name: str, args: dict[str, Any], **_kwargs: Any)
method = methods.get(kind)
if method is None:
return json.dumps({"error": f"unknown memory kind: {kind}"})
item = self._bridge.request(
item = self._bridge_request_with_retry(
method, {"id": item_id, "namespace": self._runtime_namespace()}
)
if not item:
Expand Down Expand Up @@ -1411,7 +1411,7 @@ def handle_tool_call(self, tool_name: str, args: dict[str, Any], **_kwargs: Any)
}
)
if tool_name == "memos_timeline":
resp = self._bridge.request(
resp = self._bridge_request_with_retry(
"memory.timeline",
{
"episodeId": args.get("episodeId", self._episode_id),
Expand All @@ -1426,12 +1426,12 @@ def handle_tool_call(self, tool_name: str, args: dict[str, Any], **_kwargs: Any)
params = {"limit": limit, "namespace": self._runtime_namespace()}
if args.get("status"):
params["status"] = args["status"]
return json.dumps(self._bridge.request("skill.list", params))
return json.dumps(self._bridge_request_with_retry("skill.list", params))
if tool_name == "memos_environment":
query = (args.get("query") or "").strip()
limit = self._int_arg(args, "limit", 5, 1, 30)
if not query:
resp = self._bridge.request(
resp = self._bridge_request_with_retry(
"memory.list_world_models",
{"limit": limit, "offset": 0, "namespace": self._runtime_namespace()},
)
Expand All @@ -1447,7 +1447,7 @@ def handle_tool_call(self, tool_name: str, args: dict[str, Any], **_kwargs: Any)
"queried": False,
}
)
resp = self._bridge.request(
resp = self._bridge_request_with_retry(
"memory.search",
{
"agent": "hermes",
Expand Down Expand Up @@ -1481,7 +1481,7 @@ def handle_tool_call(self, tool_name: str, args: dict[str, Any], **_kwargs: Any)
skill_id = (args.get("id") or "").strip()
if not skill_id:
return json.dumps({"error": "missing id"})
skill = self._bridge.request(
skill = self._bridge_request_with_retry(
"skill.get",
{
"id": skill_id,
Expand Down Expand Up @@ -1800,12 +1800,81 @@ def _open_session(self, session_id: str = "", *, timeout: float = 30.0) -> None:
)
self._session_id = resp.get("sessionId") or requested_session

def _bridge_request_with_retry(
self,
method: str,
params: Any,
*,
timeout: float | None = None,
) -> dict[str, Any]:
"""Read-path helper: reconnect + retry once on ``transport_closed``.

Layer 4 (#2028): the read-path memory tools previously issued a
single ``self._bridge.request(...)`` and surfaced any error
verbatim to the model. When the Node bridge has died since the
last user turn, that first call now raises
``transport_closed`` fast (thanks to Layer 1/2). This helper
mirrors the pattern ``sync_turn`` already uses: reconnect the
bridge once and re-issue the same request. A second failure is
left to propagate — the ``except`` block in
``handle_tool_call`` will surface the error text verbatim.
"""
assert self._bridge is not None
try:
if timeout is None:
return self._bridge.request(method, params)
return self._bridge.request(method, params, timeout=timeout)
except BridgeError as err:
if not self._is_transport_closed(err):
raise
logger.info(
"MemOS: bridge transport closed on %s; reconnecting and retrying once — %s",
method,
err,
)
self._reconnect_bridge(self._session_id, timeout=30.0)
assert self._bridge is not None
if timeout is None:
return self._bridge.request(method, params)
return self._bridge.request(method, params, timeout=timeout)

def _is_transport_closed(self, err: Exception) -> bool:
if isinstance(err, BridgeError) and err.code == "transport_closed":
return True
msg = str(err).lower()
return "broken pipe" in msg or "bridge closed" in msg or "transport_closed" in msg

def _should_reconnect_after_keepalive_failure(self, err: Exception) -> bool:
"""Decide whether a keepalive failure warrants a bridge reconnect.

Layer 3 (#2028): the keepalive previously reconnected only on
``BridgeError("transport_closed", …)``. A hung Node bridge
surfaces instead as ``BridgeError("timeout", …)`` (the client
gave up waiting for a response); that error was dropped at
DEBUG and the stale client kept being reused, so every
subsequent memory tool timed out for another 30 s. Reconnect
also when the subprocess has already exited (belt-and-braces
for hangs that didn't raise a transport error).

A live subprocess raising a generic (non-transport) error must
NOT trigger a reconnect — otherwise transient parse noise
would create a reconnect storm.
"""
if self._is_transport_closed(err):
return True
if isinstance(err, BridgeError) and err.code == "timeout":
return True
# Ask the underlying subprocess: is it still alive?
bridge = self._bridge
if bridge is not None:
try:
exit_code = bridge._proc.poll() # type: ignore[attr-defined]
except Exception:
exit_code = None
if exit_code is not None:
return True
return False

def _reconnect_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> None:
# Don't reconnect if we're shutting down
if self._bridge_keepalive_stop.is_set():
Expand Down Expand Up @@ -1891,8 +1960,11 @@ def _run() -> None:
assert provider._bridge is not None
provider._bridge.request("core.health", {}, timeout=10.0)
except Exception as err:
if provider._is_transport_closed(err):
logger.info("MemOS: bridge keepalive reconnecting after transport close")
if provider._should_reconnect_after_keepalive_failure(err):
logger.info(
"MemOS: bridge keepalive reconnecting after failure — %s",
err,
)
with contextlib.suppress(Exception):
provider._reconnect_bridge(provider._session_id, timeout=10.0)
else:
Expand Down
173 changes: 106 additions & 67 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,18 @@ def request(
) -> dict[str, Any]:
if self._closed:
raise BridgeError("transport_closed", "bridge client is closed")
# Layer 2 (#2028): if the subprocess has already exited, bail
# BEFORE writing to the pipe. Otherwise the write silently
# buffers into a dead pipe and the caller parks on the full
# per-request timeout.
exit_code = None
with contextlib.suppress(Exception):
exit_code = self._proc.poll()
if exit_code is not None:
raise BridgeError(
"transport_closed",
f"bridge subprocess exited (code={exit_code})",
)
with self._lock:
rpc_id = self._next_id
self._next_id += 1
Expand Down Expand Up @@ -348,86 +360,113 @@ def close(self) -> None:
logger.error("MemOS: bridge process %d could not be killed", pid)

# 5. Clean up pending requests
self._abort_pending("bridge closed")

# ─── Internals ──

def _abort_pending(self, reason: str) -> None:
"""Wake every parked JSON-RPC waiter with `transport_closed`.

Called from both `close()` and the `_read_loop` `finally:`
block, so any pending request sees a real transport error
immediately instead of parking on its per-request timeout.
Also flips `_closed` and notifies host-handler waiters so a
second `request()` fails fast.
"""
with self._host_handlers_cv:
self._closed = True
self._host_handlers_cv.notify_all()
with self._lock:
for entry in list(self._pending.values()):
entry["error"] = {
"code": -32000,
"message": "bridge closed",
"message": reason,
"data": {"code": "transport_closed"},
}
entry["event"].set()
self._pending.clear()

# ─── Internals ──

def _read_loop(self) -> None:
assert self._proc.stdout is not None
for line in self._proc.stdout:
line = line.strip()
if not line:
continue
try:
msg = json.loads(line)
except json.JSONDecodeError:
logger.debug("bridge: malformed line: %r", line[:120])
continue
if "id" in msg and msg["id"] is not None and ("result" in msg or "error" in msg):
self._resolve(msg)
continue
if msg.get("method") == "events.notify":
for cb in list(self._events):
try:
cb(msg.get("params") or {})
except Exception:
logger.debug("event listener threw", exc_info=True)
continue
if msg.get("method") == "logs.forward":
for cb in list(self._logs):
try:
cb(msg.get("params") or {})
except Exception:
logger.debug("log listener threw", exc_info=True)
continue
# Reverse-direction request: the bridge is asking the
# adapter to do something (e.g. run a fallback LLM call
# via `host.llm.complete`). Dispatch to the registered
# handler and write the response back synchronously.
method = msg.get("method")
rpc_id = msg.get("id")
if (
isinstance(method, str)
and rpc_id is not None
and "result" not in msg
and "error" not in msg
):
handler = self._host_handler_for(method)
if handler is None:
self._send_response(
rpc_id,
error={
"code": -32601,
"message": f"method not found: {method}",
"data": {"code": "unknown_method"},
},
)
try:
for line in self._proc.stdout:
line = line.strip()
if not line:
continue
params = msg.get("params") or {}
if not isinstance(params, dict):
params = {}
try:
result = handler(params)
self._send_response(rpc_id, result=result)
except Exception as err:
logger.warning("host handler %s failed: %s", method, err)
self._send_response(
rpc_id,
error={
"code": -32000,
"message": str(err) or err.__class__.__name__,
"data": {"code": "host_handler_failed"},
},
)
continue
msg = json.loads(line)
except json.JSONDecodeError:
logger.debug("bridge: malformed line: %r", line[:120])
continue
if "id" in msg and msg["id"] is not None and ("result" in msg or "error" in msg):
self._resolve(msg)
continue
if msg.get("method") == "events.notify":
for cb in list(self._events):
try:
cb(msg.get("params") or {})
except Exception:
logger.debug("event listener threw", exc_info=True)
continue
if msg.get("method") == "logs.forward":
for cb in list(self._logs):
try:
cb(msg.get("params") or {})
except Exception:
logger.debug("log listener threw", exc_info=True)
continue
# Reverse-direction request: the bridge is asking the
# adapter to do something (e.g. run a fallback LLM call
# via `host.llm.complete`). Dispatch to the registered
# handler and write the response back synchronously.
method = msg.get("method")
rpc_id = msg.get("id")
if (
isinstance(method, str)
and rpc_id is not None
and "result" not in msg
and "error" not in msg
):
handler = self._host_handler_for(method)
if handler is None:
self._send_response(
rpc_id,
error={
"code": -32601,
"message": f"method not found: {method}",
"data": {"code": "unknown_method"},
},
)
continue
params = msg.get("params") or {}
if not isinstance(params, dict):
params = {}
try:
result = handler(params)
self._send_response(rpc_id, result=result)
except Exception as err:
logger.warning("host handler %s failed: %s", method, err)
self._send_response(
rpc_id,
error={
"code": -32000,
"message": str(err) or err.__class__.__name__,
"data": {"code": "host_handler_failed"},
},
)
continue
except Exception:
# Any unexpected exception in the reader loop still needs
# to fall through to the `finally:` cleanup so callers
# don't park on 30 s timeouts.
logger.debug("bridge reader thread crashed", exc_info=True)
finally:
# Layer 1 (#2028): the reader thread is the only signal
# that the Node subprocess has stopped answering. On any
# exit — normal EOF or exception — abort pending waiters
# immediately so callers get `transport_closed` in < 1 s
# instead of waiting for each 30 s per-request timeout.
self._abort_pending("bridge subprocess exited")

def _host_handler_for(
self,
Expand Down
Loading
Loading