Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions src/vulcanai/console/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from prompt_toolkit import PromptSession
from rich.progress import Progress, SpinnerColumn, TextColumn
from vulcanai.models.model import IModelHooks
from vulcanai.console.logger import console
from vulcanai.console.logger import console, VulcanAILogger




class SpinnerHook(IModelHooks):
Expand Down Expand Up @@ -77,6 +79,23 @@ def __init__(self, model: str = "gpt-5-nano", k: int = 7, iterative: bool = Fals
except Exception:
pass

current_path = os.path.dirname(os.path.abspath(__file__))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to be careful with this. __file__ will point to the installation path if installed without using -e argument when installing the package. So we need to either install the default_tools.py file in the same relative path from the installed file or to find another method to add the tools. Maybe we can install them as a module with the package and just load them directly

self.manager.register_tools_from_file(f"{current_path}/../tools/default_tools.py")

self.manager.bb["console"] = self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still need to review Textualize PR, but we might consider adding a blocklist based on the keys of the blackboard when giving the Iterative Manager the bb to generate context. In this way we could avoid adding extra tokens to the query that provide no info, same with the Node already being shared.

self.logger = VulcanAILogger.log_manager
self.stream_task = None


def set_stream_task(self, input_stream):
"""
Function used in the tools to set the current streaming task.
with this variable the user can finish the execution of the
task by using the signal "Ctrl + C"
"""

self.stream_task = input_stream

def run(self):
self.print("VulcanAI Interactive Console")
self.print("Type 'exit' to quit.\n")
Expand Down Expand Up @@ -110,8 +129,12 @@ def run(self):
self.print(f"Output of plan: {result.get('blackboard', {None})}")

except KeyboardInterrupt:
console.print("[yellow]Exiting...[/yellow]")
break
if self.stream_task == None:
console.print("[yellow]Exiting...[/yellow]")
break
else:
self.stream_task.cancel() # triggers CancelledError in the task
self.stream_task = None
except EOFError:
console.print("[yellow]Exiting...[/yellow]")
break
Expand Down
153 changes: 153 additions & 0 deletions src/vulcanai/console/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import asyncio
import subprocess
import threading
import time
# To remove possible errors in textual terminal
# Subscribed to [/turtle1/pose] -> Subscribed to \[/turtle1/pose]
from textual.markup import escape

# Create a global background loop once
_background_loop = asyncio.new_event_loop()
_thread = threading.Thread(
target=_background_loop.run_forever,
daemon=True,
)
_thread.start()


async def run_streaming_cmd_async(console, args: list[str],
max_duration: float = 60,
max_lines: int = 1000,
echo: bool = True) -> str:

# Unpack the command
cmd, *cmd_args = args

# Create the subprocess
process = await asyncio.create_subprocess_exec(
cmd,
*cmd_args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)

assert process.stdout is not None

start_time = time.monotonic()
line_count = 0

try:
# Subprocess main loop. Read line by line
async for raw_line in process.stdout:
line = raw_line.decode(errors="ignore").rstrip("\n")

# Print the line
if echo:
line_processed = escape(line)
console.logger(line_processed)

# Count the line
line_count += 1
if max_lines is not None and line_count >= max_lines:
console.logger(
f"[yellow]Stopping: [bold]reached max_lines = {max_lines}[/bold][/yellow]"
)
console.set_stream_task(None)
process.terminate()
break

# Check duration
if max_duration and (time.monotonic() - start_time) >= max_duration:
console.logger(
f"[yellow]Stopping: [bold]exceeded max_duration = {max_duration}s[/bold] [/yellow]"
)
console.set_stream_task(None)
process.terminate()
break


except asyncio.CancelledError:
# Task was cancelled → stop the subprocess
console.logger("[yellow][bold]Cancellation received:[/bold] terminating subprocess...[/yellow]")
process.terminate()
raise
# Not necessary, textual terminal get the keyboard input
except KeyboardInterrupt:
# Ctrl+C pressed → stop subprocess
console.logger("[yellow][bold]Ctrl+C received:[/bold] terminating subprocess...[/yellow]")
process.terminate()

finally:
try:
await asyncio.wait_for(process.wait(), timeout=3.0)
except asyncio.TimeoutError:
console.logger("Subprocess didn't exit in time → killing it.")
process.kill()
await process.wait()

return "Process stopped due to Ctrl+C"


def execute_subprocess(console, base_args, max_duration, max_lines):

stream_task = None

async def launcher() -> None:
try:
await run_streaming_cmd_async(
console,
base_args,
max_duration=max_duration,
max_lines=max_lines,
)
except Exception as e:
console.logger(f"Echo task error: {e!r}\n")

# Schedule coroutine in background loop
future = asyncio.run_coroutine_threadsafe(launcher(), _background_loop)

def _on_done(task: asyncio.Task) -> None:

if task.cancelled():
# Normal path -> dont log as an error
return

try:
task.result()
except Exception as e:
console.logger(f"Echo task error: {e!r}\n")
return

future.add_done_callback(_on_done)

# Add the coroutine to the console.
# Used to cancel the execution of the subprocess
# with the signal "Ctrl + C" in the terminal
console.set_stream_task(future)


def run_oneshot_cmd(args: list[str]) -> str:
try:
return subprocess.check_output(
args,
stderr=subprocess.STDOUT,
text=True
)

except subprocess.CalledProcessError as e:
raise Exception(f"Failed to run '{' '.join(args)}': {e.output}")
Loading