feat(streaming): support external async token generators #1286

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion docs/user-guides/advanced/streaming.md
@@ -71,7 +71,73 @@ result = await app.generate_async(
print(result)
```

For the complete working example, check out this [demo script](https://github.com/NVIDIA/NeMo-Guardrails/tree/develop/examples/scripts/demo_streaming.py).
### Using External Token Generators

You can also provide your own async generator that yields tokens, which is useful when:

- You want to use a different LLM provider that has its own streaming API
- You have pre-generated responses that you want to stream through guardrails
- You want to implement custom token generation logic
- You want to test your output rails or their configuration in streaming mode on predefined responses, without relying on an actual LLM generation

Collaborator: Corrected a mistype and also changed the wording. Is this the correct usage, i.e. on predefined / given assistant responses, only with output rails?


To use an external generator, pass it to the `generator` parameter of `stream_async`:

```python
from typing import AsyncIterator

from nemoguardrails import LLMRails

app = LLMRails(config)

async def my_token_generator() -> AsyncIterator[str]:
    # This could be from the OpenAI API, the Anthropic API, or any other LLM API
    # that already has a streaming token generator. We mock the stream here for a
    # simple example.
    tokens = ["Hello", " ", "world", "!"]
    for token in tokens:
        yield token

# A short conversation history for the example.
history = [{"role": "user", "content": "Say hello to the world."}]

# Use the external generator with guardrails.
async for chunk in app.stream_async(
    messages=history,
    generator=my_token_generator()
):
    print(f"CHUNK: {chunk}")
```

When using an external generator:

- The internal LLM generation in the Guardrails runtime is completely bypassed; the LLM responses are provided by the external generator
- Output rails, if configured, are still applied to the LLM responses returned by the external generator (a short sketch follows this list)
- The generator should yield string tokens
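
As a minimal sketch of the last two points, and of the "pre-generated responses" use case above, the following streams a canned response through the output rails. It assumes `app` is an `LLMRails` instance configured with streaming output rails, as in the examples above; the generator and the message content are illustrative:

```python
from typing import AsyncIterator

async def canned_response_generator() -> AsyncIterator[str]:
    """Stream a pre-generated (canned) response token by token."""
    for token in ["The", " answer", " is", " 42", "."]:
        yield token

# Output rails configured on `app` are applied to these tokens,
# even though no internal LLM call is made.
async for chunk in app.stream_async(
    messages=[{"role": "user", "content": "What is the answer?"}],
    generator=canned_response_generator(),
):
    print(chunk, end="", flush=True)
```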

Example with a real LLM API:

```python
from typing import AsyncIterator

from nemoguardrails import LLMRails, RailsConfig
from openai import AsyncOpenAI


async def openai_streaming_generator(messages) -> AsyncIterator[str]:
    """Example using OpenAI's streaming API."""
    client = AsyncOpenAI()

    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )

    # Yield tokens as they arrive.
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content


config = RailsConfig.from_path("config/with_output_rails")
app = LLMRails(config)

messages = [{"role": "user", "content": "Tell me a story"}]

async for chunk in app.stream_async(
    messages=messages,
    generator=openai_streaming_generator(messages)
):
    # Output rails will be applied to these chunks.
    print(chunk, end="", flush=True)
```

This feature enables seamless integration of NeMo Guardrails with any streaming LLM or token source while maintaining all the safety features of output rails.

### Server API

13 changes: 13 additions & 0 deletions nemoguardrails/rails/llm/llmrails.py
@@ -1053,8 +1053,21 @@ def stream_async(
        options: Optional[Union[dict, GenerationOptions]] = None,
        state: Optional[Union[dict, State]] = None,
        include_generation_metadata: Optional[bool] = False,
        generator: Optional[AsyncIterator[str]] = None,
    ) -> AsyncIterator[str]:
        """Simplified interface for getting directly the streamed tokens from the LLM."""

        # if an external generator is provided, use it directly
        if generator:
            if self.config.rails.output.streaming.enabled:
                return self._run_output_rails_in_streaming(
                    streaming_handler=generator,
                    messages=messages,
                    prompt=prompt,
                )
            else:
                return generator

Collaborator: Can this still be used with explain / generation options?

        self.explain_info = self._ensure_explain_info()

        streaming_handler = StreamingHandler(
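
For context, a minimal usage sketch of the new `generator` parameter, mirroring the tests below; it assumes a streaming-enabled config with no output rails, so `stream_async` simply passes the external tokens through:

```python
import asyncio
from typing import AsyncIterator

from nemoguardrails import LLMRails, RailsConfig


async def external_tokens() -> AsyncIterator[str]:
    for token in ["Hello", " ", "world", "!"]:
        yield token


async def main():
    # A minimal streaming-enabled config without output rails: stream_async
    # returns the external generator's tokens unchanged.
    config = RailsConfig.from_content(
        config={"models": [], "rails": {}, "streaming": True}
    )
    rails = LLMRails(config)

    async for chunk in rails.stream_async(generator=external_tokens()):
        print(chunk, end="")


asyncio.run(main())
```
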
255 changes: 254 additions & 1 deletion tests/test_streaming_output_rails.py
@@ -17,8 +17,8 @@

import asyncio
import json
import math
from json.decoder import JSONDecodeError
from typing import AsyncIterator

import pytest

@@ -250,3 +250,256 @@ async def test_streaming_output_rails_default_config_not_blocked_at_start(
    json.loads(chunks[0])

    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})


async def simple_token_generator() -> AsyncIterator[str]:
    """Simple generator that yields tokens."""
    tokens = ["Hello", " ", "world", "!"]
    for token in tokens:
        yield token


async def offensive_token_generator() -> AsyncIterator[str]:
    """Generator that yields potentially offensive content."""

    tokens = ["This", " ", "is", " ", "offensive", " ", "content", " ", "idiot", "!"]
    for token in tokens:
        yield token


@pytest.mark.asyncio
async def test_external_generator_without_output_rails():
    """Test that external generator works without output rails."""
    config = RailsConfig.from_content(
        config={
            "models": [],
            "rails": {},
            "streaming": True,
        }
    )

    rails = LLMRails(config)

    tokens = []
    async for token in rails.stream_async(generator=simple_token_generator()):
        tokens.append(token)

    assert tokens == ["Hello", " ", "world", "!"]
    assert "".join(tokens) == "Hello world!"


@pytest.mark.asyncio
async def test_external_generator_with_output_rails_allowed():
    """Test that external generator works with output rails that allow content."""
    config = RailsConfig.from_content(
        config={
            "models": [],
            "rails": {
                "output": {
                    "flows": ["self check output"],
                    "streaming": {
                        "enabled": True,
                        "chunk_size": 4,
                        "context_size": 2,
                        "stream_first": False,
                    },
                }
            },
            "streaming": True,
            "prompts": [
                {"task": "self_check_output", "content": "Check: {{ bot_response }}"}
            ],
        },
        colang_content="""
        define flow self check output
          execute self_check_output
        """,
    )

    rails = LLMRails(config)

    @action(name="self_check_output")
    async def self_check_output(**kwargs):
        return True

    rails.register_action(self_check_output, "self_check_output")

    tokens = []
    async for token in rails.stream_async(
        generator=simple_token_generator(),
        messages=[{"role": "user", "content": "Hello"}],
    ):
        tokens.append(token)

    assert tokens == ["Hello", " ", "world", "!"]


@pytest.mark.asyncio
async def test_external_generator_with_output_rails_blocked():
    """Test that external generator content can be blocked by output rails."""
    config = RailsConfig.from_content(
        config={
            "models": [],
            "rails": {
                "output": {
                    "flows": ["self check output"],
                    "streaming": {
                        "enabled": True,
                        "chunk_size": 6,
                        "context_size": 2,
                        "stream_first": False,
                    },
                }
            },
            "streaming": True,
            "prompts": [
                {"task": "self_check_output", "content": "Check: {{ bot_response }}"}
            ],
        },
        colang_content="""
        define flow self check output
          execute self_check_output
        """,
    )

    rails = LLMRails(config)

    @action(name="self_check_output")
    async def self_check_output(**kwargs):
        bot_message = kwargs.get(
            "bot_message", kwargs.get("context", {}).get("bot_message", "")
        )
        # block if message contains "offensive" or "idiot"
        if "offensive" in bot_message.lower() or "idiot" in bot_message.lower():
            return False
        return True

    rails.register_action(self_check_output, "self_check_output")

    tokens = []
    error_received = False

    async for token in rails.stream_async(
        generator=offensive_token_generator(),
        messages=[{"role": "user", "content": "Generate something"}],
    ):
        if isinstance(token, str) and token.startswith('{"error"'):
            error_received = True
            break
        tokens.append(token)

    assert error_received, "Expected to receive an error JSON when content is blocked"
    assert len(tokens) == 0


@pytest.mark.asyncio
async def test_external_generator_with_custom_llm():
    """Test using external generator as a custom LLM replacement."""

    async def custom_llm_generator(messages):
        """Simulate a custom LLM that generates based on input."""

        user_message = messages[-1]["content"] if messages else ""

        if "weather" in user_message.lower():
            response = "The weather is sunny today!"
        elif "name" in user_message.lower():
            response = "I am an AI assistant."
        else:
            response = "I can help you with that."

        for token in response.split(" "):
            yield token + " "

    config = RailsConfig.from_content(
        config={
            "models": [],
            "rails": {},
            "streaming": True,
        }
    )

    rails = LLMRails(config)

    messages = [{"role": "user", "content": "What's the weather?"}]
    tokens = []

    async for token in rails.stream_async(
        generator=custom_llm_generator(messages), messages=messages
    ):
        tokens.append(token)

    result = "".join(tokens).strip()
    assert result == "The weather is sunny today!"


@pytest.mark.asyncio
async def test_external_generator_empty_stream():
    """Test that empty generator streams work correctly."""

    async def empty_generator():
        if False:
            yield

    config = RailsConfig.from_content(
        config={
            "models": [],
            "rails": {},
            "streaming": True,
        }
    )

    rails = LLMRails(config)

    tokens = []
    async for token in rails.stream_async(generator=empty_generator()):
        tokens.append(token)

    assert tokens == []


@pytest.mark.asyncio
async def test_external_generator_single_chunk():
    """Test generator that yields a single large chunk."""

    async def single_chunk_generator():
        yield "This is a complete response in a single chunk."

    config = RailsConfig.from_content(
        config={
            "models": [],
            "rails": {
                "output": {
                    "flows": ["self check output"],
                    "streaming": {
                        "enabled": True,
                        "chunk_size": 10,
                        "context_size": 5,
                        "stream_first": True,
                    },
                }
            },
            "streaming": True,
            "prompts": [
                {"task": "self_check_output", "content": "Check: {{ bot_response }}"}
            ],
        },
        colang_content="""
        define flow self check output
          execute self_check_output
        """,
    )

    rails = LLMRails(config)

    @action(name="self_check_output")
    async def self_check_output(**kwargs):
        return True

    rails.register_action(self_check_output, "self_check_output")

    tokens = []
    async for token in rails.stream_async(generator=single_chunk_generator()):
        tokens.append(token)

    assert "".join(tokens) == "This is a complete response in a single chunk."