Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion openviking/server/routers/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
import uuid
from pathlib import Path
from typing import Any, Optional
from typing import Any, Dict, Optional

from fastapi import APIRouter, Depends, File, Form, UploadFile
from pydantic import BaseModel, model_validator
Expand Down Expand Up @@ -91,6 +91,25 @@ class AddSkillRequest(BaseModel):
telemetry: TelemetryRequest = False


class PatchResourceRequest(BaseModel):
"""Request model for patching resource metadata and summaries.

At least one of meta, abstract, or overview must be provided.
"""

uri: str
meta: Optional[Dict[str, Any]] = None
abstract: Optional[str] = None
overview: Optional[str] = None
telemetry: TelemetryRequest = False

@model_validator(mode="after")
def check_has_update(self):
if self.meta is None and self.abstract is None and self.overview is None:
raise ValueError("At least one of 'meta', 'abstract', or 'overview' must be provided")
return self


def _cleanup_temp_files(temp_dir: Path, max_age_hours: int = 1):
"""Clean up temporary files older than max_age_hours."""
if not temp_dir.exists():
Expand Down Expand Up @@ -218,3 +237,28 @@ async def add_skill(
result=execution.result,
telemetry=execution.telemetry,
).model_dump(exclude_none=True)


@router.patch("/resources")
async def patch_resource(
request: PatchResourceRequest,
_ctx: RequestContext = Depends(get_request_context),
):
"""Patch resource metadata and/or summaries (L0 abstract, L1 overview)."""
service = get_service()
execution = await run_operation(
operation="resources.patch_resource",
telemetry=request.telemetry,
fn=lambda: service.resources.patch_resource(
uri=request.uri,
ctx=_ctx,
meta=request.meta,
abstract=request.abstract,
overview=request.overview,
),
)
return Response(
status="ok",
result=execution.result,
telemetry=execution.telemetry,
).model_dump(exclude_none=True)
97 changes: 97 additions & 0 deletions openviking/service/resource_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@

import json
import time
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from openviking.core.context import Context, ContextLevel
from openviking.pyagfs.exceptions import AGFSClientError
from openviking.server.identity import RequestContext
from openviking.storage import VikingDBManager
from openviking.storage.id_utils import compute_record_id
from openviking.storage.queuefs import get_queue_manager
from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter
from openviking.storage.viking_fs import VikingFS
from openviking.telemetry import get_current_telemetry
from openviking.telemetry.resource_summary import (
Expand All @@ -27,6 +32,7 @@
ConflictError,
DeadlineExceededError,
InvalidArgumentError,
NotFoundError,
NotInitializedError,
)
from openviking_cli.utils import get_logger
Expand Down Expand Up @@ -446,6 +452,97 @@ async def summarize(
self._ensure_initialized()
return await self._resource_processor.summarize(resource_uris, ctx, **kwargs)

async def patch_resource(
self,
uri: str,
ctx: RequestContext,
meta: Optional[Dict[str, Any]] = None,
abstract: Optional[str] = None,
overview: Optional[str] = None,
) -> Dict[str, Any]:
"""Patch resource metadata and/or summaries.

Args:
uri: Resource URI (e.g., "viking://resources/doc_name")
ctx: Request context
meta: Updated metadata dict (merged with existing)
abstract: New L0 abstract text (rewrites .abstract.md and re-embeds)
overview: New L1 overview text (rewrites .overview.md and re-embeds)

Returns:
Dict with uri, updated fields list, and skipped fields list.
"""
self._ensure_initialized()

# Verify resource exists — only map "not found" to 404, let other errors propagate
try:
await self._viking_fs.stat(uri, ctx=ctx)
except AGFSClientError as e:
err_msg = str(e).lower()
if "not found" in err_msg or "no such file or directory" in err_msg:
raise NotFoundError(uri)
raise

updated: List[str] = []
skipped: List[str] = []

# --- Meta update: fetch → merge → upsert with existing vector ---
if meta is not None:
record_id = compute_record_id(ctx.account_id, uri, level=2)
records = await self._vikingdb.get(ids=[record_id], ctx=ctx)
if records:
record = dict(records[0])
existing_meta = record.get("meta") or {}
merged_meta = {**existing_meta, **meta}
record["meta"] = merged_meta
record["updated_at"] = datetime.now(timezone.utc).isoformat()
await self._vikingdb.upsert(record, ctx=ctx)
updated.append("meta")
else:
skipped.append("meta")

# --- Abstract (L0) update: write file + re-embed ---
if abstract is not None:
abstract_uri = f"{uri}/.abstract.md"
await self._viking_fs.write_file(abstract_uri, abstract, ctx=ctx)

context = Context(
uri=uri,
abstract=abstract,
level=ContextLevel.ABSTRACT,
context_type="resource",
account_id=ctx.account_id,
user=ctx.user,
owner_space="",
)
context.vectorize.text = abstract
embedding_msg = EmbeddingMsgConverter.from_context(context)
if embedding_msg:
await self._vikingdb.enqueue_embedding_msg(embedding_msg)
updated.append("abstract")

# --- Overview (L1) update: write file + re-embed ---
if overview is not None:
overview_uri = f"{uri}/.overview.md"
await self._viking_fs.write_file(overview_uri, overview, ctx=ctx)

context = Context(
uri=uri,
abstract=overview,
level=ContextLevel.OVERVIEW,
context_type="resource",
account_id=ctx.account_id,
user=ctx.user,
owner_space="",
)
context.vectorize.text = overview
embedding_msg = EmbeddingMsgConverter.from_context(context)
if embedding_msg:
await self._vikingdb.enqueue_embedding_msg(embedding_msg)
updated.append("overview")

return {"uri": uri, "updated": updated, "skipped": skipped}

async def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any]:
"""Wait for all queued processing to complete.

Expand Down
12 changes: 2 additions & 10 deletions openviking/storage/collection_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from openviking.models.embedder.base import EmbedResult
from openviking.server.identity import RequestContext, Role
from openviking.storage.errors import CollectionNotFoundError
from openviking.storage.id_utils import seed_uri_for_id
from openviking.storage.queuefs.embedding_msg import EmbeddingMsg
from openviking.storage.queuefs.named_queue import DequeueHandlerBase
from openviking.storage.viking_vector_index_backend import VikingVectorIndexBackend
Expand Down Expand Up @@ -201,16 +202,7 @@ def consume_request_stats(cls, telemetry_id: str) -> Optional[RequestQueueStats]
@staticmethod
def _seed_uri_for_id(uri: str, level: Any) -> str:
"""Build deterministic id seed URI from canonical uri + hierarchy level."""
try:
level_int = int(level)
except (TypeError, ValueError):
level_int = 2

if level_int == 0:
return uri if uri.endswith("/.abstract.md") else f"{uri}/.abstract.md"
if level_int == 1:
return uri if uri.endswith("/.overview.md") else f"{uri}/.overview.md"
return uri
return seed_uri_for_id(uri, level)

async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""Process dequeued message and add embedding vector(s)."""
Expand Down
48 changes: 48 additions & 0 deletions openviking/storage/id_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""Deterministic record ID utilities for VectorDB.

