Skip to content

Commit

Permalink
Regression fixes: LLM logging; client readiness (EventStreamRuntime) (A…
Browse files Browse the repository at this point in the history
…ll-Hands-AI#3776)

* Regression fixes: LLM logging; client readiness (EventStreamRuntime)

* fix llm.async_completion_wrapper bad edit in previous commit

* regenerate a couple of mock files

* client: always log initialized status
  • Loading branch information
tobitege authored Sep 9, 2024
1 parent 50dc17c commit 5ffff74
Show file tree
Hide file tree
Showing 28 changed files with 1,349 additions and 158 deletions.
109 changes: 48 additions & 61 deletions openhands/llm/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,32 +192,8 @@ def wrapper(*args, **kwargs):
else:
messages = args[1] if len(args) > 1 else []

# log the prompt
debug_message = ''
for message in messages:
debug_str = '' # helper to prevent empty messages
content = message['content']

if isinstance(content, list):
for element in content:
if isinstance(element, dict):
if 'text' in element:
debug_str = element['text'].strip()
elif (
self.vision_is_active()
and 'image_url' in element
and 'url' in element['image_url']
):
debug_str = element['image_url']['url']
else:
debug_str = str(element)
else:
debug_str = str(element)
else:
debug_str = str(content)

if debug_str:
debug_message += message_separator + debug_str
# this serves to prevent empty messages and logging the messages
debug_message = self._get_debug_message(messages)

if self.is_caching_prompt_active():
# Anthropic-specific prompt caching
Expand All @@ -236,11 +212,11 @@ def wrapper(*args, **kwargs):

# log the response
message_back = resp['choices'][0]['message']['content']
if message_back:
llm_response_logger.debug(message_back)

llm_response_logger.debug(message_back)

# post-process to log costs
self._post_completion(resp)
# post-process to log costs
self._post_completion(resp)

return resp

Expand Down Expand Up @@ -276,36 +252,10 @@ async def async_completion_wrapper(*args, **kwargs):
if 'messages' in kwargs:
messages = kwargs['messages']
else:
messages = args[1]

# log the prompt
debug_message = ''
for message in messages:
content = message['content']

if isinstance(content, list):
for element in content:
if isinstance(element, dict):
if 'text' in element:
debug_str = element['text']
elif (
self.vision_is_active()
and 'image_url' in element
and 'url' in element['image_url']
):
debug_str = element['image_url']['url']
else:
debug_str = str(element)
else:
debug_str = str(element)

debug_message += message_separator + debug_str
else:
debug_str = str(content)

debug_message += message_separator + debug_str
messages = args[1] if len(args) > 1 else []

llm_prompt_logger.debug(debug_message)
# this serves to prevent empty messages and logging the messages
debug_message = self._get_debug_message(messages)

async def check_stopped():
while True:
Expand All @@ -321,7 +271,12 @@ async def check_stopped():

try:
# Directly call and await litellm_acompletion
resp = await async_completion_unwrapped(*args, **kwargs)
if debug_message:
llm_prompt_logger.debug(debug_message)
resp = await async_completion_unwrapped(*args, **kwargs)
else:
logger.debug('No completion messages!')
resp = {'choices': [{'message': {'content': ''}}]}

# skip if messages is empty (thus debug_message is empty)
if debug_message:
Expand Down Expand Up @@ -370,7 +325,7 @@ async def async_acompletion_stream_wrapper(*args, **kwargs):
if 'messages' in kwargs:
messages = kwargs['messages']
else:
messages = args[1]
messages = args[1] if len(args) > 1 else []

# log the prompt
debug_message = ''
Expand Down Expand Up @@ -422,6 +377,38 @@ async def async_acompletion_stream_wrapper(*args, **kwargs):
self._async_completion = async_completion_wrapper # type: ignore
self._async_streaming_completion = async_acompletion_stream_wrapper # type: ignore

def _get_debug_message(self, messages):
if not messages:
return ''

messages = messages if isinstance(messages, list) else [messages]
return message_separator.join(
self._format_message_content(msg) for msg in messages if msg['content']
)

def _format_message_content(self, message):
content = message['content']
if isinstance(content, list):
return self._format_list_content(content)
return str(content)

def _format_list_content(self, content_list):
return '\n'.join(
self._format_content_element(element) for element in content_list
)

def _format_content_element(self, element):
if isinstance(element, dict):
if 'text' in element:
return element['text']
if (
self.vision_is_active()
and 'image_url' in element
and 'url' in element['image_url']
):
return element['image_url']['url']
return str(element)

async def _call_acompletion(self, *args, **kwargs):
    """Await litellm.acompletion with the given arguments and return its response.

    NOTE(review): presumably kept as a separate method so it can be
    patched/mocked in tests — confirm against callers.
    """
    return await litellm.acompletion(*args, **kwargs)

Expand Down
4 changes: 3 additions & 1 deletion openhands/runtime/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ async def ainit(self):
logger.info(f'AgentSkills initialized: {obs}')

await self._init_bash_commands()
logger.info('Runtime client initialized.')

def _init_user(self, username: str, user_id: int) -> None:
"""Create user if not exists."""
Expand Down Expand Up @@ -515,7 +516,6 @@ async def lifespan(app: FastAPI):
browsergym_eval_env=args.browsergym_eval_env,
)
await client.ainit()
logger.info('Runtime client initialized.')
yield
# Clean up & release the resources
client.close()
Expand Down Expand Up @@ -722,5 +722,7 @@ async def list_files(request: Request):
logger.error(f'Error listing files: {e}', exc_info=True)
return []

