Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/memos-local-plugin/core/pipeline/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,9 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {
await subs.l3.drain();
await nextTick();
await subs.skills.flush();
subs.skills.lifecycleTick();
await nextTick();
await subs.skills.flush();
await subs.feedback.flush();
await embeddingRetryWorker.flush();
}
Expand Down
1 change: 1 addition & 0 deletions apps/memos-local-plugin/core/skill/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export {
applyFeedback,
recomputeEta,
shouldArchiveIdle,
shouldPromoteCandidate,
type LifecycleUpdate,
} from "./lifecycle.js";
export {
Expand Down
11 changes: 11 additions & 0 deletions apps/memos-local-plugin/core/skill/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ export function shouldArchiveIdle(
return skill.eta < cfg.minEtaForRetrieval;
}

export function shouldPromoteCandidate(
skill: SkillRow,
cfg: SkillConfig,
opts: { repairOrigin: boolean },
): boolean {
if (skill.status !== "candidate") return false;
if (skill.trialsAttempted > 0) return false;
if (opts.repairOrigin) return false;
return skill.eta >= cfg.minEtaForRetrieval;
}

function clamp01(n: number): number {
if (!Number.isFinite(n)) return 0;
if (n < 0) return 0;
Expand Down
51 changes: 50 additions & 1 deletion apps/memos-local-plugin/core/skill/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import type {
SkillTrigger,
} from "./types.js";
import type { SkillId } from "../types.js";
import type { PolicyRow, SkillRow } from "../types.js";
import { shouldPromoteCandidate } from "./lifecycle.js";

export interface SkillSubscriberDeps
extends Omit<RunSkillDeps, "log" | "bus"> {
Expand All @@ -46,6 +48,7 @@ export interface SkillSubscriberHandle {
dispose(): void;
runOnce(input: Omit<RunSkillInput, "trigger"> & { trigger?: SkillTrigger }): Promise<RunSkillResult>;
applyFeedback(skillId: SkillId, kind: SkillFeedbackKind, magnitude?: number): void;
lifecycleTick(): void;
/**
* Await any in-flight scheduled run. Primarily useful in tests where we
* want to assert on the effects of an event-driven run after the bus has
Expand Down Expand Up @@ -155,6 +158,33 @@ export function attachSkillSubscriber(
applySkillFeedback(skillId, kind, runDeps, magnitude);
}

function lifecycleTick(): void {
const now = Date.now();
const candidates = deps.repos.skills.list({ status: "candidate", limit: 500 });
for (const skill of candidates) {
if (!shouldPromoteCandidate(skill, deps.config, {
repairOrigin: isRepairOriginSkill(skill, deps.repos.policies),
})) {
continue;
}
deps.repos.skills.setStatus(skill.id, "active", now as SkillRow["updatedAt"]);
deps.bus.emit({
kind: "skill.status.changed",
at: now,
skillId: skill.id,
previous: skill.status,
next: "active",
transition: "promoted",
});
log.info("skill.candidate.auto_promoted", {
skillId: skill.id,
eta: skill.eta,
support: skill.support,
gain: skill.gain,
});
}
}

function resolveTrialsForReward(evt: Extract<RewardEvent, { kind: "reward.updated" }>): void {
const rTask = evt.result.rHuman;
const outcome =
Expand Down Expand Up @@ -207,5 +237,24 @@ export function attachSkillSubscriber(
}
}

return { dispose, runOnce, applyFeedback, flush };
return { dispose, runOnce, applyFeedback, lifecycleTick, flush };
}

function isRepairOriginSkill(
skill: SkillRow,
policies: { getById(id: PolicyRow["id"]): PolicyRow | null },
): boolean {
for (const policyId of skill.sourcePolicyIds) {
const policy = policies.getById(policyId);
if (!policy) continue;
if (
policy.experienceType === "repair_validated" ||
policy.experienceType === "repair_instruction" ||
policy.experienceType === "failure_avoidance" ||
policy.experienceType === "verifier_feedback"
) {
return true;
}
}
return false;
}
29 changes: 28 additions & 1 deletion apps/memos-local-plugin/tests/unit/skill/lifecycle.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { describe, it, expect } from "vitest";

import { applyFeedback, recomputeEta, shouldArchiveIdle } from "../../../core/skill/lifecycle.js";
import {
applyFeedback,
recomputeEta,
shouldArchiveIdle,
shouldPromoteCandidate,
} from "../../../core/skill/lifecycle.js";
import type { PolicyRow, SkillRow } from "../../../core/types.js";
import { makeSkillConfig, NOW } from "./_helpers.js";

Expand Down Expand Up @@ -109,4 +114,26 @@ describe("skill/lifecycle", () => {
const s = mkSkill({ status: "active", eta: 0.4, updatedAt: 0 as SkillRow["updatedAt"] });
expect(shouldArchiveIdle(s, 1000, cfg, 10_000)).toBe(true);
});

it("auto-promotes untried non-repair candidates whose η reaches retrieval floor", () => {
const cfg = makeSkillConfig({ minEtaForRetrieval: 0.5 });

expect(
shouldPromoteCandidate(mkSkill({ status: "candidate", eta: 0.6 }), cfg, {
repairOrigin: false,
}),
).toBe(true);
expect(
shouldPromoteCandidate(mkSkill({ status: "candidate", eta: 0.6 }), cfg, {
repairOrigin: true,
}),
).toBe(false);
expect(
shouldPromoteCandidate(
mkSkill({ status: "candidate", eta: 0.6, trialsAttempted: 1 }),
cfg,
{ repairOrigin: false },
),
).toBe(false);
});
});
64 changes: 64 additions & 0 deletions apps/memos-local-plugin/tests/unit/skill/subscriber.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type { PatternSignature } from "../../../core/memory/l2/types.js";
import {
makeDraft,
makeSkillConfig,
seedSkill,
seedPolicy,
seedSessionOnly,
seedTrace,
Expand Down Expand Up @@ -154,4 +155,67 @@ describe("skill/subscriber", () => {
expect(r.crystallized).toBe(1);
sub.dispose();
});

it("lifecycleTick promotes untried non-repair candidates but leaves repair-origin candidates for trials", () => {
handle = makeTmpDb();
const h = handle;
const l2Bus = createL2EventBus();
const rewardBus = createRewardEventBus();
const bus = createSkillEventBus();
const events: unknown[] = [];
bus.onAny((e) => events.push(e));

const policy = seedPolicy(h, {
id: "po_auto" as PolicyId,
gain: 0.4,
support: 4,
});
const repairPolicy = seedPolicy(h, {
id: "po_repair" as PolicyId,
gain: 0.4,
support: 4,
});
h.repos.policies.upsert({
...repairPolicy,
experienceType: "repair_instruction",
});
const eligible = seedSkill(h, {
id: "sk_auto" as never,
eta: 0.6,
trialsAttempted: 0,
sourcePolicyIds: [policy.id],
});
const repair = seedSkill(h, {
id: "sk_repair" as never,
eta: 0.6,
trialsAttempted: 0,
sourcePolicyIds: [repairPolicy.id],
});

const sub = attachSkillSubscriber({
l2Bus,
rewardBus,
bus,
repos: h.repos,
embedder: null,
llm: null,
log: rootLogger.child({ channel: "core.skill.subscriber" }),
config: makeSkillConfig({ minEtaForRetrieval: 0.5 }),
});

sub.lifecycleTick();

expect(h.repos.skills.getById(eligible.id)!.status).toBe("active");
expect(h.repos.skills.getById(repair.id)!.status).toBe("candidate");
expect(events).toContainEqual(
expect.objectContaining({
kind: "skill.status.changed",
skillId: eligible.id,
previous: "candidate",
next: "active",
transition: "promoted",
}),
);
sub.dispose();
});
});