
Commit 4b009fb

lilyydu and lilydu authored
[PY] feat: Streaming + Sample (#2141)
## Linked issues closes: #1967, #1970 ## Details - Added streaming support for Python with an associated sample. As ChefBot does not exist in Python, streaming is demonstrated through the ListBot instead. - Added Powered by AI features for the final chunk (feedback loop, citations, sensitivity label, gen by AI label) - Designed a custom `PromptCompletionModelEmitter` class to manage streaming events and handlers - **Feedback Loop:** The flag is passed in slightly differently from JS and C#. This is because the AI class was never being passed in as a method param for `Planner.continue_task `. Adding this change now as an optional param will lead to compiling errors for all developers with custom planners (parent class needs to be updated). This also leads to a couple of cyclical dependencies. Instead of internally piping this flag, it is now exposed as an _option_ in the `ActionPlanner`. This is (a) more obvious to configure, (b) will not introduce a sudden breaking change to non-streaming developers, and (c) provides more control for the developer. Although this flag can deviate from the one set in the `AI` class, it is not possible for both flows to run at once, so there won't be any conflicts. - **Citations and Sensitivity Label**: This will work slightly differently than the previous non-streaming Plan flow. Citations and its respective sensitivity labels are added per each text chunk queued. However, these will only be rendered in the final message (when the full message has been received). Rather than exposing the `SensitivityUsageInfo` object as an override on the `PredictedSayCommand`, the label can now be directly set as `usageInfo` in the `AIEntity` object along with the AIGenerated label and the citations. Additional items for parity: - Added temporary 1.5 second buffer to adhere to 1RPS BE service requirement. Consequently, if the message is small, the message will not look like it is being streamed. Recommended command to visualize the feature is "tell me a story and render this as a large text chunk". - Added reject/catch handling for errors - Added entities metadata to match GA requirements. This will log a few warnings from the Botbuilder side, until it is added on their side, in a few months. **screenshots**: ![image](https://github.com/user-attachments/assets/4e6c1606-dfa0-49f0-a652-66174eb7c9be) ## Attestation Checklist - [x] My code follows the style guidelines of this project - I have checked for/fixed spelling, linting, and other errors - I have commented my code for clarity - I have made corresponding changes to the documentation (updating the doc strings in the code is sufficient) - My changes generate no new warnings - I have added tests that validates my changes, and provides sufficient test coverage. I have tested with: - Local testing - E2E testing in Teams - New and existing unit tests pass locally with my changes --------- Co-authored-by: lilydu <[email protected]>
1 parent 0dbbc8a commit 4b009fb
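The description above leans on the `StreamingResponse` lifecycle that the new `LLMClient` wiring drives: an informative update, queued text chunks (each optionally carrying citations), the Powered by AI labels, and a final `end_stream()` call. The sketch below restates that flow on its own, using only methods that appear in the diffs further down; the wrapper coroutine and its name are illustrative, not part of this commit.

```python
from botbuilder.core import TurnContext

from teams.streaming.streaming_response import StreamingResponse


async def stream_lifecycle_sketch(context: TurnContext) -> None:
    streamer = StreamingResponse(context)

    # Powered by AI features that render on the final chunk
    streamer.set_feedback_loop(True)          # thumbs up / thumbs down buttons
    streamer.set_generated_by_ai_label(True)  # "Generated by AI" label

    # Informative message shown while the response is still streaming
    streamer.queue_informative_update("Loading streaming results...")

    # Citations are attached per queued chunk but only render with the final message
    streamer.queue_text_chunk("First part of the answer...", None)
    streamer.queue_text_chunk("...and the rest.", None)

    # Final chunk: full message plus labels, citations, and any attachments
    await streamer.end_stream()
```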


55 files changed: +5819 -11 lines changed

getting-started/CONCEPTS/STREAMING.md

Lines changed: 45 additions & 2 deletions
````diff
@@ -34,6 +34,7 @@ There are two parts to streaming:
 ## Sample Bots
 - [C# Streaming ChefBot](https://github.com/microsoft/teams-ai/tree/main/dotnet/samples/04.ai.g.teamsChefBot-streaming)
 - [JS Streaming ChefBot](https://github.com/microsoft/teams-ai/tree/main/js/samples/04.ai-apps/i.teamsChefBot-streaming)
+- [Python Streaming ListBot](https://github.com/microsoft/teams-ai/tree/main/python/samples/04.ai.h.chainedActions.listBot-streaming)
 
 ## Streaming Response Class
 The `StreamingResponse` class is the helper class for streaming responses to the client. The class is used to send a series of updates to the client in a single response. If you are using your own custom model, you can directly instantiate and manage this class to stream responses.
@@ -52,7 +53,8 @@ Once `endStream()` is called, the stream is considered ended and no further upda
 ### Current Limitations:
 - Streaming is only available in 1:1 chats.
 - SendActivity requests are restricted to 1 RPS. Our SDK buffers to 1.5 seconds.
-- For Powered by AI features, only the Feedback Loop and Generated by AI Label is currently supported.
+- For Powered by AI features, Citations, Sensitivity Label, Feedback Loop and Generated by AI Label are supported in the final chunk.
+- Citations are set per each text chunk queued.
 - Only rich text can be streamed.
 - Due to future GA protocol changes, the `channelData` metadata must be included in the `entities` object as well.
 - Only one informative message can be set. This is reused for each message.
@@ -74,7 +76,8 @@ You can configure streaming with your bot by following these steps:
 
 #### Optional additions:
 - Set the informative message in the `ActionPlanner` declaration via the `StartStreamingMessage` config.
-- As previously, set the feedback loop toggle in the `AIOptions` object in the `app` declaration and specify a handler.
+- As previously, set the feedback loop toggle in the `AIOptions` object in the `app` declaration and specify a handler.
+  - For *Python* specifically, the toggle also needs to be set in the `ActionPlannerOptions` object.
 - Set attachments in the final chunk via the `EndStreamHandler` in the `ActionPlanner` declaration.
 
 #### C#
@@ -158,6 +161,46 @@ const planner = new ActionPlanner({
 });
 ```
 
+### Python
+
+```python
+model = OpenAIModel(
+    OpenAIModelOptions(api_key=config.OPENAI_KEY, default_model="gpt-4o", stream=True)
+)
+
+def end_stream_handler(
+    context: TurnContext,
+    state: MemoryBase,
+    response: PromptResponse[str],
+    streamer: StreamingResponse,
+):
+    if not streamer:
+        return
+
+    card = CardFactory.adaptive_card(
+        {
+            "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
+            "version": "1.6",
+            "type": "AdaptiveCard",
+            "body": [{"type": "TextBlock", "wrap": True, "text": streamer.message}],
+        }
+    )
+
+    streamer.set_attachments([card])
+
+planner = ActionPlanner(
+    ActionPlannerOptions(
+        model=model,
+        prompts=prompts,
+        default_prompt="tools",
+        enable_feedback_loop=True,  # Enable the feedback loop
+        start_streaming_message="Loading streaming results...",  # Set the informative message
+        end_stream_handler=end_stream_handler,  # Set the final chunk handler
+    )
+)
+
+```
+
 ---
 
 ## Return to other major section topics:
````
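The "Optional additions" bullets above call out that Python needs the feedback-loop toggle in two places: the `AIOptions` passed to the app and the `ActionPlannerOptions` shown in the new Python sample. A condensed sketch of setting both, assuming `model` and `prompts` are defined as in the sample; the exact `AIOptions` fields used by your bot may differ:

```python
from teams.ai import AIOptions
from teams.ai.planners import ActionPlanner, ActionPlannerOptions

# Python-specific: the toggle also lives on the planner options...
planner = ActionPlanner(
    ActionPlannerOptions(
        model=model,                # an OpenAIModel created with stream=True
        prompts=prompts,            # your PromptManager
        default_prompt="tools",
        enable_feedback_loop=True,  # feedback loop for streamed responses
    )
)

# ...and, as before, on the AIOptions wired into the app declaration.
ai_options = AIOptions(planner=planner, enable_feedback_loop=True)
```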

python/packages/ai/teams/ai/citations/citations.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -22,6 +22,7 @@ class AIEntity(Entity):
         "id_": {"key": "@id", "type": "str"},
         "additional_type": {"key": "additionalType", "type": "[str]"},
         "citation": {"key": "citation", "type": "[ClientCitation]"},
+        "usage_info": {"key": "usageInfo", "type": "SensitivityUsageInfo"},
     }
 
     additional_type: Optional[list[str]]
@@ -30,6 +31,7 @@ class AIEntity(Entity):
     type_: str = "Message"
     context_: str = "https://schema.org"
     id_: str = ""
+    usage_info: Optional[SensitivityUsageInfo] = field(default=None)
 
 
 @dataclass
```

python/packages/ai/teams/ai/clients/llm_client.py

Lines changed: 112 additions & 1 deletion
```diff
@@ -12,7 +12,14 @@
 from botbuilder.core import TurnContext
 
 from ...state import Memory, MemoryBase
-from ..models import PromptCompletionModel, PromptResponse
+from ...streaming.prompt_chunk import PromptChunk
+from ...streaming.streaming_response import StreamingResponse
+from ..models import (
+    PromptCompletionModel,
+    PromptResponse,
+    ResponseReceivedHandler,
+    StreamHandlerTypes,
+)
 from ..prompts import (
     ConversationHistorySection,
     Message,
@@ -68,13 +75,29 @@ class LLMClientOptions:
     Optional. When set the model will log requests
     """
 
+    start_streaming_message: Optional[str] = ""
+    """
+    Optional message to send at the start of a streaming response.
+    """
+
+    end_stream_handler: Optional[ResponseReceivedHandler] = None
+    """
+    Optional handler to run when a stream is about to conclude.
+    """
+
+    enable_feedback_loop: Optional[bool] = False
+    "Optional. Enables the Teams thumbs up or down buttons."
+
 
 class LLMClient:
     """
     LLMClient class that's used to complete prompts.
     """
 
     _options: LLMClientOptions
+    _start_streaming_message: Optional[str] = ""
+    _end_stream_handler: Optional[ResponseReceivedHandler] = None
+    _enable_feedback_loop: Optional[bool] = False
 
     @property
     def options(self) -> LLMClientOptions:
@@ -89,6 +112,9 @@ def __init__(self, options: LLMClientOptions) -> None:
         """
 
         self._options = options
+        self._start_streaming_message = options.start_streaming_message
+        self._end_stream_handler = options.end_stream_handler
+        self._enable_feedback_loop = options.enable_feedback_loop
 
     async def complete_prompt(
         self,
@@ -112,6 +138,70 @@ async def complete_prompt(
 
         remaining_attempts = remaining_attempts or self._options.max_repair_attempts
 
+        # Define event handlers
+        is_streaming = False
+        streamer: Optional[StreamingResponse] = None
+
+        def before_completion(
+            ctx: TurnContext,
+            memory: MemoryBase,
+            functions: PromptFunctions,
+            tokenizer: Tokenizer,
+            template: PromptTemplate,
+            streaming: bool,
+        ) -> None:
+            # pylint: disable=unused-argument
+            # Ignore events for other contexts
+            if context != ctx:
+                return
+
+            # Check for a streaming response
+            if streaming:
+                nonlocal is_streaming
+                is_streaming = True
+
+                nonlocal streamer
+                streamer = StreamingResponse(context)
+                memory.set("temp.streamer", streamer)
+
+                if self._enable_feedback_loop is not None:
+                    streamer.set_feedback_loop(self._enable_feedback_loop)
+
+                streamer.set_generated_by_ai_label(True)
+
+                if self._start_streaming_message:
+                    streamer.queue_informative_update(self._start_streaming_message)
+
+        def chunk_received(
+            ctx: TurnContext,
+            memory: MemoryBase,
+            chunk: PromptChunk,
+        ) -> None:
+            # pylint: disable=unused-argument
+            nonlocal streamer
+            if (context != ctx) or (streamer is None):
+                return
+
+            text = chunk.delta.content if (chunk.delta and chunk.delta.content) else ""
+            citations = (
+                chunk.delta.context.citations if (chunk.delta and chunk.delta.context) else None
+            )
+
+            if len(text) > 0:
+                streamer.queue_text_chunk(text, citations)
+
+        # Subscribe to model events
+        if self._options.model.events is not None:
+            self._options.model.events.subscribe(
+                StreamHandlerTypes.BEFORE_COMPLETION, before_completion
+            )
+            self._options.model.events.subscribe(StreamHandlerTypes.CHUNK_RECEIVED, chunk_received)
+
+            if self._end_stream_handler is not None:
+                self._options.model.events.subscribe(
+                    StreamHandlerTypes.RESPONSE_RECEIVED, self._end_stream_handler
+                )
+
         try:
             if remaining_attempts <= 0:
                 return PromptResponse(
@@ -187,9 +277,30 @@ async def complete_prompt(
 
             self._add_message_to_history(memory, self._options.history_variable, res.input)
             self._add_message_to_history(memory, self._options.history_variable, res.message)
+
+            if is_streaming and res.status == "success":
+                # Delete message from response to avoid sending it twice
+                res.message = None
+
+            if streamer is not None:
+                await streamer.end_stream()
             return res
         except Exception as err: # pylint: disable=broad-except
             return PromptResponse(status="error", error=str(err))
+        finally:
+            # Unsubscribe from model events
+            if self._options.model.events is not None:
+                self._options.model.events.unsubscribe(
+                    StreamHandlerTypes.BEFORE_COMPLETION, before_completion
+                )
+                self._options.model.events.unsubscribe(
+                    StreamHandlerTypes.CHUNK_RECEIVED, chunk_received
+                )
+
+                if self._end_stream_handler is not None:
+                    self._options.model.events.unsubscribe(
+                        StreamHandlerTypes.RESPONSE_RECEIVED, self._end_stream_handler
+                    )
 
     def _add_message_to_history(
         self, memory: MemoryBase, variable: str, messages: Union[Message[Any], List[Message[Any]]]
```
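The subscribe/unsubscribe calls above are the whole integration surface between `LLMClient` and the new `PromptCompletionModelEmitter`, so the same pattern can be used outside the client to observe a streaming model directly. A minimal sketch, assuming an `OpenAIModel` created with `stream=True`; the handler body and helper function are illustrative:

```python
from botbuilder.core import TurnContext

from teams.ai.models import OpenAIModel, StreamHandlerTypes
from teams.state import MemoryBase
from teams.streaming.prompt_chunk import PromptChunk


def attach_chunk_logger(model: OpenAIModel):
    """Subscribe a CHUNK_RECEIVED handler and return a callable that detaches it."""

    def on_chunk(ctx: TurnContext, memory: MemoryBase, chunk: PromptChunk) -> None:
        # Each chunk may carry delta text; citations, if any, ride on chunk.delta.context.
        if chunk.delta and chunk.delta.content:
            print(chunk.delta.content, end="", flush=True)

    if model.events is not None:
        model.events.subscribe(StreamHandlerTypes.CHUNK_RECEIVED, on_chunk)

    def detach() -> None:
        if model.events is not None:
            model.events.unsubscribe(StreamHandlerTypes.CHUNK_RECEIVED, on_chunk)

    return detach
```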

python/packages/ai/teams/ai/models/__init__.py

Lines changed: 12 additions & 0 deletions
```diff
@@ -3,9 +3,16 @@
 Licensed under the MIT License.
 """
 
+from ...streaming import (
+    BeforeCompletionHandler,
+    ChunkReceivedHandler,
+    ResponseReceivedHandler,
+    StreamHandlerTypes,
+)
 from .chat_completion_action import ChatCompletionAction
 from .openai_model import AzureOpenAIModelOptions, OpenAIModel, OpenAIModelOptions
 from .prompt_completion_model import PromptCompletionModel
+from .prompt_completion_model_emitter import PromptCompletionModelEmitter
 from .prompt_response import PromptResponse, PromptResponseStatus
 
 __all__ = [
@@ -16,4 +23,9 @@
     "PromptCompletionModel",
     "PromptResponse",
     "PromptResponseStatus",
+    "PromptCompletionModelEmitter",
+    "BeforeCompletionHandler",
+    "ChunkReceivedHandler",
+    "ResponseReceivedHandler",
+    "StreamHandlerTypes",
 ]
```
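With these re-exports in place, consumers can pull the streaming handler types and the emitter from `teams.ai.models` rather than reaching into the `teams.streaming` submodules directly, for example:

```python
from teams.ai.models import (
    BeforeCompletionHandler,
    ChunkReceivedHandler,
    PromptCompletionModelEmitter,
    ResponseReceivedHandler,
    StreamHandlerTypes,
)
```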
