Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions apps/memos-local-plugin/agent-contract/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,22 @@ export interface MemoryCore {
health(): Promise<CoreHealth>;
/** Late-bind ARMS telemetry (called after config is available). */
bindTelemetry?(t: unknown): void;
/**
* Resolve when the background recovery kicked off by `init()` settles.
*
* `init()` returns as soon as the synchronous orphan / dirty-episode
* classification finishes; the actual reflect / reward / L2 recovery
* chain runs on a background promise so the host's event loop stays
* responsive (see issues #1776 + #1808). This method exposes that
* promise for callers that need the historic "await everything"
* semantics — primarily tests and one-shot batch tools.
*
* Implementations MUST never reject from this promise. Failures are
* logged on the `init.background_recovery_failed` channel instead.
* Adapters that have no startup recovery may omit the method; an
* absent implementation is equivalent to `() => Promise.resolve()`.
*/
waitForStartupRecovery?(): Promise<void>;

// ── session / episode ──
openSession(input: {
Expand Down
188 changes: 169 additions & 19 deletions apps/memos-local-plugin/core/pipeline/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,31 @@ export function createMemoryCore(
handle.config.algorithm.session.mergeMaxGapMs * 2,
4 * 60 * 60 * 1000,
);

// ─── Dirty closed-episode rescore backoff (issue #1808) ──
// Failed reward / reflect runs on dirty closed episodes used to be
// retried on every restart and every periodic rescan. The OpenClaw
// Gateway report at #1808 attributed >3s `eventLoopMax` bursts to this
// tight retry loop hammering the LLM on already-broken rows. We now
// track per-episode failure counts in `meta.rewardDirty` and require
// an exponential cool-down before retrying a row that has already
// failed `MAX_DIRTY_REWARD_ATTEMPTS` times. Manual feedback /
// `runManually` still rescore unconditionally — the backoff only
// applies to the automatic rescan paths.
const MAX_DIRTY_REWARD_ATTEMPTS = 3;
const DIRTY_REWARD_BACKOFF_BASE_MS = 60 * 60 * 1000; // 1h
const DIRTY_REWARD_BACKOFF_MAX_MS = 24 * 60 * 60 * 1000; // 24h

// ─── Startup recovery background promise (issue #1776 + #1808) ──
// `init()` used to `await` the entire reflect → reward → L2 chain for
// every stale / dirty episode found in SQLite. On databases with
// 30k+ traces this blocked the Gateway's main event loop for 3-5s+,
// long enough to time out the WebSocket read probe (3s budget) used
// by TUI / Control UI clients. We now keep the synchronous
// classification on the main thread (it only touches the DB) and
// detach the slow recovery to this promise. `waitForStartupRecovery`
// exposes it so tests can opt back into the deterministic semantics.
let startupRecoveryPromise: Promise<void> = Promise.resolve();
let lastStaleScan = 0;
let lastDirtyClosedScan = 0;
async function autoFinalizeStaleTasks(): Promise<void> {
Expand Down Expand Up @@ -610,9 +635,12 @@ export function createMemoryCore(
if (nowMs - lastDirtyClosedScan < 30_000) return;
lastDirtyClosedScan = nowMs;
try {
const dirtyClosed = handle.repos.episodes
const allDirty = handle.repos.episodes
.list({ status: "closed", limit: 500 })
.filter((ep) => !isLightweightEpisode(ep) && episodeRewardIsDirty(ep));
// Apply the same backoff filter as init() so the 10-min periodic
// scan does not hammer episodes whose LLM call keeps failing.
const dirtyClosed = allDirty.filter((ep) => dirtyEpisodeBackoffElapsed(ep, nowMs));
if (dirtyClosed.length > 0) {
await recoverDirtyClosedEpisodes(dirtyClosed);
}
Expand All @@ -623,14 +651,6 @@ export function createMemoryCore(
}
}

function scheduleStartupRecovery(label: string, task: () => Promise<void>): void {
void task().catch((err) => {
log.debug(`${label}.failed`, {
err: err instanceof Error ? err.message : String(err),
});
});
}

function makeHubRuntime(config: ResolvedConfig): HubRuntime {
return createHubRuntime({
repos: handle.repos,
Expand Down Expand Up @@ -899,6 +919,16 @@ export function createMemoryCore(
// not evidence that the topic ended; the next user turn gets routed
// through relation classification. Only hard-stale open topics are
// finalized here so the pipeline eventually catches up.
//
// ── Issue #1776 + #1808: stale + dirty recovery used to run inside
// `await` here, blocking `init()` for seconds-to-minutes on big
// databases and starving the OpenClaw Gateway event loop. We now
// collect the slow inputs synchronously (the DB list + classify is
// cheap) and detach the actual reflect / reward chain onto
// `startupRecoveryPromise`. Tests that need the historic semantics
// call `core.waitForStartupRecovery()` after `init()`.
let staleForBackground: Array<EpisodeRow & { meta?: Record<string, unknown> }> = [];
let dirtyClosedForBackground: Array<EpisodeRow & { meta?: Record<string, unknown> }> = [];
try {
const orphans = handle.repos.episodes.list({ status: "open", limit: 500 });
if (orphans.length > 0) {
Expand Down Expand Up @@ -928,26 +958,68 @@ export function createMemoryCore(
recoveredAtStartup: nowMs,
});
}
if (stale.length > 0) {
scheduleStartupRecovery("startup.open_recovery", async () => {
await recoverOpenEpisodesAsSessionEnd(stale);
});
}
staleForBackground = stale;
}
const dirtyClosed = handle.repos.episodes
const nowForDirty = Date.now();
const allDirty = handle.repos.episodes
.list({ status: "closed", limit: 500 })
.filter((ep) => !isLightweightEpisode(ep) && episodeRewardIsDirty(ep));
if (dirtyClosed.length > 0) {
scheduleStartupRecovery("startup.dirty_closed_recovery", async () => {
await recoverDirtyClosedEpisodes(dirtyClosed);
});
const dirtyClosed: typeof allDirty = [];
for (const ep of allDirty) {
if (dirtyEpisodeBackoffElapsed(ep, nowForDirty)) {
dirtyClosed.push(ep);
} else {
const dirtyMeta = (ep.meta?.rewardDirty as
| { failedAttempts?: number; lastFailureAt?: number }
| undefined) ?? {};
log.debug("init.dirty_closed_episodes.skip_backoff", {
episodeId: ep.id,
failedAttempts: dirtyMeta.failedAttempts ?? 0,
lastFailureAt: dirtyMeta.lastFailureAt ?? 0,
});
}
}
dirtyClosedForBackground = dirtyClosed;
} catch (err) {
log.debug("init.orphan_scan.failed", {
err: err instanceof Error ? err.message : String(err),
});
}

// Kick the slow recovery chain off the main thread. `init()` returns
// as soon as the synchronous classification above finishes, so the
// Gateway can start accepting WebSocket upgrades immediately.
if (staleForBackground.length > 0 || dirtyClosedForBackground.length > 0) {
const stale = staleForBackground;
const dirtyClosed = dirtyClosedForBackground;
log.info("init.background_recovery_started", {
staleCount: stale.length,
dirtyClosedCount: dirtyClosed.length,
});
const recoveryStartedAt = Date.now();
startupRecoveryPromise = (async () => {
try {
if (stale.length > 0) {
await recoverOpenEpisodesAsSessionEnd(stale);
}
if (dirtyClosed.length > 0) {
await recoverDirtyClosedEpisodes(dirtyClosed);
}
log.info("init.background_recovery_finished", {
staleCount: stale.length,
dirtyClosedCount: dirtyClosed.length,
durationMs: Date.now() - recoveryStartedAt,
});
} catch (err) {
log.warn("init.background_recovery_failed", {
err: err instanceof Error ? err.message : String(err),
staleCount: stale.length,
dirtyClosedCount: dirtyClosed.length,
});
}
})();
}

// Periodic rescore timer for episodes that miss the startup scan or
// retry of failed reward runs. 10-minute interval is safe because
// autoRescoreDirtyClosedEpisodes has its own 30-second dedup guard.
Expand Down Expand Up @@ -1276,10 +1348,21 @@ export function createMemoryCore(
episodes: Array<EpisodeRow & { meta?: Record<string, unknown> }>,
): Promise<void> {
log.info("init.dirty_closed_episodes.rescore", { count: episodes.length });
// Snapshot the prior failure counters so we can increment them later
// (after the bus chain settles) without an extra DB read.
const priorFailedAttempts = new Map<EpisodeId, number>();
for (const ep of episodes) {
if (isLightweightEpisode(ep)) continue;
const episodeId = ep.id as EpisodeId;
const endedAt = ep.endedAt ?? Date.now();
const prevDirty = (ep.meta?.rewardDirty as
| { failedAttempts?: unknown }
| undefined) ?? {};
const prevAttempts =
typeof prevDirty.failedAttempts === "number"
? prevDirty.failedAttempts
: 0;
priorFailedAttempts.set(episodeId, prevAttempts);
handle.repos.episodes.updateMeta(episodeId, {
closeReason: "finalized",
recoveredAtStartup: endedAt,
Expand All @@ -1295,6 +1378,31 @@ export function createMemoryCore(
});
}
await handle.flush();
// After the reward / reflect chain has finished, account for the
// outcome: clear `meta.rewardDirty` on episodes that are no longer
// dirty (success), bump `failedAttempts + lastFailureAt` on episodes
// that still match the dirty predicate (LLM failure / no-op). This
// closes the "retried indefinitely" loop reported in issue #1808.
const now = Date.now();
for (const [episodeId, prevAttempts] of priorFailedAttempts) {
const after = handle.repos.episodes.getById(episodeId);
if (!after) continue;
const stillDirty = episodeRewardIsDirty(after);
if (stillDirty) {
handle.repos.episodes.updateMeta(episodeId, {
rewardDirty: {
failedAttempts: prevAttempts + 1,
lastFailureAt: now,
},
});
} else if (
after.meta &&
typeof after.meta === "object" &&
"rewardDirty" in after.meta
) {
handle.repos.episodes.updateMeta(episodeId, { rewardDirty: undefined });
}
}
}

function episodeRewardIsDirty(ep: EpisodeRow & { meta?: Record<string, unknown> }): boolean {
Expand Down Expand Up @@ -1352,6 +1460,37 @@ export function createMemoryCore(
return handle.repos.traces.hasAnyNewerThan(traceIds, scoredAt);
}

/**
* Backoff filter for the automatic dirty-rescore scans (issue #1808).
*
* Returns true when the row is eligible for another reward rescore on
* the auto path (init scan + 10-minute periodic). When a row has
* failed `MAX_DIRTY_REWARD_ATTEMPTS` consecutive automatic rescores,
* we wait an exponentially increasing window (1h → 24h cap) before
* attempting again. This stops the "retried indefinitely with no
* backoff" symptom reported on the OpenClaw Gateway.
*
* Manual paths (`submitFeedback`, `runManually`) do NOT consult this
* filter — explicit user / operator intent should always re-trigger.
*/
function dirtyEpisodeBackoffElapsed(
ep: EpisodeRow & { meta?: Record<string, unknown> },
nowMs: number,
): boolean {
const dirty = (ep.meta?.rewardDirty as
| { failedAttempts?: unknown; lastFailureAt?: unknown }
| undefined) ?? {};
const attempts = typeof dirty.failedAttempts === "number" ? dirty.failedAttempts : 0;
if (attempts < MAX_DIRTY_REWARD_ATTEMPTS) return true;
const lastFailureAt = typeof dirty.lastFailureAt === "number" ? dirty.lastFailureAt : 0;
const exponent = Math.min(attempts - MAX_DIRTY_REWARD_ATTEMPTS, 5);
const wait = Math.min(
DIRTY_REWARD_BACKOFF_BASE_MS * (1 << exponent),
DIRTY_REWARD_BACKOFF_MAX_MS,
);
return nowMs - lastFailureAt >= wait;
}

function snapshotFromRecoveredEpisode(
ep: EpisodeRow & { meta?: Record<string, unknown> },
endedAt: number,
Expand Down Expand Up @@ -1459,6 +1598,16 @@ export function createMemoryCore(
if (shutDown) return;
shutDown = true;
try {
// Make sure the background startup recovery (issue #1808) has
// finished before we tear down the bus / DB handle. Without this
// wait, a fast `init → shutdown` race during tests or a quick
// gateway reload would close SQLite while reflect / reward is
// mid-flush, producing `SQLITE_MISUSE` noise on the way down.
try {
await startupRecoveryPromise;
} catch {
/* already logged inside the recovery promise */
}
try {
await hubRuntime?.stop();
} catch (err) {
Expand Down Expand Up @@ -4526,6 +4675,7 @@ export function createMemoryCore(
init,
shutdown,
health,
waitForStartupRecovery: () => startupRecoveryPromise,
bindTelemetry(t: import("../telemetry/index.js").Telemetry) { telemetry = t; },
openSession,
closeSession,
Expand Down
Loading
Loading