# [JS] feat: attachments in final stream chunk (#2032)
## Linked issues

closes: #2036

## Details

- Add an option to attach attachments to the final stream chunk by calling `StreamingResponse.setAttachments(attachments)`.
- The streamer instance is accessed via the `endStreamHandler` (typed as `PromptCompletionModelResponseReceivedEvent`); see the sketch below.
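
A minimal sketch of how the new hook fits together, mirroring the sample change to `i.teamsChefBot-streaming` further down. The `model` and `prompts` instances are assumed to be configured elsewhere; the handler wraps the fully streamed text in an Adaptive Card and attaches it to the final chunk.

```ts
import { CardFactory } from 'botbuilder';
import {
    ActionPlanner,
    OpenAIModel,
    PromptCompletionModelResponseReceivedEvent,
    PromptManager
} from '@microsoft/teams-ai';

// Assumed to be configured elsewhere in the app.
declare const model: OpenAIModel;
declare const prompts: PromptManager;

// Runs when a streamed response is about to conclude. The fourth argument is the
// StreamingResponse instance (may be absent for non-streaming responses).
const endStreamHandler: PromptCompletionModelResponseReceivedEvent = (ctx, memory, response, streamer) => {
    if (!streamer) {
        return;
    }

    // Wrap the streamed text in an Adaptive Card and attach it to the final chunk.
    const card = CardFactory.adaptiveCard({
        $schema: 'http://adaptivecards.io/schemas/adaptive-card.json',
        version: '1.6',
        type: 'AdaptiveCard',
        body: [{ type: 'TextBlock', wrap: true, text: streamer.getMessage() }]
    });
    streamer.setAttachments([card]);
};

const planner = new ActionPlanner({
    model,
    prompts,
    defaultPrompt: 'default',
    startStreamingMessage: 'Loading stream results...',
    endStreamHandler
});
```

Under the hood, `LLMClient` stores the streamer in `temp.streamer` and `OpenAIModel` passes it along as the fourth argument when emitting `responseReceived`, which is how the handler gains access to it.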

## Attestation Checklist

- [X] My code follows the style guidelines of this project

- I have checked for/fixed spelling, linting, and other errors
- I have commented my code for clarity
- I have made corresponding changes to the documentation (updating the doc strings in the code is sufficient)
- My changes generate no new warnings
- I have added tests that validate my changes and provide sufficient test coverage. I have tested with:
  - Local testing
  - E2E testing in Teams
- New and existing unit tests pass locally with my changes
lilyydu authored Sep 23, 2024
1 parent dc821c0 commit 5866556
Showing 8 changed files with 137 additions and 15 deletions.
44 changes: 44 additions & 0 deletions js/packages/teams-ai/src/StreamingResponse.spec.ts
@@ -1,5 +1,6 @@
import assert from 'assert';
import { TestAdapter } from 'botbuilder';
import { CardFactory } from 'botbuilder-core';
import { StreamingResponse } from './StreamingResponse';

describe('StreamingResponse', function () {
@@ -157,5 +158,48 @@ describe('StreamingResponse', function () {
);
});
});

it('should send a final message with text and attachments', async () => {
const adapter = new TestAdapter();
const adaptiveCard = {
$schema: 'http://adaptivecards.io/schemas/adaptive-card.json',
version: '1.6',
type: 'AdaptiveCard',
body: [
{
text: 'This is an example of an attachment..',
wrap: true,
type: 'TextBlock'
}
]
};
await adapter.sendTextToBot('test', async (context) => {
const response = new StreamingResponse(context);
response.queueTextChunk('first');
response.queueTextChunk('second');
await response.waitForQueue();
await response.setAttachments([CardFactory.adaptiveCard(adaptiveCard)]);
await response.endStream();
assert(response.updatesSent == 2, 'updatesSent should be 2');

// Validate sent activities
const activities = adapter.activeQueue;
assert.equal(activities.length, 3, 'should have sent 3 activities');
assert.equal(activities[0].channelData.streamSequence, 1, 'first activity streamSequence should be 1');
assert.equal(activities[1].channelData.streamSequence, 2, 'second activity streamSequence should be 2');
assert.equal(activities[2].type, 'message', 'final activity type should be "message"');
assert.equal(activities[2].text, 'firstsecond', 'final activity text should be "firstsecond"');
assert.deepEqual(
activities[2].channelData,
{ streamType: 'final', streamId: response.streamId },
'final activity channelData should match'
);
assert.notEqual(activities[2].attachments, null);
if (activities[2].attachments) {
assert.equal(activities[2].attachments.length, 1, 'should have 1 attachment');
assert.deepEqual(activities[2].attachments[0].content, adaptiveCard, 'adaptive card should match');
}
});
});
});
});
20 changes: 19 additions & 1 deletion js/packages/teams-ai/src/StreamingResponse.ts
@@ -6,7 +6,7 @@
* Licensed under the MIT License.
*/

import { Activity, TurnContext } from 'botbuilder-core';
import { Activity, Attachment, TurnContext } from 'botbuilder-core';

/**
* A helper class for streaming responses to the client.
@@ -23,6 +23,7 @@ export class StreamingResponse {
private _nextSequence: number = 1;
private _streamId?: string;
private _message: string = '';
private _attachments?: Attachment[];
private _ended = false;

// Queue for outgoing activities
@@ -113,6 +114,22 @@
return this._queueSync!;
}

/**
* Sets the attachments to attach to the final chunk.
* @param attachments List of attachments.
*/
public setAttachments(attachments: Attachment[]): void {
this._attachments = attachments;
}

/**
* Returns the most recently streamed message.
* @returns The streamed message.
*/
public getMessage(): string {
return this._message;
}

/**
* Waits for the outgoing activity queue to be empty.
* @returns {Promise<void>} - A promise representing the async operation.
@@ -140,6 +157,7 @@
return {
type: 'message',
text: this._message,
attachments: this._attachments,
channelData: {
streamType: 'final'
} as StreamingChannelData
10 changes: 6 additions & 4 deletions js/packages/teams-ai/src/models/OpenAIModel.ts
@@ -27,6 +27,7 @@ import { Tokenizer } from '../tokenizers';
import { ActionCall, PromptResponse } from '../types';

import { PromptCompletionModel, PromptCompletionModelEmitter } from './PromptCompletionModel';
import { StreamingResponse } from '../StreamingResponse';

/**
* Base model options common to both OpenAI and Azure OpenAI services.
@@ -166,7 +167,7 @@ export interface AzureOpenAIModelOptions extends BaseOpenAIModelOptions {
/**
* A `PromptCompletionModel` for calling OpenAI and Azure OpenAI hosted models.
* @remarks
 * The model has been updated to support calling OpenAI's new o1 family of models. That currently
* comes with a few constraints. These constraints are mostly handled for you but are worth noting:
* - The o1 models introduce a new `max_completion_tokens` parameter and they've deprecated the
* `max_tokens` parameter. The model will automatically convert the incoming `max_tokens` parameter
@@ -175,9 +176,9 @@ export interface AzureOpenAIModelOptions extends BaseOpenAIModelOptions {
* increase in token usage and costs when using the o1 models.
* - The o1 models do not currently support the sending of system messages which just means that the
* `useSystemMessages` parameter is ignored when calling the o1 models.
 * - The o1 models do not currently support setting the `temperature`, `top_p`, and `presence_penalty`
* parameters so they will be ignored.
 * - The o1 models do not currently support the use of tools so you will need to use the "monologue"
* augmentation to call actions.
*/
export class OpenAIModel implements PromptCompletionModel {
@@ -436,7 +437,8 @@ export class OpenAIModel implements PromptCompletionModel {

// Signal response received
const response: PromptResponse<string> = { status: 'success', input, message };
this._events.emit('responseReceived', context, memory, response);
const streamer: StreamingResponse = memory.getValue('temp.streamer');
this._events.emit('responseReceived', context, memory, response, streamer);

// Let any pending events flush before returning
await new Promise((resolve) => setTimeout(resolve, 0));
11 changes: 9 additions & 2 deletions js/packages/teams-ai/src/models/PromptCompletionModel.ts
@@ -14,6 +14,7 @@ import { Tokenizer } from '../tokenizers';
import { PromptResponse } from '../types';
import { Memory } from '../MemoryFork';
import StrictEventEmitter from '../external/strict-event-emitter-types';
import { StreamingResponse } from '../StreamingResponse';

/**
* Events emitted by a PromptCompletionModel.
@@ -51,7 +52,12 @@ export interface PromptCompletionModelEvents {
* @param memory An interface for accessing state values.
* @param response Final response returned by the model.
*/
responseReceived: (context: TurnContext, memory: Memory, response: PromptResponse<string>) => void;
responseReceived: (
context: TurnContext,
memory: Memory,
response: PromptResponse<string>,
streamer: StreamingResponse
) => void;
}

/**
@@ -81,7 +87,8 @@ export type PromptCompletionModelChunkReceivedEvent = (
export type PromptCompletionModelResponseReceivedEvent = (
context: TurnContext,
memory: Memory,
response: PromptResponse<string>
response: PromptResponse<string>,
streamer: StreamingResponse
) => void;

/**
7 changes: 5 additions & 2 deletions js/packages/teams-ai/src/models/TestModel.ts
@@ -13,6 +13,7 @@ import { Tokenizer } from '../tokenizers';
import { TurnContext } from 'botbuilder';
import { Memory } from '../MemoryFork';
import EventEmitter from 'events';
import { StreamingResponse } from '../StreamingResponse';

/**
* A `PromptCompletionModel` used for testing.
@@ -90,7 +91,8 @@ export class TestModel implements PromptCompletionModel {
return new TestModel(async (model, context, memory, functions, tokenizer, template) => {
model.events.emit('beforeCompletion', context, memory, functions, tokenizer, template, false);
await new Promise((resolve) => setTimeout(resolve, delay));
model.events.emit('responseReceived', context, memory, response);
const streamer = new StreamingResponse(context);
model.events.emit('responseReceived', context, memory, response, streamer);
return response;
});
}
@@ -127,7 +129,8 @@
// Finalize the response.
await new Promise((resolve) => setTimeout(resolve, delay));
const response: PromptResponse<string> = { status: 'success', message: { role: 'assistant', content } };
model.events.emit('responseReceived', context, memory, response);
const streamer = new StreamingResponse(context);
model.events.emit('responseReceived', context, memory, response, streamer);
return response;
});
}
10 changes: 8 additions & 2 deletions js/packages/teams-ai/src/planners/ActionPlanner.ts
@@ -11,7 +11,7 @@ import { TurnContext } from 'botbuilder';
import { AI } from '../AI';
import { DefaultAugmentation } from '../augmentations';
import { Memory } from '../MemoryFork';
import { PromptCompletionModel } from '../models';
import { PromptCompletionModel, PromptCompletionModelResponseReceivedEvent } from '../models';
import { PromptTemplate, PromptManager } from '../prompts';
import { Tokenizer } from '../tokenizers';
import { TurnState } from '../TurnState';
@@ -85,6 +85,11 @@ export interface ActionPlannerOptions<TState extends TurnState = TurnState> {
* Optional message to send a client at the start of a streaming response.
*/
startStreamingMessage?: string;

/**
* Optional handler to run when a stream is about to conclude.
*/
endStreamHandler?: PromptCompletionModelResponseReceivedEvent;
}

/**
@@ -259,7 +264,8 @@ export class ActionPlanner<TState extends TurnState = TurnState> implements Plan
max_history_messages: this.prompts.options.max_history_messages,
max_repair_attempts: this._options.max_repair_attempts,
logRepairs: this._options.logRepairs,
startStreamingMessage: this._options.startStreamingMessage
startStreamingMessage: this._options.startStreamingMessage,
endStreamHandler: this._options.endStreamHandler
});

// Complete prompt
19 changes: 18 additions & 1 deletion js/packages/teams-ai/src/planners/LLMClient.ts
@@ -13,7 +13,8 @@ import { Memory, MemoryFork } from '../MemoryFork';
import {
PromptCompletionModel,
PromptCompletionModelBeforeCompletionEvent,
PromptCompletionModelChunkReceivedEvent
PromptCompletionModelChunkReceivedEvent,
PromptCompletionModelResponseReceivedEvent
} from '../models';
import { ConversationHistory, Message, Prompt, PromptFunctions, PromptTemplate } from '../prompts';
import { StreamingResponse } from '../StreamingResponse';
@@ -91,6 +92,11 @@ export interface LLMClientOptions<TContent = any> {
* Optional message to send a client at the start of a streaming response.
*/
startStreamingMessage?: string;

/**
* Optional handler to run when a stream is about to conclude.
*/
endStreamHandler?: PromptCompletionModelResponseReceivedEvent;
}

/**
@@ -193,6 +199,7 @@ export interface ConfiguredLLMClientOptions<TContent = any> {
*/
export class LLMClient<TContent = any> {
private readonly _startStreamingMessage: string | undefined;
private readonly _endStreamHandler: PromptCompletionModelResponseReceivedEvent | undefined;

/**
* Configured options for this LLMClient instance.
@@ -226,6 +233,7 @@ export class LLMClient<TContent = any> {
}

this._startStreamingMessage = options.startStreamingMessage;
this._endStreamHandler = options.endStreamHandler;
}

/**
@@ -290,6 +298,7 @@

// Create streamer and send initial message
streamer = new StreamingResponse(context);
memory.setValue('temp.streamer', streamer);
if (this._startStreamingMessage) {
streamer.queueInformativeUpdate(this._startStreamingMessage);
}
@@ -313,6 +322,10 @@
if (this.options.model.events) {
this.options.model.events.on('beforeCompletion', beforeCompletion);
this.options.model.events.on('chunkReceived', chunkReceived);

if (this._endStreamHandler) {
this.options.model.events.on('responseReceived', this._endStreamHandler);
}
}

try {
@@ -335,6 +348,10 @@
if (this.options.model.events) {
this.options.model.events.off('beforeCompletion', beforeCompletion);
this.options.model.events.off('chunkReceived', chunkReceived);

if (this._endStreamHandler) {
this.options.model.events.off('responseReceived', this._endStreamHandler);
}
}
}
}
31 changes: 28 additions & 3 deletions js/samples/04.ai-apps/i.teamsChefBot-streaming/src/index.ts
@@ -8,7 +8,7 @@ import * as restify from 'restify';

// Import required bot services.
// See https://aka.ms/bot-services to learn more about the different parts of a bot.
import { ConfigurationServiceClientCredentialFactory, MemoryStorage, TurnContext } from 'botbuilder';
import { CardFactory, ConfigurationServiceClientCredentialFactory, MemoryStorage, TurnContext } from 'botbuilder';

import {
AI,
@@ -17,7 +17,9 @@
OpenAIModel,
PromptManager,
TurnState,
TeamsAdapter
TeamsAdapter,
PromptCompletionModelResponseReceivedEvent,
StreamingResponse
} from '@microsoft/teams-ai';

import { addResponseFormatter } from './responseFormatter';
@@ -104,11 +106,34 @@ const prompts = new PromptManager({
promptsFolder: path.join(__dirname, '../src/prompts')
});

const endStreamHandler: PromptCompletionModelResponseReceivedEvent = (ctx, memory, response, streamer) => {
// Ignore events for other contexts
if (!streamer) {
return;
}

const card = CardFactory.adaptiveCard({
$schema: 'http://adaptivecards.io/schemas/adaptive-card.json',
version: '1.6',
type: 'AdaptiveCard',
body: [
{
type: 'TextBlock',
wrap: true,
text: streamer.getMessage(),
}
],
})

streamer.setAttachments([card]);
};

const planner = new ActionPlanner({
model,
prompts,
defaultPrompt: 'default',
startStreamingMessage: 'Loading stream results...'
startStreamingMessage: 'Loading stream results...',
endStreamHandler: endStreamHandler
});

// Define storage and application