logger.info('Runtime client initialized.')

logger.info(f'Starting action execution API on port {args.port}')
run(app, host='0.0.0.0', port=args.port)
36 changes: 17 additions & 19 deletions openhands/runtime/client/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class LogBuffer:
"""

def __init__(self, container: docker.models.containers.Container):
self.client_ready = False
self.init_msg = 'Runtime client initialized.'

self.buffer: list[str] = []
self.lock = threading.Lock()
self.log_generator = container.logs(stream=True, follow=True)
Expand Down Expand Up @@ -77,9 +80,12 @@ def stream_logs(self):
if self._stop_event.is_set():
break
if log_line:
self.append(log_line.decode('utf-8').rstrip())
decoded_line = log_line.decode('utf-8').rstrip()
self.append(decoded_line)
if self.init_msg in decoded_line:
self.client_ready = True
except Exception as e:
logger.error(f'Error in stream_logs: {e}')
logger.error(f'Error streaming docker logs: {e}')

def __del__(self):
if self.log_stream_thread.is_alive():
Expand Down Expand Up @@ -129,7 +135,6 @@ def __init__(

# Buffer for container logs
self.log_buffer: LogBuffer | None = None
self.startup_done = False

if self.config.sandbox.runtime_extra_deps:
logger.info(
Expand Down Expand Up @@ -249,51 +254,44 @@ def _init_container(
reraise=(ConnectionRefusedError,),
)
def _wait_until_alive(self):
init_msg = 'Runtime client initialized.'
logger.debug('Getting container logs...')

# Print and clear the log buffer
assert (
self.log_buffer is not None
), 'Log buffer is expected to be initialized when container is started'

# Always process logs, regardless of startup_done status
# Always process logs, regardless of client_ready status
logs = self.log_buffer.get_and_clear()
if logs:
formatted_logs = '\n'.join([f' |{log}' for log in logs])
logger.info(
'\n'
+ '-' * 30
+ '-' * 35
+ 'Container logs:'
+ '-' * 30
+ '-' * 35
+ f'\n{formatted_logs}'
+ '\n'
+ '-' * 90
+ '-' * 80
)
# Check for initialization message even if startup_done is True
if any(init_msg in log for log in logs):
self.startup_done = True

if not self.startup_done:
if not self.log_buffer.client_ready:
attempts = 0
while not self.startup_done and attempts < 10:
while not self.log_buffer.client_ready and attempts < 5:
attempts += 1
time.sleep(1)
logs = self.log_buffer.get_and_clear()
if logs:
formatted_logs = '\n'.join([f' |{log}' for log in logs])
logger.info(
'\n'
+ '-' * 30
+ '-' * 35
+ 'Container logs:'
+ '-' * 30
+ '-' * 35
+ f'\n{formatted_logs}'
+ '\n'
+ '-' * 90
+ '-' * 80
)
if any(init_msg in log for log in logs):
self.startup_done = True
break

response = self.session.get(f'{self.api_url}/alive')
if response.status_code == 200:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@


----------

# Instructions
Review the current state of the page and all other information to find the best
possible next action to accomplish your goal. Your answer will be interpreted
Expand Down Expand Up @@ -111,7 +107,6 @@ Don't execute multiple actions at once if you need feedback from the page.




----------

# Current Accessibility Tree:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@


----------

# Instructions
Review the current state of the page and all other information to find the best
possible next action to accomplish your goal. Your answer will be interpreted
Expand Down Expand Up @@ -111,7 +107,6 @@ Don't execute multiple actions at once if you need feedback from the page.




----------

# Current Accessibility Tree:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@


----------

# Instructions
Review the current state of the page and all other information to find the best
possible next action to accomplish your goal. Your answer will be interpreted
Expand Down Expand Up @@ -111,7 +107,6 @@ Don't execute multiple actions at once if you need feedback from the page.




----------

# Current Accessibility Tree:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
To accomplish the goal of browsing localhost:8000 and finding the ultimate answer to life, I need to first navigate to the specified URL.

```
goto("http://localhost:8000"
```goto("http://localhost:8000"
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@


----------

A chat between a curious user and an artificial intelligence assistant. The assistant gives helpful, detailed answers to the user's questions.
The assistant can use a Python environment with <execute_ipython>, e.g.:
<execute_ipython>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@


----------

# Instructions
Review the current state of the page and all other information to find the best
possible next action to accomplish your goal. Your answer will be interpreted
and executed by a program, make sure to follow the formatting instructions.

# Goal:
Certainly! I'll browse localhost:8000 and retrieve the ultimate answer to life for you.. I should start with: Get the content on "http://localhost:8000"
. I should start with: Get the content on "http://localhost:8000"

# Action Space

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@


----------

# Instructions
Review the current state of the page and all other information to find the best
possible next action to accomplish your goal. Your answer will be interpreted
and executed by a program, make sure to follow the formatting instructions.

# Goal:
Certainly! I'll browse localhost:8000 and retrieve the ultimate answer to life for you.. I should start with: Get the content on "http://localhost:8000"
. I should start with: Get the content on "http://localhost:8000"

# Action Space

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@


----------

# Instructions
Review the current state of the page and all other information to find the best
possible next action to accomplish your goal. Your answer will be interpreted
and executed by a program, make sure to follow the formatting instructions.

# Goal:
Certainly! I'll browse localhost:8000 and retrieve the ultimate answer to life for you.. I should start with: Get the content on "http://localhost:8000"
. I should start with: Get the content on "http://localhost:8000"

# Action Space

Expand Down
Loading

0 comments on commit 5ffff74

Please sign in to comment.