Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/typeagent/emails/email_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,11 @@ def _text_to_chunks(text: str, max_chunk_length: int) -> list[str]:
if len(text) < max_chunk_length:
return [text]

paragraphs = _splitIntoParagraphs(text)
paragraphs = _split_into_paragraphs(text)
return list(_merge_chunks(paragraphs, "\n\n", max_chunk_length))


def _splitIntoParagraphs(text: str) -> list[str]:
def _split_into_paragraphs(text: str) -> list[str]:
return _remove_empty_strings(re.split(r"\n{2,}", text))


Expand Down
16 changes: 8 additions & 8 deletions src/typeagent/emails/email_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ def to_actions(self) -> list[kplib.Action]:
actions: list[kplib.Action] = []
if self.sender and self.recipients:
for recipient in self.recipients:
actions.extend(self._createActions("sent", self.sender, recipient))
actions.extend(self._createActions("received", recipient, self.sender))
actions.extend(self._create_actions("sent", self.sender, recipient))
actions.extend(self._create_actions("received", recipient, self.sender))
return actions

# Returns the knowledge entities for a given email address string
Expand Down Expand Up @@ -103,7 +103,7 @@ def _email_address_to_entities(
)
return entities

def _createActions(
def _create_actions(
self, verb: str, sender: str, recipient: str
) -> list[kplib.Action]:
sender_display_name, sender_address = parseaddr(sender)
Expand All @@ -121,15 +121,15 @@ def _add_actions_for_sender(
) -> None:
recipient_display_name, recipient_address = parseaddr(recipient)
if recipient_display_name:
actions.append(self._createAction(verb, sender, recipient_display_name))
actions.append(self._create_action(verb, sender, recipient_display_name))

if recipient_address:
actions.append(self._createAction(verb, sender, recipient_address))
actions.append(self._create_action(verb, sender, recipient_address))

def _createAction(
self, verb: str, sender: str, recipient: str, useIndirect: bool = True
def _create_action(
self, verb: str, sender: str, recipient: str, use_indirect: bool = True
) -> kplib.Action:
if useIndirect:
if use_indirect:
return kplib.Action(
verbs=[verb],
verb_tense="past",
Expand Down
2 changes: 1 addition & 1 deletion src/typeagent/knowpro/answer_context_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class RelevantMessage:
from_: Annotated[EntityNames | None, Doc("Sender(s) of the message")]
to: Annotated[EntityNames | None, Doc("Recipient(s) of the message")]
timestamp: Annotated[str | None, Doc("Timestamp of the message in ISO format")]
messageText: Annotated[str | list[str] | None, Doc("Text chunks in this message")]
message_text: Annotated[str | list[str] | None, Doc("Text chunks in this message")]


@dataclass
Expand Down
2 changes: 1 addition & 1 deletion src/typeagent/knowpro/answer_response_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class AnswerResponse:
"the answer to display if [ANSWER CONTEXT] is highly relevant and can be used to answer the user's question"
),
] = None
whyNoAnswer: Annotated[
why_no_answer: Annotated[
str | None,
Doc(
"If NoAnswer, explain why..\nparticularly explain why you didn't use any supplied entities"
Expand Down
18 changes: 10 additions & 8 deletions src/typeagent/knowpro/answers.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class AnswerContextOptions:
topics_top_k: int | None = None
messages_top_k: int | None = None
chunking: bool | None = None
debug: bool = False


async def generate_answers(
Expand Down Expand Up @@ -77,7 +78,7 @@ async def generate_answers(
combined_answer = AnswerResponse(type="Answered", answer=good_answers[0])
else:
combined_answer = AnswerResponse(
type="NoAnswer", whyNoAnswer="No good answers found."
type="NoAnswer", why_no_answer="No good answers found."
)
return all_answers, combined_answer

Expand All @@ -91,15 +92,16 @@ async def generate_answer[TMessage: IMessage, TIndex: ITermToSemanticRefIndex](
assert search_result.raw_query_text is not None, "Raw query text must not be None"
context = await make_context(search_result, conversation, options)
request = f"{create_question_prompt(search_result.raw_query_text)}\n\n{create_context_prompt(context)}"
# print("+" * 80)
# print(request)
# print("+" * 80)
if options and options.debug:
print("Stage 4 input:")
print(request)
print("-" * 50)
result = await translator.translate(request)
if isinstance(result, typechat.Failure):
return AnswerResponse(
type="NoAnswer",
answer=None,
whyNoAnswer=f"TypeChat failure: {result.message}",
why_no_answer=f"TypeChat failure: {result.message}",
)
else:
return result.value
Expand Down Expand Up @@ -230,7 +232,7 @@ async def get_relevant_messages_for_answer[
from_=metadata.source,
to=metadata.dest,
timestamp=msg.timestamp,
messageText=(
message_text=(
msg.text_chunks[0] if len(msg.text_chunks) == 1 else msg.text_chunks
),
)
Expand Down Expand Up @@ -553,7 +555,7 @@ async def combine_answers(
) -> AnswerResponse:
"""Combine multiple answers into a single answer."""
if not answers:
return AnswerResponse(type="NoAnswer", whyNoAnswer="No answers provided.")
return AnswerResponse(type="NoAnswer", why_no_answer="No answers provided.")
if len(answers) == 1:
return AnswerResponse(type="Answered", answer=answers[0])
request_parts = [
Expand All @@ -572,6 +574,6 @@ async def combine_answers(
request = "\n".join(request_parts)
result = await translator.translate(request)
if isinstance(result, typechat.Failure):
return AnswerResponse(type="NoAnswer", whyNoAnswer=result.message)
return AnswerResponse(type="NoAnswer", why_no_answer=result.message)
else:
return result.value
2 changes: 1 addition & 1 deletion src/typeagent/knowpro/conversation_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ async def query(

match combined_answer.type:
case "NoAnswer":
return f"No answer found: {combined_answer.whyNoAnswer or 'Unable to find relevant information'}"
return f"No answer found: {combined_answer.why_no_answer or 'Unable to find relevant information'}"
case "Answered":
return combined_answer.answer or "No answer provided"
case _: # Cannot happen in type-checked code
Expand Down
6 changes: 4 additions & 2 deletions src/typeagent/knowpro/convknowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
# TODO: Move ModelWrapper and create_typechat_model() to aitools package.


# TODO: Make this a parameter that can be configured (e.g. from command line).
DEFAULT_TIMEOUT_SECONDS = 30
# TODO: Make these parameters that can be configured (e.g. from command line).
DEFAULT_MAX_RETRY_ATTEMPTS = 0
DEFAULT_TIMEOUT_SECONDS = 25


class ModelWrapper(typechat.TypeChatLanguageModel):
Expand Down Expand Up @@ -52,6 +53,7 @@ def create_typechat_model() -> typechat.TypeChatLanguageModel:
env[key_name] = shared_token_provider.get_token()
model = typechat.create_language_model(env)
model.timeout_seconds = DEFAULT_TIMEOUT_SECONDS
model.max_retry_attempts = DEFAULT_MAX_RETRY_ATTEMPTS
if shared_token_provider is not None:
model = ModelWrapper(model, shared_token_provider)
return model
Expand Down
2 changes: 1 addition & 1 deletion src/typeagent/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ async def query_conversation(
match combined_answer.type:
case "NoAnswer":
return QuestionResponse(
success=False, answer=combined_answer.whyNoAnswer or "", time_used=dt
success=False, answer=combined_answer.why_no_answer or "", time_used=dt
)
case "Answered":
return QuestionResponse(
Expand Down
52 changes: 34 additions & 18 deletions tools/ingest_email.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
from pathlib import Path
import sys
import time
import traceback

import openai

from typeagent.aitools import utils
from typeagent.emails.email_import import decode_encoded_words, import_email_from_file
Expand Down Expand Up @@ -139,7 +142,7 @@ async def ingest_emails(
if verbose:
print("\nParsing and importing emails...")

successful_count = 0
success_count = 0
failed_count = 0
skipped_count = 0
start_time = time.time()
Expand Down Expand Up @@ -179,6 +182,14 @@ async def ingest_emails(

if verbose:
print(f" From: {decode_encoded_words(email.metadata.sender)}")
if email.metadata.recipients:
print(
f" To: {', '.join(decode_encoded_words(r) for r in email.metadata.recipients)}"
)
if email.metadata.cc:
print(
f" Cc: {', '.join(decode_encoded_words(r) for r in email.metadata.cc)}"
)
if email.metadata.subject:
print(
f" Subject: {decode_encoded_words(email.metadata.subject).replace('\n', '\\n')}"
Expand All @@ -194,33 +205,38 @@ async def ingest_emails(
print(f" {preview}")

# Pass source_id to mark as ingested atomically with the message
await email_memory.add_messages_with_indexing(
[email], source_ids=[str(email_file)]
) # This may raise, esp. if the knowledge extraction fails (see except below)
successful_count += 1
try:
await email_memory.add_messages_with_indexing(
[email], source_ids=[str(email_file)]
) # This may raise, esp. if the knowledge extraction fails (see except below)
success_count += 1
except openai.AuthenticationError as e:
if verbose:
traceback.print_exc()
sys.exit(f"Authentication error: {e!r}")

# Print progress periodically
if (i + 1) % batch_size == 0:
if (success_count + failed_count) % batch_size == 0:
elapsed = time.time() - start_time
semref_count = await semref_coll.size()
print(
f"\n[{i + 1}/{len(email_files)}] {successful_count} imported | "
f"{semref_count} refs | {elapsed:.1f}s elapsed\n"
f"\n[{i + 1}/{len(email_files)}] "
f"{success_count} imported | "
f"{failed_count} failed | "
f"{skipped_count} skipped | "
f"{semref_count} semrefs | "
f"{elapsed:.1f}s elapsed\n"
)

except Exception as e:
failed_count += 1
print(f"Error processing {email_file}: {e}", file=sys.stderr)
exc_name = (
e.__class__.__qualname__
if e.__class__.__module__ == "builtins"
else f"{e.__class__.__module__}.{e.__class__.__qualname__}"
)
print(f"Error processing {email_file}: {e!r:.150s}", file=sys.stderr)
mod = e.__class__.__module__
qual = e.__class__.__qualname__
exc_name = qual if mod == "builtins" else f"{mod}.{qual}"
async with storage_provider:
storage_provider.mark_source_ingested(str(email_file), exc_name)
if verbose:
import traceback

traceback.print_exc(limit=10)

# Final summary
Expand All @@ -229,7 +245,7 @@ async def ingest_emails(

print()
if verbose:
print(f"Successfully imported {successful_count} email(s)")
print(f"Successfully imported {success_count} email(s)")
if skipped_count:
print(f"Skipped {skipped_count} already-ingested email(s)")
if failed_count:
Expand All @@ -238,7 +254,7 @@ async def ingest_emails(
print(f"Total time: {elapsed:.1f}s")
else:
print(
f"Imported {successful_count} emails to {database} "
f"Imported {success_count} emails to {database} "
f"({semref_count} refs, {elapsed:.1f}s)"
)
if skipped_count:
Expand Down
21 changes: 15 additions & 6 deletions tools/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ async def cmd_stage(context: ProcessingContext, args: list[str]) -> None:
expected4 = (record4["answer"], not record4["hasNoAnswer"])
match combined_answer.type:
case "NoAnswer":
actual4 = (combined_answer.whyNoAnswer or "", False)
actual4 = (combined_answer.why_no_answer or "", False)
case "Answered":
actual4 = (combined_answer.answer or "", True)
await compare_answers(context, expected4, actual4)
Expand All @@ -419,7 +419,7 @@ async def cmd_stage(context: ProcessingContext, args: list[str]) -> None:
prsep()

if combined_answer.type == "NoAnswer":
print(Fore.RED + f"Failure: {combined_answer.whyNoAnswer}" + Fore.RESET)
print(Fore.RED + f"Failure: {combined_answer.why_no_answer}" + Fore.RESET)
else:
print(Fore.GREEN + f"{combined_answer.answer}" + Fore.RESET)
prsep()
Expand Down Expand Up @@ -564,6 +564,12 @@ async def main():
sr_list, sr_index = load_index_file(
args.srfile, "searchText", SearchResultData, args.verbose
)
if args.batch:
args.history_size = 0
if not ar_list:
raise SystemExit("Error: non-empty --qafile required for batch mode.")
if not sr_list:
raise SystemExit("Error: non-empty --srfile required for batch mode.")

model = convknowledge.create_typechat_model()
query_translator = utils.create_translator(model, search_query_schema.SearchQuery)
Expand Down Expand Up @@ -869,6 +875,7 @@ async def process_query(context: ProcessingContext, query_text: str) -> float |
print("Stage 3 diff unavailable")
prsep()

context.answer_context_options.debug = context.debug4 == "full"
all_answers, combined_answer = await answers.generate_answers(
context.answer_translator,
search_results,
Expand All @@ -881,14 +888,14 @@ async def process_query(context: ProcessingContext, query_text: str) -> float |
if combined_answer.type == "Answered":
context.history.add(query_text, combined_answer.answer or "", True)
else:
context.history.add(query_text, combined_answer.whyNoAnswer or "", False)
context.history.add(query_text, combined_answer.why_no_answer or "", False)

if context.debug4 == "full":
utils.pretty_print(all_answers)
prsep()
if context.debug4 in ("full", "nice"):
if combined_answer.type == "NoAnswer":
print(Fore.RED + f"Failure: {combined_answer.whyNoAnswer}" + Fore.RESET)
print(Fore.RED + f"Failure: {combined_answer.why_no_answer}" + Fore.RESET)
else:
print(Fore.GREEN + f"{combined_answer.answer}" + Fore.RESET)
prsep()
Expand All @@ -899,7 +906,7 @@ async def process_query(context: ProcessingContext, query_text: str) -> float |
print("Stage 4 diff:")
match combined_answer.type:
case "NoAnswer":
actual4 = (combined_answer.whyNoAnswer or "", False)
actual4 = (combined_answer.why_no_answer or "", False)
case "Answered":
actual4 = (combined_answer.answer or "", True)
score = await compare_answers(context, expected4, actual4)
Expand All @@ -911,7 +918,9 @@ async def process_query(context: ProcessingContext, query_text: str) -> float |
else:
print("Stage 4 diff unavailable; nice answer:")
if combined_answer.type == "NoAnswer":
print(Fore.RED + f"Failure: {combined_answer.whyNoAnswer}" + Fore.RESET)
print(
Fore.RED + f"Failure: {combined_answer.why_no_answer}" + Fore.RESET
)
else:
print(Fore.GREEN + f"{combined_answer.answer}" + Fore.RESET)
prsep()
Expand Down
Loading