Skip to content

Commit 5898fa1

Browse files
authored
Merge pull request #14 from thedadams/event-stream
feat: add event streaming
2 parents 739fb33 + 0ace262 commit 5898fa1

File tree

6 files changed

+299
-34
lines changed

6 files changed

+299
-34
lines changed

README.md

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,54 @@ async function streamExec() {
168168
}
169169
```
170170

171+
### streamExecWithEvents
172+
173+
Executes a gptscript with optional input and arguments, and returns the output and event streams.
174+
175+
**Options:**
176+
177+
These are optional options that can be passed to the `exec` function.
178+
Neither option is required, and the defaults will reduce the number of calls made to the Model API.
179+
180+
- `cache`: Enable or disable caching.
181+
- `cacheDir`: Specify the cache directory.
182+
183+
**Usage:**
184+
185+
```javascript
186+
const gptscript = require('@gptscript-ai/gptscript');
187+
188+
const opts = {
189+
cache: false,
190+
};
191+
192+
const t = new gptscript.Tool({
193+
instructions: "who was the president of the united states in 1928?"
194+
});
195+
196+
async function streamExecWithEvents() {
197+
try {
198+
const { stdout, stderr, events, promise } = await gptscript.streamExecWithEvents(t, opts);
199+
200+
stdout.on('data', data => {
201+
console.log(`system: ${data}`);
202+
});
203+
204+
stderr.on('data', data => {
205+
console.log(`system: ${data}`);
206+
});
207+
208+
events.on('data', data => {
209+
console.log(`events: ${data}`);
210+
});
211+
212+
await promise;
213+
} catch (e) {
214+
console.error(e);
215+
}
216+
}
217+
```
218+
171219
### streamExecFile
172220

173221
**Options:**
@@ -208,6 +256,50 @@ async function streamExecFile() {
208256
}
209257
```
210258

259+
### streamExecFileWithEvents
260+
261+
**Options:**
262+
263+
These are optional options that can be passed to the `exec` function.
264+
Neither option is required, and the defaults will reduce the number of calls made to the Model API.
265+
266+
- `cache`: Enable or disable caching.
267+
- `cacheDir`: Specify the cache directory.
268+
269+
**Usage:**
270+
271+
The script is relative to the callers source directory.
272+
273+
```javascript
274+
const gptscript = require('@gptscript-ai/gptscript');
275+
276+
const opts = {
277+
cache: false,
278+
};
279+
280+
async function streamExecFileWithEvents() {
281+
try {
282+
const { stdout, stderr, events, promise } = await gptscript.streamExecFileWithEvents('./test.gpt', "--testin how high is that there mouse?", opts);
283+
284+
stdout.on('data', data => {
285+
console.log(`system: ${data}`);
286+
});
287+
288+
stderr.on('data', data => {
289+
console.log(`system: ${data}`);
290+
});
291+
292+
events.on('data', data => {
293+
console.log(`event: ${data}`);
294+
});
295+
296+
await promise;
297+
} catch (e) {
298+
console.error(e);
299+
}
300+
}
301+
```
302+
211303
## Types
212304

213305
### Tool Parameters

src/exec.d.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,15 @@ interface StreamExecResult {
66
promise: Promise<void>;
77
}
88

9-
export function streamExec(command: string, args: string[], stdin: string, cwd: string, detached: boolean, env: { [key: string]: string }): StreamExecResult;
9+
export function streamExec(command: string, args: string[], stdin: string, cwd: string, detached: boolean, env: { [key: string]: string }): StreamExecResult;
10+
11+
interface StreamExecWithEventsResult {
12+
stdout: Readable;
13+
stderr: Readable;
14+
events: Readable;
15+
promise: Promise<void>;
16+
}
17+
18+
export function streamExecWithEvents(command: string, args: string[], stdin: string, cwd: string, env: {
19+
[p: string]: string
20+
}): StreamExecWithEventsResult;

src/exec.js

Lines changed: 104 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
const childProcess = require('child_process');
2+
const net = require('net');
3+
const stream = require('stream');
4+
const TEN_MEBIBYTE = 1024 * 1024 * 10;
25

3-
const TEN_MEBIBYTE = 1024 * 1024 * 10
46