Provides a single source of truth for computing record IDs from URIs and levels.
Used by collection_schemas, viking_vector_index_backend, and resource_service.
"""

import hashlib
from typing import Any


def seed_uri_for_id(uri: str, level: Any) -> str:
"""Build deterministic id seed URI from canonical uri + hierarchy level.

Args:
uri: Viking URI (e.g., "viking://resources/doc_name")
level: Context level (0=abstract, 1=overview, 2=detail)

Returns:
Seed URI with appropriate suffix for the given level.
"""
try:
level_int = int(level)
except (TypeError, ValueError):
level_int = 2

if level_int == 0:
return uri if uri.endswith("/.abstract.md") else f"{uri}/.abstract.md"
if level_int == 1:
return uri if uri.endswith("/.overview.md") else f"{uri}/.overview.md"
return uri


def compute_record_id(account_id: str, uri: str, level: Any) -> str:
"""Compute deterministic VectorDB record ID for a given URI and level.

Args:
account_id: Tenant account ID
uri: Viking URI
level: Context level (0=abstract, 1=overview, 2=detail)

Returns:
MD5 hex digest used as the VectorDB record ID.
"""
seed = seed_uri_for_id(uri, level)
id_seed = f"{account_id}:{seed}"
return hashlib.md5(id_seed.encode("utf-8")).hexdigest()
10 changes: 2 additions & 8 deletions openviking/storage/viking_vector_index_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from openviking.server.identity import RequestContext, Role
from openviking.storage.expr import And, Eq, FilterExpr, In, Or, PathScope, RawDSL
from openviking.storage.id_utils import seed_uri_for_id
from openviking.storage.vectordb.collection.collection import Collection
from openviking.storage.vectordb.utils.logging_init import init_cpp_logging
from openviking.storage.vectordb_adapters import create_collection_adapter
Expand Down Expand Up @@ -868,13 +869,6 @@ async def update_uri_mapping(
if not records:
return False

def _seed_uri_for_id(uri: str, level: int) -> str:
if level == 0:
return uri if uri.endswith("/.abstract.md") else f"{uri}/.abstract.md"
if level == 1:
return uri if uri.endswith("/.overview.md") else f"{uri}/.overview.md"
return uri

success = False
ids_to_delete: List[str] = []
for record in records:
Expand All @@ -886,7 +880,7 @@ def _seed_uri_for_id(uri: str, level: int) -> str:
except (TypeError, ValueError):
level = 2

seed_uri = _seed_uri_for_id(new_uri, level)
seed_uri = seed_uri_for_id(new_uri, level)
id_seed = f"{ctx.account_id}:{seed_uri}"
new_id = hashlib.md5(id_seed.encode("utf-8")).hexdigest()

Expand Down
Loading
Loading