-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathpipeline.py
More file actions
29 lines (23 loc) · 877 Bytes
/
pipeline.py
File metadata and controls
29 lines (23 loc) · 877 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
"""IngestionPipeline — drives the Source × Extractor matrix (★③).
A document is fetched from a Source then handed to an Extractor; the resulting
candidates are shown for user confirmation before they land in the semantic
layer (documents stay the source of truth). The pipeline itself is axis-blind:
any Source pairs with any Extractor.
"""
from __future__ import annotations
from ..core.ports.ingestion import (
DocExtractorPort,
SemanticCandidate,
SourcePort,
)
class IngestionPipeline:
    """Run a Source and an Extractor back to back for a single document.

    Both collaborators are supplied per call rather than stored on the
    instance, so the same pipeline object can serve any Source/Extractor
    pairing.
    """

    async def ingest(
        self,
        source: SourcePort,
        extractor: DocExtractorPort,
        ref: str,
        blob: bytes | None = None,
    ) -> list[SemanticCandidate]:
        """Fetch the document behind ``ref`` and return its extracted candidates.

        Args:
            source: Port whose ``fetch`` coroutine produces the document.
            extractor: Port whose ``extract`` coroutine consumes that document.
            ref: Reference forwarded positionally to ``source.fetch``.
            blob: Optional raw bytes forwarded positionally to
                ``source.fetch`` after ``ref``; ``None`` when omitted.

        Returns:
            Whatever ``extractor.extract`` yields for the fetched document,
            passed through unchanged.
        """
        # The fetch is awaited to completion before extraction starts; the
        # document is handed over as the extractor's only argument.
        return await extractor.extract(await source.fetch(ref, blob))