57
async function exec(command, args, stdin, cwd = './', detached, env) {
68
let stdout = '';
79
let stderr = '';
810

911
const spawnOptions = { maxBuffer: TEN_MEBIBYTE, cwd, detached, env };
1012

11-
const child = childProcess.spawn(command, args, spawnOptions);
13+
const child = childExec(command, args, stdin, spawnOptions);
1214

1315
// Capture stdout
1416
child.stdout.on('data', data => {
@@ -20,13 +22,6 @@ async function exec(command, args, stdin, cwd = './', detached, env) {
2022
stderr += data;
2123
});
2224

23-
// Write to stdin if provided
24-
if (stdin) {
25-
child.stdin.setEncoding('utf-8');
26-
child.stdin.write(stdin);
27-
child.stdin.end();
28-
}
29-
3025
// Wait for the child process to exit
3126
await new Promise((resolve, reject) => {
3227
child.on('close', code => {
@@ -49,38 +44,116 @@ async function exec(command, args, stdin, cwd = './', detached, env) {
4944
async function streamExec(command, args, stdin, cwd = './', detached, env) {
5045
const spawnOptions = { maxBuffer: TEN_MEBIBYTE, cwd, detached, env };
5146

52-
const child = childProcess.spawn(command, args, spawnOptions);
47+
const child = childExec(command, args, stdin, spawnOptions);
48+
49+
return {
50+
stdout: child.stdout,
51+
stderr: child.stderr,
52+
promise: new Promise((resolve, reject) => {
53+
child.on('close', code => {
54+
if (code !== 0) {
55+
reject(new Error(`Child process exited with code ${code}`));
56+
} else {
57+
resolve();
58+
}
59+
});
60+
61+
child.on('error', error => {
62+
reject(error);
63+
});
64+
})
65+
}
66+
}
67+
68+
async function streamExecWithEvents(command, args, stdin, cwd = './', env) {
69+
let server, events = null;
70+
const spawnOptions = { maxBuffer: TEN_MEBIBYTE, cwd, env, stdio: ['pipe', 'pipe', 'pipe'] };
71+
72+
// On Windows, the child process doesn't know which file handles are available to it.
73+
// Therefore, we have to use a named pipe. This is set up with a server.
74+
if (process.platform === 'win32') {
75+
const namedPipe = '\\\\.\\pipe\\gptscript-' + Math.floor(Math.random() * 1000000);
76+
events = new stream.Readable({
77+
encoding: 'utf-8',
78+
read() {
79+
}
80+
});
81+
82+
server = net.createServer((connection) => {
83+
console.debug('Client connected');
84+
85+
connection.on('data', (data) => {
86+
// Pass the data onto the event stream.
87+
events.push(data);
88+
});
89+
90+
connection.on('end', () => {
91+
// Indicate that there is no more data.
92+
events.push(null);
93+
});
94+
});
95+
96+
server.listen(namedPipe, () => {
97+
console.debug('Server is listening on', namedPipe);
98+
});
99+
100+
// Add the named pipe for streaming events.
101+
args.unshift("--events-stream-to="+namedPipe);
102+
} else {
103+
// For non-Windows systems, we just add an extra stdio pipe and use that for streaming events.
104+
spawnOptions.stdio.push('pipe');
105+
args.unshift("--events-stream-to=fd://"+(spawnOptions.stdio.length-1));
106+
}
107+
108+
109+
const child = childExec(command, args, stdin, spawnOptions);
110+
if (!events) {
111+
// If the child process is not a Windows system, we can use the stdio pipe for streaming events.
112+
events = stream.Readable.from(child.stdio[child.stdio.length-1])
113+
}
114+
115+
return {
116+
stdout: child.stdout,
117+
stderr: child.stderr,
118+
events: events,
119+
promise: new Promise((resolve, reject) => {
120+
child.on('exit', code => {
121+
events.destroy();
122+
if (server) server.close();
123+
124+
if (code !== 0) {
125+
reject(new Error(`Child process exited with code ${code}`));
126+
} else {
127+
resolve();
128+
}
129+
});
130+
131+
child.on('error', error => {
132+
events.destroy();
133+
if (server) server.close();
134+
135+
reject(error);
136+
});
137+
})
138+
};
139+
}
140+
141+
function childExec(command, args, stdin, opts = {}) {
142+
const child = childProcess.spawn(command, args, opts);
53143

54144
// Write to stdin if provided
55145
if (stdin && child.stdin) {
56-
child.stdin.setEncoding('utf-8');
146+
child.stdin.setDefaultEncoding('utf-8');
57147
child.stdin.write(stdin);
58148
child.stdin.end();
59149
}
60150

61-
if (child.stdout && child.stdout) {
62-
return {
63-
stdout: child.stdout,
64-
stderr: child.stderr,
65-
promise: new Promise((resolve, reject) => {
66-
child.on('close', code => {
67-
if (code !== 0) {
68-
reject(new Error(`Child process exited with code ${code}`));
69-
} else {
70-
resolve();
71-
}
72-
});
73-
74-
child.on('error', error => {
75-
reject(error);
76-
});
77-
})
78-
};
79-
}
151+
return child;
80152
}
81153

82154

83155
module.exports = {
84156
exec: exec,
85-
streamExec: streamExec
157+
streamExec: streamExec,
158+
streamExecWithEvents: streamExecWithEvents
86159
}

src/gptscript.d.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { StreamExecResult } from "./exec";
1+
import { StreamExecResult, StreamExecWithEventsResult } from "./exec";
22
import { Tool, FreeForm } from "./tool";
33

44
export function getCmdPath(): string;
@@ -9,4 +9,6 @@ export function listModels(): Promise<string[]>;
99
export function exec(tool: Tool | FreeForm | (Tool | FreeForm)[], opts: { [key: string]: string }): Promise<string>;
1010
export function execFile(scriptPath: string, input: string, opts: { [key: string]: string }): Promise<string>;
1111
export function streamExec(tool: Tool | FreeForm | (Tool | FreeForm)[], opts: { [key: string]: string }): StreamExecResult;
12-
export function streamExecFile(scriptPath: string, input: string, opts: { [key: string]: string }): StreamExecResult;
12+
export function streamExecWithEvents(tool: Tool | FreeForm | (Tool | FreeForm)[], opts: { [key: string]: string }): StreamExecWithEventsResult;
13+
export function streamExecFile(scriptPath: string, input: string, opts: { [key: string]: string }): StreamExecResult;
14+
export function streamExecFileWithEvents(tool: Tool | FreeForm | (Tool | FreeForm)[], opts: { [key: string]: string }): StreamExecWithEventsResult;

src/gptscript.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ const path = require('path');
33
const tools = require('./tool');
44

55
function getCmdPath() {
6+
if (process.env.GPTSCRIPT_BIN) {
7+
return process.env.GPTSCRIPT_BIN;
8+
}
69
return path.join(__dirname, '..', 'bin', 'gptscript');
710
}
811

@@ -73,6 +76,13 @@ function streamRun(args = [], stdin, gptPath = './', input = "", env = process.e
7376
return execlib.streamExec(cmdPath, cmdArgs, stdin, './', false, env);
7477
}
7578

79+
function streamRunWithEvents(args = [], stdin, gptPath = './', input = "", env = process.env) {
80+
const cmdPath = getCmdPath();
81+
const cmdArgs = cliArgBuilder(args, stdin, gptPath, input);
82+
83+
return execlib.streamExecWithEvents(cmdPath, cmdArgs, stdin, './', env);
84+
}
85+
7686
function listTools() {
7787
return run(['--list-tools']);
7888
}
@@ -104,18 +114,33 @@ function streamExec(tool, opts = {}) {
104114
return streamRun(args, toolString);
105115
}
106116

117+
function streamExecWithEvents(tool, opts = {}) {
118+
const args = toArgs(opts);
119+
const toolString = getToolString(tool);
120+
121+
return streamRunWithEvents(args, toolString);
122+
}
123+
107124
function streamExecFile(scriptPath, input = "", opts = {}) {
108125
const args = toArgs(opts);
109126
return streamRun(args, undefined, scriptPath, input);
110127
}
111128

129+
function streamExecFileWithEvents(scriptPath, input = "", opts = {}) {
130+
const args = toArgs(opts);
131+
132+
return streamRunWithEvents(args, undefined, scriptPath, input);
133+
}
134+
112135
module.exports = {
113136
listTools: listTools,
114137
listModels: listModels,
115138
exec: exec,
116139
execFile: execFile,
117140
streamExec: streamExec,
141+
streamExecWithEvents: streamExecWithEvents,
118142
streamExecFile: streamExecFile,
143+
streamExecFileWithEvents: streamExecFileWithEvents,
119144
version: version,
120145
Tool: tools.Tool,
121146
FreeForm: tools.FreeForm

0 commit comments

Comments
 (0)