Skip to content

Commit c1695f7

Browse files
authored
Merge pull request #266 from thedadams/event-streaming
feat: add event streaming
2 parents 6c87a23 + c1ed165 commit c1695f7

File tree

3 files changed

+142
-1
lines changed

3 files changed

+142
-1
lines changed

pkg/cli/gptscript.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type GPTScript struct {
4444
Debug bool `usage:"Enable debug logging"`
4545
Quiet *bool `usage:"No output logging (set --quiet=false to force on even when there is no TTY)" short:"q"`
4646
Output string `usage:"Save output to a file, or - for stdout" short:"o"`
47+
EventsStreamTo string `usage:"Stream events to this location, could be a file descriptor/handle (e.g. fd://2), filename, or named pipe (e.g. \\\\.\\pipe\\my-pipe)" name:"events-stream-to"`
4748
Input string `usage:"Read input from a file (\"-\" for stdin)" short:"f"`
4849
SubTool string `usage:"Use tool of this name, not the first tool in file" local:"true"`
4950
Assemble bool `usage:"Assemble tool to a single artifact, saved to --output" hidden:"true" local:"true"`
@@ -137,6 +138,15 @@ func (r *GPTScript) NewGPTScriptOpts() (gptscript.Options, error) {
137138

138139
opts.Runner.CredentialOverride = r.CredentialOverride
139140

141+
if r.EventsStreamTo != "" {
142+
mf, err := monitor.NewFileFactory(r.EventsStreamTo)
143+
if err != nil {
144+
return gptscript.Options{}, err
145+
}
146+
147+
opts.Runner.MonitorFactory = mf
148+
}
149+
140150
return opts, nil
141151
}
142152

pkg/monitor/display.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,8 @@ func (d *display) Event(event runner.Event) {
203203
log := log.Fields(
204204
"id", currentCall.ID,
205205
"parentID", currentCall.ParentID,
206-
"toolID", currentCall.ToolID)
206+
"toolID", currentCall.ToolID,
207+
)
207208

208209
_, ok := d.callIDMap[currentCall.ID]
209210
if !ok {

pkg/monitor/fd.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package monitor
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"os"
7+
"strconv"
8+
"strings"
9+
"sync"
10+
"time"
11+
12+
"github.com/gptscript-ai/gptscript/pkg/runner"
13+
"github.com/gptscript-ai/gptscript/pkg/types"
14+
)
15+
16+
type Event struct {
17+
runner.Event `json:",inline"`
18+
Program *types.Program `json:"program,omitempty"`
19+
Input string `json:"input,omitempty"`
20+
Output string `json:"output,omitempty"`
21+
Err string `json:"err,omitempty"`
22+
}
23+
24+
type fileFactory struct {
25+
file *os.File
26+
}
27+
28+
// NewFileFactory creates a new monitor factory that writes events to the location specified.
29+
// The location can be one of three things:
30+
// 1. a file descriptor/handle in the form "fd://2"
31+
// 2. a file name
32+
// 3. a named pipe in the form "\\.\pipe\my-pipe"
33+
func NewFileFactory(loc string) (runner.MonitorFactory, error) {
34+
var (
35+
file *os.File
36+
err error
37+
)
38+
39+
if strings.HasPrefix(loc, "fd://") {
40+
fd, err := strconv.Atoi(strings.TrimPrefix(loc, "fd://"))
41+
if err != nil {
42+
return nil, err
43+
}
44+
45+
file = os.NewFile(uintptr(fd), "events")
46+
} else {
47+
file, err = os.OpenFile(loc, os.O_WRONLY|os.O_CREATE, 0)
48+
if err != nil {
49+
return nil, err
50+
}
51+
}
52+
53+
return &fileFactory{
54+
file: file,
55+
}, nil
56+
}
57+
58+
func (s fileFactory) Start(_ context.Context, prg *types.Program, env []string, input string) (runner.Monitor, error) {
59+
fd := &fd{
60+
prj: prg,
61+
env: env,
62+
input: input,
63+
file: s.file,
64+
}
65+
66+
fd.event(Event{
67+
Event: runner.Event{
68+
Time: time.Now(),
69+
Type: "runStart",
70+
},
71+
Program: prg,
72+
})
73+
74+
return fd, nil
75+
}
76+
77+
type fd struct {
78+
prj *types.Program
79+
env []string
80+
input string
81+
file *os.File
82+
runLock sync.Mutex
83+
}
84+
85+
func (f *fd) Event(event runner.Event) {
86+
f.event(Event{
87+
Event: event,
88+
Input: f.input,
89+
})
90+
}
91+
92+
func (f *fd) event(event Event) {
93+
f.runLock.Lock()
94+
defer f.runLock.Unlock()
95+
b, err := json.Marshal(event)
96+
if err != nil {
97+
log.Errorf("Failed to marshal event: %v", err)
98+
return
99+
}
100+
101+
if _, err = f.file.Write(append(b, '\n', '\n')); err != nil {
102+
log.Errorf("Failed to write event to file: %v", err)
103+
}
104+
}
105+
106+
func (f *fd) Stop(output string, err error) {
107+
e := Event{
108+
Event: runner.Event{
109+
Time: time.Now(),
110+
Type: "runFinish",
111+
},
112+
Input: f.input,
113+
Output: output,
114+
}
115+
if err != nil {
116+
e.Err = err.Error()
117+
}
118+
119+
f.event(e)
120+
if err = f.file.Close(); err != nil {
121+
log.Errorf("Failed to close file: %v", err)
122+
}
123+
}
124+
125+
func (f *fd) Pause() func() {
126+
f.runLock.Lock()
127+
return func() {
128+
f.runLock.Unlock()
129+
}
130+
}

0 commit comments

Comments
 (0)