Skip to content

Commit

Permalink
Merge branch 'main' into lilyydu/js-streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
lilyydu authored Aug 30, 2024
2 parents d36997b + 18ff729 commit 1fe45ff
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 74 deletions.
23 changes: 14 additions & 9 deletions js/packages/teams-ai/src/StreamingResponse.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ describe('StreamingResponse', function () {
const adapter = new TestAdapter();
await adapter.sendTextToBot('test', async (context) => {
const response = new StreamingResponse(context);
await response.sendInformativeUpdate('starting');
response.queueInformativeUpdate('starting');
await response.waitForQueue();
assert.equal(typeof response.streamId, 'string', 'streamId should be a string');
assert(response.streamId!.length > 0, 'streamId should not be empty');
assert.equal(response.updatesSent, 1, 'updatesSent should be 1');
Expand All @@ -41,8 +42,9 @@ describe('StreamingResponse', function () {
const adapter = new TestAdapter();
await adapter.sendTextToBot('test', async (context) => {
const response = new StreamingResponse(context);
await response.sendInformativeUpdate('first');
await response.sendInformativeUpdate('second');
response.queueInformativeUpdate('first');
response.queueInformativeUpdate('second');
await response.waitForQueue();
assert(response.updatesSent == 2, 'updatesSent should be 2');

// Validate sent activities
Expand All @@ -59,7 +61,8 @@ describe('StreamingResponse', function () {
const response = new StreamingResponse(context);
response.endStream();
try {
await response.sendInformativeUpdate('test');
response.queueInformativeUpdate('test');
await response.waitForQueue();
assert.fail('should have thrown an error');
} catch (err) {
assert.equal((err as Error).message, 'The stream has already ended.', 'error message should match');
Expand All @@ -73,8 +76,9 @@ describe('StreamingResponse', function () {
const adapter = new TestAdapter();
await adapter.sendTextToBot('test', async (context) => {
const response = new StreamingResponse(context);
await response.sendTextChunk('first');
await response.sendTextChunk('second');
response.queueTextChunk('first');
response.queueTextChunk('second');
await response.waitForQueue();
assert(response.updatesSent == 2, 'updatesSent should be 2');

// Validate sent activities
Expand Down Expand Up @@ -103,7 +107,8 @@ describe('StreamingResponse', function () {
const response = new StreamingResponse(context);
response.endStream();
try {
await response.sendTextChunk('test');
response.queueTextChunk('test');
await response.waitForQueue();
assert.fail('should have thrown an error');
} catch (err) {
assert.equal((err as Error).message, 'The stream has already ended.', 'error message should match');
Expand Down Expand Up @@ -136,8 +141,8 @@ describe('StreamingResponse', function () {
const adapter = new TestAdapter();
await adapter.sendTextToBot('test', async (context) => {
const response = new StreamingResponse(context);
await response.sendTextChunk('first');
await response.sendTextChunk('second');
response.queueTextChunk('first');
response.queueTextChunk('second');
await response.endStream();
assert(response.updatesSent == 3, 'updatesSent should be 3');

Expand Down
158 changes: 119 additions & 39 deletions js/packages/teams-ai/src/StreamingResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Licensed under the MIT License.
*/

import { TurnContext } from 'botbuilder-core';
import { Activity, TurnContext } from 'botbuilder-core';

/**
* A helper class for streaming responses to the client.
Expand All @@ -25,6 +25,11 @@ export class StreamingResponse {
private _message: string = '';
private _ended = false;

// Queue for outgoing activities
private _queue: Array<() => Partial<Activity>> = [];
private _queueSync: Promise<void> | undefined;
private _chunkQueued = false;

/**
* Creates a new StreamingResponse instance.
* @param {TurnContext} context - Context for the current turn of conversation with the user.
Expand Down Expand Up @@ -53,42 +58,42 @@ export class StreamingResponse {
}

/**
* Sends an informative update to the client.
* Queues an informative update to be sent to the client.
* @param {string} text Text of the update to send.
* @returns {Promise<void>} - A promise representing the async operation.
*/
public sendInformativeUpdate(text: string): Promise<void> {
public queueInformativeUpdate(text: string): void {
if (this._ended) {
throw new Error('The stream has already ended.');
}

// Send typing activity
return this.sendActivity('typing', text, {
streamType: 'informative',
streamSequence: this._nextSequence++
});
// Queue a typing activity
this.queueActivity(() => ({
type: 'typing',
text,
channelData: {
streamType: 'informative',
streamSequence: this._nextSequence++
} as StreamingChannelData
}));
}

/**
* Sends a chunk of partial message text to the client.
* Queues a chunk of partial message text to be sent to the client.
* @remarks
* The text is appended to the full message text which will be sent when endStream() is called.
 * The text will be sent as quickly as possible to the client. Chunks may be combined before
* delivery to the client.
* @param {string} text Partial text of the message to send.
* @returns {Promise<void>} - A promise representing the async operation
*/
public sendTextChunk(text: string): Promise<void> {
public queueTextChunk(text: string): void {
if (this._ended) {
throw new Error('The stream has already ended.');
}

// Update full message text
this._message += text;

// Send typing activity
return this.sendActivity('typing', text, {
streamType: 'streaming',
streamSequence: this._nextSequence++
});
// Queue the next chunk
this.queueNextChunk();
}

/**
Expand All @@ -100,43 +105,118 @@ export class StreamingResponse {
throw new Error('The stream has already ended.');
}

// Send final message
// Queue final message
this._ended = true;
return this.sendActivity('message', this._message, {
streamType: 'final',
streamSequence: this._nextSequence++
this.queueNextChunk();

// Wait for the queue to drain
return this._queueSync!;
}

/**
* Waits for the outgoing activity queue to be empty.
* @returns {Promise<void>} - A promise representing the async operation.
*/
public waitForQueue(): Promise<void> {
return this._queueSync || Promise.resolve();
}

/**
* Queues the next chunk of text to be sent to the client.
* @private
*/
private queueNextChunk(): void {
// Are we already waiting to send a chunk?
if (this._chunkQueued) {
return;
}

// Queue a chunk of text to be sent
this._chunkQueued = true;
this.queueActivity(() => {
this._chunkQueued = false;
if (this._ended) {
// Send final message
return {
type: 'message',
text: this._message,
channelData: {
streamType: 'final'
} as StreamingChannelData
};
} else {
// Send typing activity
return {
type: 'typing',
text: this._message,
channelData: {
streamType: 'streaming',
streamSequence: this._nextSequence++
} as StreamingChannelData
};
}
});
}

/**
* Queues an activity to be sent to the client.
     * @param {() => Partial<Activity>} factory - A factory that creates the outgoing activity just before it's sent.
*/
private queueActivity(factory: () => Partial<Activity>): void {
this._queue.push(factory);

// If there's no sync in progress, start one
if (!this._queueSync) {
this._queueSync = this.drainQueue();
}
}

/**
* Sends any queued activities to the client until the queue is empty.
* @returns {Promise<void>} - A promise that will be resolved once the queue is empty.
* @private
*/
private drainQueue(): Promise<void> {
return new Promise<void>(async (resolve) => {
try {
while (this._queue.length > 0) {
// Get next activity from queue
const factory = this._queue.shift()!;
const activity = factory();

// Send activity
await this.sendActivity(activity);
}

resolve();
} finally {
// Queue is empty, mark as idle
this._queueSync = undefined;
}
});
}

/**
* @param {'typing' | 'message'} type - The type of activity to send.
* @param {string} text - The text of the activity to send.
* @param {StreamingChannelData} channelData - The channel data for the activity to send.
* Sends an activity to the client and saves the stream ID returned.
* @param {Partial<Activity>} activity - The activity to send.
* @returns {Promise<void>} - A promise representing the async operation.
* @private
*/
private async sendActivity(
type: 'typing' | 'message',
text: string,
channelData: StreamingChannelData
): Promise<void> {
// Add stream ID
private async sendActivity(activity: Partial<Activity>): Promise<void> {
// Set activity ID to the assigned stream ID
if (this._streamId) {
channelData.streamId = this._streamId;
activity.id = this._streamId;
activity.channelData = Object.assign({}, activity.channelData, { streamId: this._streamId });
}

// Send activity
const response = await this._context.sendActivity({
type,
text,
channelData
});
const response = await this._context.sendActivity(activity);

// Save stream ID
// Save assigned stream ID
if (!this._streamId) {
this._streamId = response?.id;
}
}
}
}

/**
Expand Down
30 changes: 22 additions & 8 deletions js/packages/teams-ai/src/models/OpenAIModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ export class OpenAIModel implements PromptCompletionModel {
: (this.options as OpenAIModelOptions).defaultModel);

// Check for legacy completion type
if (template.config.type == 'completion') {
if (template.config.completion.completion_type == 'text') {
throw new Error(
`The completion type 'completion' is no longer supported. Only 'chat' based models are supported.`
`The completion_type 'text' is no longer supported. Only 'chat' based models are supported.`
);
}

Expand Down Expand Up @@ -367,8 +367,10 @@ export class OpenAIModel implements PromptCompletionModel {
}

// Log stream completion
console.log(Colorize.title('STREAM COMPLETED:'));
console.log(Colorize.value('duration', Date.now() - startTime, 'ms'));
if (this.options.logRequests) {
console.log(Colorize.title('STREAM COMPLETED:'));
console.log(Colorize.value('duration', Date.now() - startTime, 'ms'));
}
} else {
const actionCalls: ActionCall[] = [];
const responseMessage = (completion as ChatCompletion).choices![0].message;
Expand Down Expand Up @@ -408,6 +410,9 @@ export class OpenAIModel implements PromptCompletionModel {
// Signal response received
const response: PromptResponse<string> = { status: 'success', input, message };
this._events.emit('responseReceived', context, memory, response);

// Let any pending events flush before returning
await new Promise((resolve) => setTimeout(resolve, 0));
return response;
} catch (err: unknown) {
console.log(err);
Expand Down Expand Up @@ -578,6 +583,13 @@ export class OpenAIModel implements PromptCompletionModel {
}
params.model = model;

// Remove tool params if not using tools
if (!Array.isArray(params.tools) || params.tools.length == 0) {
if (params.tool_choice) {
delete params.tool_choice;
}
}

return params;
}

Expand Down Expand Up @@ -615,11 +627,13 @@ export class OpenAIModel implements PromptCompletionModel {

private returnError(err: unknown, input: Message<string>[] | Message<string> | undefined): PromptResponse<string> {
if (err instanceof OpenAI.APIError) {
if (this.options.logRequests) {
console.log(Colorize.title('ERROR:'));
console.log(Colorize.output(err.message));
console.log(Colorize.title('HEADERS:'));
console.log(Colorize.output(err.headers as any));
}
if (err.status == 429) {
if (this.options.logRequests) {
console.log(Colorize.title('HEADERS:'));
console.log(Colorize.output(err.headers as any));
}
return {
status: 'rate_limited',
input,
Expand Down
Loading

0 comments on commit 1fe45ff

Please sign in to comment.