feat: implement parallel streaming output rails execution #1263
Conversation
Force-pushed from eb420fa to c33a0b7.
Pull Request Overview
This PR adds parallel streaming capabilities to the output rails execution path, improving throughput and allowing early termination when any rail blocks content.
- Introduce a `parallel` flag in the `OutputRails` config and wire it through the LLM pipeline (see the config sketch below).
- Implement a new `_run_output_rails_in_parallel_streaming` action that runs rails concurrently with early stop on block, and register it at runtime.
- Extend the test suite with comprehensive scenarios covering allowed flows, blocking, performance, error handling, and default behavior.
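For illustration, enabling the new flag might look like the sketch below. This is an assumption based on the PR description, not confirmed API: the exact key name and its placement under `rails.output` are guesses, and the flow name is a placeholder.

```python
from nemoguardrails import RailsConfig

# Hypothetical YAML: `parallel: true` under `rails.output` is an assumption
# based on this PR's description; "check output safety" is a placeholder flow.
YAML_CONTENT = """
rails:
  output:
    parallel: true
    flows:
      - check output safety
"""

config = RailsConfig.from_content(yaml_content=YAML_CONTENT)
```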
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| nemoguardrails/rails/llm/config.py | Added `parallel` flag to `OutputRails` config. |
| nemoguardrails/rails/llm/llmrails.py | Integrated parallel streaming logic: context prep, dispatch call, and error handling. |
| nemoguardrails/colang/v1_0/runtime/runtime.py | Implemented the `_run_output_rails_in_parallel_streaming` action. |
| nemoguardrails/colang/runtime.py | Registered the new parallel streaming action in the dispatcher. |
| tests/test_parallel_streaming_output_rails.py | Added extensive tests covering parallel streaming, blocking, and performance. |
Comments suppressed due to low confidence (1)

nemoguardrails/rails/llm/llmrails.py:334

After canceling pending tasks, await them (for example via `await asyncio.gather(*tasks, return_exceptions=True)`) to ensure proper cleanup and avoid `Task was destroyed but it is pending!` warnings.
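That cleanup pattern might look like the following (a minimal sketch with a stand-in `rail` coroutine, not the PR's actual code):

```python
import asyncio

async def rail(name: str, delay: float) -> str:
    # Stand-in for an output rail check.
    await asyncio.sleep(delay)
    return f"{name}: allowed"

async def main() -> None:
    tasks = [asyncio.create_task(rail("A", 0.1)), asyncio.create_task(rail("B", 5.0))]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    for task in pending:
        task.cancel()
    # Awaiting the cancelled tasks lets the event loop actually finish them;
    # return_exceptions=True keeps the CancelledError from propagating here.
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(main())
```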
```python
context.update(context_message["content"])
```
Updating the context dict with a string (`context_message["content"]`) will iterate over its characters; instead, assign the content under a specific key or ensure the updated value is a mapping.
Suggested change:

```python
if isinstance(context_message.get("content"), dict):
    context.update(context_message["content"])
else:
    log.warning("context_message['content'] is not a dictionary and will be ignored.")
```
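A minimal illustration of why the guard is needed (the key name `bot_message` is hypothetical): `dict.update` treats a non-mapping argument as an iterable of key/value pairs, so a plain string makes it iterate over characters and fail.

```python
context = {}

# Iterating a string yields single characters, and each element must unpack
# into a (key, value) pair, so this raises:
# ValueError: dictionary update sequence element #0 has length 1; 2 is required
try:
    context.update("some bot message")
except ValueError as exc:
    print(exc)

# Assigning under a specific key (or updating with a mapping) works as intended.
context["bot_message"] = "some bot message"
```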
```
@@ -1346,6 +1346,32 @@ def _get_latest_user_message(
        return message
    return {}

def _prepare_context_for_parallel_rails(
```
[nitpick] Defining helper functions (`_prepare_context_for_parallel_rails`, `_create_events_for_chunk`) inside a method can make the code harder to navigate; consider extracting them to module-level helpers.
This looks good to me. Can you run a local "integration test" where you enable streaming and run parallel input and output rails through the 3 Nemoguard NVCF models to check that the responses make sense and everything works correctly? Please paste the outputs, latency measurements, etc. into the PR comments.
```python
# cancel remaining tasks
for pending_task in tasks:
    if not pending_task.done():
        pending_task.cancel()
```
If we try to cancel a task that's already done, will that throw an exception? A task could finish in between the `pending_task.done()` and `pending_task.cancel()` calls. We need to be able to cancel a task that's already done without any exceptions being thrown.
> If we try to cancel a task that's already done, will that throw an exception?

Calling `cancel()` on a task that is already done or cancelled will not throw an exception; it simply returns `False` and has no effect, so there is no need to guard `cancel()` with a try/except. But this is a good point that we might want to document.
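A quick self-contained check of that claim:

```python
import asyncio

async def main() -> None:
    task = asyncio.create_task(asyncio.sleep(0))
    await task                  # the task is now done
    print(task.done())          # True
    print(task.cancel())        # False -- no exception, no effect
    print(task.cancelled())     # False -- it completed before the cancel call

asyncio.run(main())
```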
> A task could finish in between the `pending_task.done()` and `pending_task.cancel()` calls
This is a race condition, but it's the expected behavior; we can log a warning when it happens. For example:

- Rails A, B, and C are running in parallel.
- Rail A completes and says "blocked".
- While we're processing A's result, rail B also completes and says "blocked".
- We cancel rail C (which is still pending).
- We only report that A blocked, not B.

Based on our assumptions:

- Not a bug: multiple rails might detect violations, but we only need to know that at least one violation occurred.
- Expected behavior: the first violation detected stops the stream.
- Correct outcome: content is blocked regardless of which rail detected it first.

So content-modifying rails should use sequential execution; for read-only validation rails, parallel execution is fine (see the sketch below).
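A minimal sketch of this first-violation-wins pattern; the rail names, delays, and results are illustrative, not the PR's implementation.

```python
import asyncio

async def run_rail(name: str, delay: float, blocked: bool) -> tuple:
    # Stand-in for a read-only validation rail checking one chunk.
    await asyncio.sleep(delay)
    return name, blocked

async def check_chunk() -> None:
    tasks = [
        asyncio.create_task(run_rail("A", 0.01, True)),
        asyncio.create_task(run_rail("B", 0.02, True)),
        asyncio.create_task(run_rail("C", 0.50, False)),
    ]
    try:
        for finished in asyncio.as_completed(tasks):
            name, blocked = await finished
            if blocked:
                # First violation wins; B may also block, but only A is reported.
                print(f"rail {name} blocked the chunk; stopping early")
                break
    finally:
        for task in tasks:
            task.cancel()  # safe even on tasks that already finished
        await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(check_chunk())
```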
PERFORMANCE COMPARISON SUMMARY

Config 1
- config:
- Output Rails:
- Latency Improvements (First Chunk):
- Total Duration Comparison:
- Rail Execution Details:

Config 2 (lower chunk size, higher output rails invocation)
- config:
- Output Rails:
- Latency Improvements (First Chunk):
- Total Duration Comparison:
- Rail Execution Details:
- Add `_run_output_rails_in_parallel_streaming` method to run output rails concurrently
- Use asyncio tasks to execute multiple rails simultaneously during streaming
- Implement early termination when any rail blocks content to optimize performance
- Register the new action in the runtime dispatcher
- Add proper error handling and cancellation for robust parallel execution
- Avoid full flow state management issues that can occur with hide_prev_turn logic during streaming
- Add comprehensive tests for parallel streaming functionality

parallel rails accept flow params dict
Force-pushed from c33a0b7 to 4cb2762.
Codecov Report

Attention: Patch coverage is …
Additional details and impacted files

```diff
@@            Coverage Diff            @@
##           develop    #1263    +/-   ##
===========================================
+ Coverage    69.78%   69.83%   +0.04%
===========================================
  Files          161      161
  Lines        16057    16135      +78
===========================================
+ Hits         11206    11268      +62
- Misses        4851     4867      +16
```
Flags with carried forward coverage won't be shown.
Thanks for adding the local performance test values, this looks great.
This PR implements streaming for parallel output rails execution.

Requires #1259.