Merge pull request #47 from rmusser01/main
Bugfixes, plus XML ingestion, a global API default setting, and Anki validation
rmusser01 authored Oct 24, 2024
2 parents 30d1cb4 + 36cf40b commit 70880d4
Showing 47 changed files with 3,657 additions and 1,109 deletions.
Binary file modified .gitignore
Binary file not shown.
2 changes: 1 addition & 1 deletion App_Function_Libraries/Audio/Audio_Transcription_Lib.py
@@ -332,4 +332,4 @@ def save_audio_temp(audio_data, sample_rate=16000):

#
#
#######################################################################################################################
#######################################################################################################################
144 changes: 144 additions & 0 deletions App_Function_Libraries/Books/Book_Ingestion_Lib.py
@@ -18,6 +18,9 @@
import zipfile
from datetime import datetime
import logging
import xml.etree.ElementTree as ET
import html2text
import csv
#
# External Imports
import ebooklib
@@ -241,6 +244,147 @@ def process_zip_file(zip_file,
return "\n".join(results)


def import_html(file_path, title=None, author=None, keywords=None, **kwargs):
"""
Imports an HTML file and converts it to markdown format.
"""
try:
logging.info(f"Importing HTML file from {file_path}")
h = html2text.HTML2Text()
h.ignore_links = False

with open(file_path, 'r', encoding='utf-8') as file:
html_content = file.read()

markdown_content = h.handle(html_content)

# Extract title from HTML if not provided
if not title:
soup = BeautifulSoup(html_content, 'html.parser')
title_tag = soup.find('title')
title = title_tag.string if title_tag else os.path.basename(file_path)

return process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs)

except Exception as e:
logging.exception(f"Error importing HTML file: {str(e)}")
raise


def import_xml(file_path, title=None, author=None, keywords=None, **kwargs):
"""
Imports an XML file and converts it to markdown format.
"""
try:
logging.info(f"Importing XML file from {file_path}")
tree = ET.parse(file_path)
root = tree.getroot()

# Convert XML to markdown
markdown_content = xml_to_markdown(root)

return process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs)

except Exception as e:
logging.exception(f"Error importing XML file: {str(e)}")
raise


def import_opml(file_path, title=None, author=None, keywords=None, **kwargs):
"""
Imports an OPML file and converts it to markdown format.
"""
try:
logging.info(f"Importing OPML file from {file_path}")
tree = ET.parse(file_path)
root = tree.getroot()

# Extract title from OPML if not provided
if not title:
title_elem = root.find(".//title")
title = title_elem.text if title_elem is not None else os.path.basename(file_path)

# Convert OPML to markdown
markdown_content = opml_to_markdown(root)

return process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs)

except Exception as e:
logging.exception(f"Error importing OPML file: {str(e)}")
raise


def xml_to_markdown(element, level=0):
"""
Recursively converts XML elements to markdown format.
"""
markdown = ""

# Add element name as heading
if level > 0:
markdown += f"{'#' * min(level, 6)} {element.tag}\n\n"

# Add element text if it exists
if element.text and element.text.strip():
markdown += f"{element.text.strip()}\n\n"

# Process child elements
for child in element:
markdown += xml_to_markdown(child, level + 1)

return markdown


def opml_to_markdown(root):
"""
Converts OPML structure to markdown format.
"""
markdown = "# Table of Contents\n\n"

def process_outline(outline, level=0):
result = ""
for item in outline.findall("outline"):
text = item.get("text", "")
result += f"{' ' * level}- {text}\n"
result += process_outline(item, level + 1)
return result

body = root.find(".//body")
if body is not None:
markdown += process_outline(body)

return markdown


def process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs):
"""
Processes markdown content and adds it to the database.
"""
info_dict = {
'title': title or os.path.basename(file_path),
'uploader': author or "Unknown",
'ingestion_date': datetime.now().strftime('%Y-%m-%d')
}

# Create segments (you may want to adjust the chunking method)
segments = [{'Text': markdown_content}]

# Add to database
result = add_media_to_database(
url=file_path,
info_dict=info_dict,
segments=segments,
summary=kwargs.get('summary', "No summary provided"),
keywords=keywords.split(',') if keywords else [],
custom_prompt_input=kwargs.get('custom_prompt'),
whisper_model="Imported",
media_type="document",
overwrite=False
)

return f"Document '{title}' imported successfully. Database result: {result}"


def import_file_handler(file,
title,
author,
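A minimal usage sketch for the new importers (not part of the commit), assuming the repository package is importable and its dependencies (html2text, ebooklib, beautifulsoup4) and the media database are set up; the file name notes.xml and the metadata values are hypothetical.

import xml.etree.ElementTree as ET

from App_Function_Libraries.Books.Book_Ingestion_Lib import import_xml, xml_to_markdown

# Convert an XML tree to markdown without touching the database.
sample = "<book><title>Example</title><chapter>First chapter text.</chapter></book>"
print(xml_to_markdown(ET.fromstring(sample)))
# -> "# title", "Example", "# chapter", "First chapter text." as markdown sections

# Full ingestion: parse the file, convert it, and store it via add_media_to_database.
print(import_xml(
    "notes.xml",                    # hypothetical input file
    title="Example XML Import",
    author="Jane Doe",
    keywords="xml,example",
    summary="Short example import",
))
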
147 changes: 146 additions & 1 deletion App_Function_Libraries/Chunk_Lib.py
@@ -11,6 +11,7 @@
import logging
import re
from typing import Any, Dict, List, Optional, Tuple
import xml.etree.ElementTree as ET
#
# Import 3rd party
from openai import OpenAI
@@ -23,7 +24,6 @@
from sklearn.metrics.pairwise import cosine_similarity
#
# Import Local
from App_Function_Libraries.Tokenization_Methods_Lib import openai_tokenize
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
#
#######################################################################################################################
@@ -943,6 +943,151 @@ def chunk_ebook_by_chapters(text: str, chunk_options: Dict[str, Any]) -> List[Di
#
# End of ebook chapter chunking
#######################################################################################################################
#
# XML Chunking

def extract_xml_structure(element, path=""):
"""
Recursively extract XML structure and content.
Returns a list of (path, text) tuples.
"""
results = []
current_path = f"{path}/{element.tag}" if path else element.tag

# Get direct text content
if element.text and element.text.strip():
results.append((current_path, element.text.strip()))

# Process attributes if any
if element.attrib:
for key, value in element.attrib.items():
results.append((f"{current_path}/@{key}", value))

# Process child elements
for child in element:
results.extend(extract_xml_structure(child, current_path))

return results


def chunk_xml(xml_text: str, chunk_options: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Enhanced XML chunking that preserves structure and hierarchy.
Processes XML content into chunks while maintaining structural context.
Args:
xml_text (str): The XML content as a string
chunk_options (Dict[str, Any]): Configuration options including:
- max_size (int): Maximum chunk size (default: 1000)
- overlap (int): Number of overlapping elements (default: 0)
- method (str): Chunking method (default: 'xml')
- language (str): Content language (default: 'english')
Returns:
List[Dict[str, Any]]: List of chunks, each containing:
- text: The chunk content
- metadata: Chunk metadata including XML paths and chunking info
"""
logging.debug("Starting XML chunking process...")

try:
# Parse XML content
root = ET.fromstring(xml_text)
chunks = []

# Get chunking parameters with defaults
max_size = chunk_options.get('max_size', 1000)
overlap = chunk_options.get('overlap', 0)
language = chunk_options.get('language', 'english')

logging.debug(f"Chunking parameters - max_size: {max_size}, overlap: {overlap}, language: {language}")

# Extract full structure with hierarchy
xml_content = extract_xml_structure(root)
logging.debug(f"Extracted {len(xml_content)} XML elements")

# Initialize chunking variables
current_chunk = []
current_size = 0
chunk_count = 0

# Process XML content into chunks
for path, content in xml_content:
# Calculate content size (by words)
content_size = len(content.split())

# Check if adding this content would exceed max_size
if current_size + content_size > max_size and current_chunk:
# Create chunk from current content
chunk_text = '\n'.join(f"{p}: {c}" for p, c in current_chunk)
chunk_count += 1

# Create chunk with metadata
chunks.append({
'text': chunk_text,
'metadata': {
'paths': [p for p, _ in current_chunk],
'chunk_method': 'xml',
'chunk_index': chunk_count,
'max_size': max_size,
'overlap': overlap,
'language': language,
'root_tag': root.tag,
'xml_attributes': dict(root.attrib)
}
})

# Handle overlap if specified
if overlap > 0:
# Keep last few items for overlap
overlap_items = current_chunk[-overlap:]
current_chunk = overlap_items
current_size = sum(len(c.split()) for _, c in overlap_items)
logging.debug(f"Created overlap chunk with {len(overlap_items)} items")
else:
current_chunk = []
current_size = 0

# Add current content to chunk
current_chunk.append((path, content))
current_size += content_size

# Process final chunk if content remains
if current_chunk:
chunk_text = '\n'.join(f"{p}: {c}" for p, c in current_chunk)
chunk_count += 1

chunks.append({
'text': chunk_text,
'metadata': {
'paths': [p for p, _ in current_chunk],
'chunk_method': 'xml',
'chunk_index': chunk_count,
'max_size': max_size,
'overlap': overlap,
'language': language,
'root_tag': root.tag,
'xml_attributes': dict(root.attrib)
}
})

# Update total chunks count in metadata
for chunk in chunks:
chunk['metadata']['total_chunks'] = chunk_count

logging.debug(f"XML chunking complete. Created {len(chunks)} chunks")
return chunks

except ET.ParseError as e:
logging.error(f"XML parsing error: {str(e)}")
raise
except Exception as e:
logging.error(f"Unexpected error during XML chunking: {str(e)}")
raise

#
# End of XML Chunking
#######################################################################################################################

#######################################################################################################################
#
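A minimal sketch of how chunk_xml might be called (not part of the commit), assuming Chunk_Lib's third-party imports (openai, scikit-learn, and the rest) resolve in your environment; the sample XML and the chunk_options values are illustrative only.

from App_Function_Libraries.Chunk_Lib import chunk_xml

xml_text = """<library>
  <book id="1">
    <title>First Book</title>
    <summary>A short summary of the first book.</summary>
  </book>
  <book id="2">
    <title>Second Book</title>
    <summary>A short summary of the second book.</summary>
  </book>
</library>"""

# Use a small word-count limit so the size/overlap handling is visible.
chunks = chunk_xml(xml_text, {"max_size": 10, "overlap": 1})
for chunk in chunks:
    meta = chunk["metadata"]
    print(meta["chunk_index"], "/", meta["total_chunks"], meta["paths"])
    print(chunk["text"])
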