-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
122 lines (80 loc) · 3.55 KB
/
utils.py
File metadata and controls
122 lines (80 loc) · 3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from trace_logger import Step, StepType
def summarize_step(step: Step) -> str:
    """Build a bracket-labelled summary string for a trace step.

    The label is chosen from ``step.step_type`` and is followed by the
    attribute(s) relevant to that type (text, tool name/arguments, tool
    output, memory key/value, action or observation).

    Args:
        step: The step to describe.

    Returns:
        The labelled summary. Step types not handled below produce an
        ``[Unknown Step Type]`` string containing the type itself.
    """
    kind = step.step_type

    if kind == StepType.REASONING:
        return f"[Reasoning] {step.text}"

    if kind == StepType.TOOL_CALL:
        # Falsy arguments (None, empty dict, ...) render as an empty
        # argument list.
        rendered_args = ""
        if step.tool_args:
            rendered_args = str(step.tool_args)
        return f"[Tool Call] {step.tool_name}({rendered_args})"

    if kind == StepType.LLM_RESPONSE:
        return f"[LLM Response] {step.text}"

    if kind == StepType.TOOL_RESPONSE:
        return f"[Tool Response] {step.tool_output!s}"

    if kind == StepType.MEMORY_ACCESS:
        return f"[Memory Access] Key: {step.memory_key}, Value: {step.memory_value}"

    if kind == StepType.ENVIRONMENT_ACTION:
        return f"[Environment Action] {step.action}"

    if kind == StepType.ENVIRONMENT_OBSERVATION:
        return f"[Environment Observation] {step.observation}"

    if kind == StepType.FINAL_ANSWER:
        return f"[Final Answer] {step.text}"

    return f"[Unknown Step Type] {kind}"
def extract_step_text(step: Step) -> str:
    """Return the textual content of a trace step, without a type label.

    What is returned depends on ``step.step_type``:

    * reasoning, LLM response and final answer: ``step.text``
    * tool call: ``tool_name(args)``
    * tool response: ``str(step.tool_output)``
    * memory access: ``Memory[key] = value``
    * environment action / observation: ``step.action`` /
      ``step.observation``

    Args:
        step: The step whose content should be extracted.

    Returns:
        The extracted text. Missing (falsy) text, action or observation
        values, a ``None`` tool output, and step types not listed above
        all yield an empty string.
    """
    kind = step.step_type

    # LLM responses carry their content in ``text`` just like reasoning
    # and final-answer steps, so they are extracted the same way instead
    # of silently falling through to the empty string.
    if kind in (StepType.REASONING, StepType.LLM_RESPONSE, StepType.FINAL_ANSWER):
        return step.text or ""

    if kind == StepType.TOOL_CALL:
        # Falsy arguments (None, empty dict, ...) render as an empty
        # argument list: ``tool_name()``.
        args_str = str(step.tool_args) if step.tool_args else ""
        return f"{step.tool_name}({args_str})"

    if kind == StepType.TOOL_RESPONSE:
        # Only a missing output maps to "". A truthiness test would also
        # discard legitimate falsy results such as 0, False or [].
        if step.tool_output is None:
            return ""
        return str(step.tool_output)

    if kind == StepType.MEMORY_ACCESS:
        return f"Memory[{step.memory_key}] = {step.memory_value}"

    if kind == StepType.ENVIRONMENT_ACTION:
        return step.action or ""

    if kind == StepType.ENVIRONMENT_OBSERVATION:
        return step.observation or ""

    return ""
def format_step_context(step: Step, context: str) -> str:
    """Render a step, and optionally its dependency context, as text.

    The output always has ``Step ID``, ``Type`` and ``Content`` lines,
    where the content is the result of ``summarize_step(step)``. When
    ``context`` is truthy, a ``Context from dependencies:`` line and the
    context itself are appended.

    Args:
        step: The step to render.
        context: Text describing the step's dependencies; may be empty.

    Returns:
        The newline-separated description.
    """
    content = summarize_step(step)

    lines = [
        f"Step ID: {step.step_id}",
        f"Type: {step.step_type.value}",
        f"Content: {content}",
    ]
    if context:
        lines.append("Context from dependencies:")
        lines.append(f"{context}")

    return "\n".join(lines)
def calculate_text_similarity(text1: str, text2: str) -> float:
    """Compute the Jaccard similarity of two texts' word sets.

    Each text is lower-cased and split on whitespace; the score is the
    number of distinct words common to both divided by the number of
    distinct words appearing in either.

    Args:
        text1: First text.
        text2: Second text.

    Returns:
        A value in ``[0.0, 1.0]``: ``1.0`` when neither text contains
        any word, ``0.0`` when exactly one of them contains none.
    """
    words_a = set(text1.lower().split())
    words_b = set(text2.lower().split())

    if not (words_a or words_b):
        return 1.0
    if not (words_a and words_b):
        return 0.0

    # Both sets are non-empty here, so the union cannot be empty.
    shared = words_a.intersection(words_b)
    combined = words_a.union(words_b)
    return len(shared) / len(combined)
def calculate_minimality_score(original: str, modified: str) -> float:
    """Score how small the edit from ``original`` to ``modified`` is.

    Both texts are lower-cased and split on whitespace. Tokens are
    compared position by position; the fraction of matching positions
    (relative to the longer token list) is then scaled down by half of
    the relative length difference.

    Args:
        original: The text before modification.
        modified: The text after modification.

    Returns:
        A value in ``[0.0, 1.0]``; ``1.0`` when the strings are equal.
        If ``original`` has no tokens, the result is ``1.0`` when
        ``modified`` has none either and ``0.0`` otherwise.
    """
    if original == modified:
        return 1.0

    before = original.lower().split()
    after = modified.lower().split()

    if not before:
        return 1.0 if not after else 0.0

    longest = max(len(before), len(after))

    # Purely positional comparison: zip stops at the shorter list, so
    # tokens beyond it never count as matches.
    same_position = sum(1 for old, new in zip(before, after) if old == new)

    # Differing lengths are penalised on top of the mismatches.
    length_penalty = abs(len(before) - len(after)) / longest

    score = (same_position / longest) * (1 - length_penalty * 0.5)
    return max(0.0, min(1.0, score))