Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add SpaCy Processor for Enhanced NLP Support in Quivr #3468

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions core/quivr_core/processor/implementations/spaCy_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import logging
import os
import spacy
import aiofiles
import pandas as pd
import fitz # PyMuPDF for PDF processing
import docx # python-docx for DOCX processing
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter, TextSplitter

from quivr_core.files.file import QuivrFile
from quivr_core.processor.processor_base import ProcessorBase
from quivr_core.processor.registry import FileExtension
from quivr_core.processor.splitter import SplitterConfig

logger = logging.getLogger("quivr_core")


class SpaCyProcessor(ProcessorBase):
    """
    Processor that extracts text from PDF, DOCX, TXT and CSV files and
    enriches the resulting chunks with spaCy NLP annotations (named
    entities and sentence boundaries).

    Extraction errors are logged and yield an empty result rather than
    propagating, so a single bad file does not abort a batch.

    ## Installation
    ```bash
    pip install spacy pandas pymupdf python-docx
    python -m spacy download en_core_web_sm
    ```
    """

    supported_extensions = [
        FileExtension.pdf,
        FileExtension.docx,
        FileExtension.txt,
        FileExtension.csv,
    ]

    def __init__(
        self,
        splitter: TextSplitter | None = None,
        splitter_config: SplitterConfig = SplitterConfig(),
        spacy_model: str = "en_core_web_sm",
    ) -> None:
        """
        Args:
            splitter: Optional custom text splitter. When omitted, a
                tiktoken-based ``RecursiveCharacterTextSplitter`` is built
                from ``splitter_config``.
            splitter_config: Chunk size / overlap configuration.
            spacy_model: Name of an installed spaCy pipeline to load.

        Raises:
            Exception: Re-raises whatever ``spacy.load`` raises when the
                model is missing or broken (logged first).
        """
        try:
            self.nlp = spacy.load(spacy_model)
        except Exception as e:
            logger.error(f"Failed to load spaCy model '{spacy_model}': {e}")
            raise

        self.splitter_config = splitter_config

        if splitter:
            self.text_splitter = splitter
        else:
            self.text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=splitter_config.chunk_size,
                chunk_overlap=splitter_config.chunk_overlap,
            )

    @property
    def processor_metadata(self) -> dict:
        # chunk_size is reported alongside chunk_overlap so both splitter
        # settings are traceable from stored metadata (backward-compatible
        # key addition; the original exposed only chunk_overlap).
        return {
            "processor_cls": "SpaCyProcessor",
            "chunk_size": self.splitter_config.chunk_size,
            "chunk_overlap": self.splitter_config.chunk_overlap,
        }

    async def process_file_inner(self, file: QuivrFile) -> list[Document]:
        """Extract text, split into chunks, and annotate each chunk with spaCy.

        Returns:
            The split ``Document`` chunks, each carrying ``chunk_size``
            (spaCy token count, not characters), ``entities`` and
            ``sentences`` in its metadata. Empty list when nothing could
            be extracted or on any error (errors are logged, not raised).
        """
        # Dispatch table keeps the extension handling flat and makes adding
        # a new format a one-line change.
        extractors = {
            FileExtension.pdf: self.extract_text_from_pdf,
            FileExtension.docx: self.extract_text_from_docx,
            FileExtension.txt: self.extract_text_from_txt,
            FileExtension.csv: self.extract_text_from_csv,
        }
        try:
            extractor = extractors.get(file.extension)
            if extractor is None:
                raise ValueError(f"Unsupported file type: {file.extension}")
            text = await extractor(file)

            # strip() so whitespace-only extractions are also treated as empty.
            if not text.strip():
                logger.warning(f"No content extracted from file: {file.path}")
                return []

            chunks = self.text_splitter.split_documents(
                [Document(page_content=text)]
            )

            # nlp.pipe streams all chunks through the pipeline in one pass
            # instead of re-entering it per chunk; zip keeps chunk and
            # annotation aligned. (Loop variable renamed: the original
            # shadowed the outer `doc`.)
            annotated = self.nlp.pipe(c.page_content for c in chunks)
            for chunk, spacy_doc in zip(chunks, annotated):
                chunk.metadata.update(
                    {
                        "chunk_size": len(spacy_doc),  # spaCy token count
                        "entities": [
                            (ent.text, ent.label_) for ent in spacy_doc.ents
                        ],
                        "sentences": [sent.text for sent in spacy_doc.sents],
                    }
                )
                chunk.page_content = spacy_doc.text

            return chunks

        except Exception as e:
            logger.error(f"Error processing file '{file.path}': {e}")
            return []

    async def extract_text_from_pdf(self, file: QuivrFile) -> str:
        """Extract plain text from a PDF via PyMuPDF. Returns "" on failure."""
        try:
            # NOTE(review): file.open() presumably materializes the file at
            # file.path before fitz reads it — confirm against QuivrFile.
            async with file.open():
                # Context manager guarantees the fitz document is closed;
                # the original left it open, leaking the file handle.
                with fitz.open(file.path) as pdf:
                    return "".join(page.get_text() for page in pdf)
        except Exception as e:
            logger.error(f"Error extracting text from PDF '{file.path}': {e}")
            return ""

    async def extract_text_from_docx(self, file: QuivrFile) -> str:
        """Extract paragraph text from a DOCX file. Returns "" on failure."""
        try:
            document = docx.Document(file.path)
            return "\n".join(para.text for para in document.paragraphs)
        except Exception as e:
            logger.error(f"Error extracting text from DOCX '{file.path}': {e}")
            return ""

    async def extract_text_from_txt(self, file: QuivrFile) -> str:
        """Read a text file asynchronously. Returns "" on failure."""
        try:
            # Explicit encoding: the original used the platform default,
            # which silently varies between hosts.
            async with aiofiles.open(file.path, mode="r", encoding="utf-8") as f:
                return await f.read()
        except Exception as e:
            logger.error(f"Error extracting text from TXT '{file.path}': {e}")
            return ""

    async def extract_text_from_csv(self, file: QuivrFile) -> str:
        """Flatten all CSV cells into one space-joined string. Returns "" on failure."""
        try:
            df = pd.read_csv(file.path)
            return " ".join(df.astype(str).values.flatten())
        except Exception as e:
            logger.error(f"Error extracting text from CSV '{file.path}': {e}")
            return ""
Loading