Skip to content

Commit e82e720

Browse files
authored
[#152]: feature: streaming support
2 parents 6990f12 + 298ce80 commit e82e720

File tree

4 files changed

+64
-44
lines changed

4 files changed

+64
-44
lines changed

common/interfaces.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type Pool interface {
1515
// Workers returns worker list associated with the pool.
1616
Workers() (workers []*worker.Process)
1717
// Exec payload
18-
Exec(ctx context.Context, p *payload.Payload) (*payload.Payload, error)
18+
Exec(ctx context.Context, p *payload.Payload, stopCh chan struct{}) (chan *staticPool.PExec, error)
1919
// Reset kill all workers inside the watcher and replaces with new
2020
Reset(ctx context.Context) error
2121
// Destroy all underlying stack (but let them complete the task).

go.mod

+4-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/roadrunner-server/http/v4
33
go 1.20
44

55
require (
6-
github.com/caddyserver/certmagic v0.19.0
6+
github.com/caddyserver/certmagic v0.19.1
77
github.com/goccy/go-json v0.10.2
88
github.com/google/go-cmp v0.5.9
99
github.com/mholt/acmez v1.2.0
@@ -12,13 +12,13 @@ require (
1212
github.com/roadrunner-server/endure/v2 v2.3.0
1313
github.com/roadrunner-server/errors v1.2.0
1414
github.com/roadrunner-server/goridge/v3 v3.6.3
15-
github.com/roadrunner-server/sdk/v4 v4.3.1
15+
github.com/roadrunner-server/sdk/v4 v4.4.0-beta.2
1616
github.com/stretchr/testify v1.8.4
1717
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0
1818
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0
1919
go.opentelemetry.io/otel v1.16.0
2020
go.opentelemetry.io/otel/trace v1.16.0
21-
go.uber.org/zap v1.24.0
21+
go.uber.org/zap v1.25.0
2222
golang.org/x/net v0.12.0
2323
golang.org/x/sys v0.10.0
2424
)
@@ -40,15 +40,14 @@ require (
4040
github.com/pmezard/go-difflib v1.0.0 // indirect
4141
github.com/prometheus/client_model v0.4.0 // indirect
4242
github.com/prometheus/common v0.44.0 // indirect
43-
github.com/prometheus/procfs v0.11.0 // indirect
43+
github.com/prometheus/procfs v0.11.1 // indirect
4444
github.com/roadrunner-server/tcplisten v1.3.0 // indirect
4545
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
4646
github.com/tklauser/go-sysconf v0.3.11 // indirect
4747
github.com/tklauser/numcpus v0.6.1 // indirect
4848
github.com/yusufpapurcu/wmi v1.2.3 // indirect
4949
github.com/zeebo/blake3 v0.2.3 // indirect
5050
go.opentelemetry.io/otel/metric v1.16.0 // indirect
51-
go.uber.org/atomic v1.11.0 // indirect
5251
go.uber.org/multierr v1.11.0 // indirect
5352
golang.org/x/crypto v0.11.0 // indirect
5453
golang.org/x/mod v0.12.0 // indirect

go.sum

+10-23
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
1+
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
22
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
33
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
4-
github.com/caddyserver/certmagic v0.19.0 h1:HuJ1Yf1H1jAfmBGrSSQN1XRkafnWcpDtyIiyMV6vmpM=
5-
github.com/caddyserver/certmagic v0.19.0/go.mod h1:fsL01NomQ6N+kE2j37ZCnig2MFosG+MIO4ztnmG/zz8=
4+
github.com/caddyserver/certmagic v0.19.1 h1:4jyOYm2DHvQI8YM0sk6qm62Gl5XznHxiMBMWjMTlQkw=
5+
github.com/caddyserver/certmagic v0.19.1/go.mod h1:fsL01NomQ6N+kE2j37ZCnig2MFosG+MIO4ztnmG/zz8=
66
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
77
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
88
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
99
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1010
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
1111
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
12-
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
1312
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
1413
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
1514
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
@@ -28,9 +27,6 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
2827
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
2928
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
3029
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
31-
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
32-
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
33-
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
3430
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
3531
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
3632
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
@@ -44,11 +40,6 @@ github.com/mholt/acmez v1.2.0 h1:1hhLxSgY5FvH5HCnGUuwbKY2VQVo8IU7rxXKSnZ7F30=
4440
github.com/mholt/acmez v1.2.0/go.mod h1:VT9YwH1xgNX1kmYY89gY8xPJC84BFAisjo8Egigt4kE=
4541
github.com/miekg/dns v1.1.55 h1:GoQ4hpsj0nFLYe+bWiCToyrBEJXkQfOOIvFGFy0lEgo=
4642
github.com/miekg/dns v1.1.55/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY=
47-
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
48-
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
49-
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
50-
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
51-
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
5243
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
5344
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5445
github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
@@ -57,8 +48,8 @@ github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUo
5748
github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
5849
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
5950
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
60-
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
61-
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
51+
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
52+
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
6253
github.com/roadrunner-server/api/v4 v4.5.0 h1:OUAcCwLeQbgRj7E2/6M6W2nxOnbG6XYPSS6LjW6COAQ=
6354
github.com/roadrunner-server/api/v4 v4.5.0/go.mod h1:nzJvLrDMYT0K9hgPFmeL8dh6q2EvrJEaCHy2XRqz20c=
6455
github.com/roadrunner-server/endure/v2 v2.3.0 h1:ctsXL3BjcgHJ0kyO42B2QIaKeZa0modVV9jYx3qSxqo=
@@ -67,14 +58,13 @@ github.com/roadrunner-server/errors v1.2.0 h1:qBmNXt8Iex9QnYTjCkbJKsBZu2EtYkQCM0
6758
github.com/roadrunner-server/errors v1.2.0/go.mod h1:z0ECxZp/dDa5RahtMcy4mBIavVxiZ9vwE5kByl7kFtY=
6859
github.com/roadrunner-server/goridge/v3 v3.6.3 h1:8hCuPVK9BxIE4IGyNphK6KPAy9Kg6t5tHaItBIQKh2o=
6960
github.com/roadrunner-server/goridge/v3 v3.6.3/go.mod h1:hB5+lHhl8msuHrngjKQ+Wx8B705AU0/2DlYGFXbjtgU=
70-
github.com/roadrunner-server/sdk/v4 v4.3.1 h1:DwmyzcKbprXz6JLnyR4fbOmgSC0qr528xC4uSwxRVSY=
71-
github.com/roadrunner-server/sdk/v4 v4.3.1/go.mod h1:YiYFMLx2zVcDjy52P8i/c++VJIY/qaUSdboN0PiPGok=
61+
github.com/roadrunner-server/sdk/v4 v4.4.0-beta.2 h1:YsAJaS5Sdnw7Z9ULiejLh+g0MrMLgqE4yrZ/kduew/0=
62+
github.com/roadrunner-server/sdk/v4 v4.4.0-beta.2/go.mod h1:QcZBTccDGT8zhbHkbzqM7SORktVtvh6Jigkz3hy6kBk=
7263
github.com/roadrunner-server/tcplisten v1.3.0 h1:VDd6IbP8oIjm5vKvMVozeZgeHgOcoP0XYLOyOqcZHCY=
7364
github.com/roadrunner-server/tcplisten v1.3.0/go.mod h1:VR6Ob5am0oEuLMOeLiVvQxG9ShykAEgrlvZddX8EfoU=
7465
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
7566
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
7667
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
77-
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
7868
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
7969
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
8070
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
@@ -100,13 +90,11 @@ go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26
10090
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
10191
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
10292
go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
103-
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
104-
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
105-
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
93+
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
10694
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
10795
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
108-
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
109-
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
96+
go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c=
97+
go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk=
11098
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
11199
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
112100
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
@@ -133,6 +121,5 @@ google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs
133121
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
134122
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
135123
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
136-
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
137124
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
138125
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

handler/handler.go

+49-15
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ type Handler struct {
4646
gid int
4747

4848
// internal
49-
reqPool sync.Pool
50-
respPool sync.Pool
51-
pldPool sync.Pool
52-
errPool sync.Pool
49+
reqPool sync.Pool
50+
respPool sync.Pool
51+
pldPool sync.Pool
52+
stopChPool sync.Pool
5353
}
5454

5555
// NewHandler return handle interface implementation
@@ -69,9 +69,9 @@ func NewHandler(cfg *config.Config, pool common.Pool, log *zap.Logger) (*Handler
6969
uid: cfg.UID,
7070
gid: cfg.GID,
7171

72-
errPool: sync.Pool{
72+
stopChPool: sync.Pool{
7373
New: func() any {
74-
return make(chan error, 1)
74+
return make(chan struct{}, 1)
7575
},
7676
},
7777
reqPool: sync.Pool{
@@ -141,29 +141,48 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
141141
return
142142
}
143143

144-
wResp, err := h.pool.Exec(context.Background(), pld)
144+
stopCh := h.getCh()
145+
wResp, err := h.pool.Exec(context.Background(), pld, stopCh)
145146
if err != nil {
146147
req.Close(h.log, r)
147148
h.putReq(req)
148149
h.putPld(pld)
150+
h.putCh(stopCh)
149151
h.handleError(w, err)
150152
h.log.Error("execute", zap.Time("start", start), zap.Duration("elapsed", time.Since(start)), zap.Error(err))
151153
return
152154
}
153155

154-
err = h.Write(wResp, w)
155-
if err != nil {
156-
req.Close(h.log, r)
157-
h.putReq(req)
158-
h.putPld(pld)
159-
h.handleError(w, err)
160-
h.log.Error("write response error", zap.Time("start", start), zap.Duration("elapsed", time.Since(start)), zap.Error(err))
161-
return
156+
for recv := range wResp {
157+
if recv.Error() != nil {
158+
req.Close(h.log, r)
159+
h.putReq(req)
160+
h.putPld(pld)
161+
h.putCh(stopCh)
162+
h.handleError(w, err)
163+
h.log.Error("write response error", zap.Time("start", start), zap.Duration("elapsed", time.Since(start)), zap.Error(err))
164+
return
165+
}
166+
167+
err = h.Write(recv.Payload(), w)
168+
if err != nil {
169+
// send stop signal to the workers pool
170+
stopCh <- struct{}{}
171+
172+
req.Close(h.log, r)
173+
h.putReq(req)
174+
h.putPld(pld)
175+
h.handleError(w, err)
176+
h.log.Error("write response error", zap.Time("start", start), zap.Duration("elapsed", time.Since(start)), zap.Error(err))
177+
h.putCh(stopCh)
178+
return
179+
}
162180
}
163181

164182
h.putPld(pld)
165183
req.Close(h.log, r)
166184
h.putReq(req)
185+
h.putCh(stopCh)
167186
}
168187

169188
func (h *Handler) Dispose() {}
@@ -249,3 +268,18 @@ func (h *Handler) getPld() *payload.Payload {
249268
pld.Codec = frame.CodecJSON
250269
return pld
251270
}
271+
272+
func (h *Handler) getCh() chan struct{} {
273+
ch := h.stopChPool.Get().(chan struct{})
274+
// just check if the chan is not empty
275+
select {
276+
case <-ch:
277+
default:
278+
}
279+
280+
return ch
281+
}
282+
283+
func (h *Handler) putCh(ch chan struct{}) {
284+
h.stopChPool.Put(ch)
285+
}

0 commit comments

Comments
 (0)