Skip to content

Pass irrecoverable context to ws handler #7154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion cmd/util/cmd/run-script/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
"github.com/onflow/flow-go/fvm/storage/snapshot"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
modutil "github.com/onflow/flow-go/module/util"
)

var ErrNotImplemented = errors.New("not implemented")
Expand Down Expand Up @@ -154,15 +156,26 @@
vm := fvm.NewVirtualMachine()

if flagServe {

api := &api{
chainID: chainID,
vm: vm,
ctx: ctx,
storageSnapshot: storageSnapshot,
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

irrCtx, errCh := irrecoverable.WithSignaler(ctx)
go func() {
err := modutil.WaitError(errCh, ctx)

Check failure on line 171 in cmd/util/cmd/run-script/cmd.go

View workflow job for this annotation

GitHub Actions / Lint (./)

cannot use ctx (variable of type "context".Context) as <-chan struct{} value in argument to modutil.WaitError (typecheck)

Check failure on line 171 in cmd/util/cmd/run-script/cmd.go

View workflow job for this annotation

GitHub Actions / Lint (./)

cannot use ctx (variable of type "context".Context) as <-chan struct{} value in argument to modutil.WaitError) (typecheck)

Check failure on line 171 in cmd/util/cmd/run-script/cmd.go

View workflow job for this annotation

GitHub Actions / Unit Tests (cmd)

cannot use ctx (variable of type "context".Context) as <-chan struct{} value in argument to modutil.WaitError

Check failure on line 171 in cmd/util/cmd/run-script/cmd.go

View workflow job for this annotation

GitHub Actions / Unit Tests (cmd)

cannot use ctx (variable of type "context".Context) as <-chan struct{} value in argument to modutil.WaitError
if err != nil {
log.Fatal().Err(err).Msg("server finished with error")
}
}()

server, err := rest.NewServer(
irrCtx,
api,
rest.Config{
ListenAddress: fmt.Sprintf(":%d", flagPort),
Expand Down
4 changes: 3 additions & 1 deletion engine/access/rest/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/irrecoverable"
)

// RouterBuilder is a utility for building HTTP routers with common middleware and routes.
Expand Down Expand Up @@ -93,12 +94,13 @@ func (b *RouterBuilder) AddLegacyWebsocketsRoutes(
}

func (b *RouterBuilder) AddWebsocketsRoute(
ctx irrecoverable.SignalerContext,
chain flow.Chain,
config websockets.Config,
maxRequestSize int64,
dataProviderFactory dp.DataProviderFactory,
) *RouterBuilder {
handler := websockets.NewWebSocketHandler(b.logger, config, chain, maxRequestSize, dataProviderFactory)
handler := websockets.NewWebSocketHandler(ctx, b.logger, config, chain, maxRequestSize, dataProviderFactory)
b.v1SubRouter.
Methods(http.MethodGet).
Path("/ws").
Expand Down
7 changes: 5 additions & 2 deletions engine/access/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/irrecoverable"
)

const (
Expand All @@ -37,7 +38,9 @@ type Config struct {
}

// NewServer returns an HTTP server initialized with the REST API handler
func NewServer(serverAPI access.API,
func NewServer(
ctx irrecoverable.SignalerContext,
serverAPI access.API,
config Config,
logger zerolog.Logger,
chain flow.Chain,
Expand All @@ -63,7 +66,7 @@ func NewServer(serverAPI access.API,
)

if enableNewWebsocketsStreamAPI {
builder.AddWebsocketsRoute(chain, wsConfig, config.MaxRequestSize, dataProviderFactory)
builder.AddWebsocketsRoute(ctx, chain, wsConfig, config.MaxRequestSize, dataProviderFactory)
}

c := cors.New(cors.Options{
Expand Down
12 changes: 10 additions & 2 deletions engine/access/rest/websockets/handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package websockets

import (
"context"
"net/http"

"github.com/gorilla/websocket"
Expand All @@ -10,11 +9,18 @@ import (
"github.com/onflow/flow-go/engine/access/rest/common"
dp "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
)

type Handler struct {
*common.HttpHandler

// ctx holds the irrecoverable context used to start the REST server
// typically we do not store contexts within a struct. it is necessary in this case
// because we need to pass an irrecoverable context into the API backend logic to
// handle exceptions, and we cannot use the request's context since the websocket
// connection lives longer than the request duration.
ctx irrecoverable.SignalerContext
logger zerolog.Logger
websocketConfig Config
dataProviderFactory dp.DataProviderFactory
Expand All @@ -23,13 +29,15 @@ type Handler struct {
var _ http.Handler = (*Handler)(nil)

func NewWebSocketHandler(
ctx irrecoverable.SignalerContext,
logger zerolog.Logger,
config Config,
chain flow.Chain,
maxRequestSize int64,
dataProviderFactory dp.DataProviderFactory,
) *Handler {
return &Handler{
ctx: ctx,
HttpHandler: common.NewHttpHandler(logger, chain, maxRequestSize),
websocketConfig: config,
logger: logger,
Expand Down Expand Up @@ -61,5 +69,5 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

controller := NewWebSocketController(logger, h.websocketConfig, NewWebsocketConnection(conn), h.dataProviderFactory)
controller.HandleConnection(context.TODO())
controller.HandleConnection(h.ctx)
}
5 changes: 3 additions & 2 deletions engine/access/rpc/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,8 @@ func (e *Engine) serveREST(ctx irrecoverable.SignalerContext, ready component.Re
return
}

e.log.Info().Str("rest_api_address", e.config.RestConfig.ListenAddress).Msg("starting REST server on address")

r, err := rest.NewServer(
ctx,
e.restHandler,
e.config.RestConfig,
e.log,
Expand All @@ -267,6 +266,8 @@ func (e *Engine) serveREST(ctx irrecoverable.SignalerContext, ready component.Re
return irrecoverable.WithSignalerContext(ctx, ctx)
}

e.log.Info().Str("rest_api_address", e.config.RestConfig.ListenAddress).Msg("starting REST server on address")

l, err := net.Listen("tcp", e.config.RestConfig.ListenAddress)
if err != nil {
e.log.Err(err).Msg("failed to start the REST server")
Expand Down
Loading