Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Refactor access subscriptions #7105

Open
wants to merge 1 commit into
base: peter/refactor-access-models
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,12 @@ generate-mocks: install-mock-generators
mockery --name 'Blocks' --dir="./access/validator" --case=underscore --output="./access/validator/mock" --outpkg="mock"
mockery --name 'API' --dir="./engine/protocol" --case=underscore --output="./engine/protocol/mock" --outpkg="mock"
mockery --name '.*' --dir="./engine/access/state_stream" --case=underscore --output="./engine/access/state_stream/mock" --outpkg="mock"
mockery --name 'BlockTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
mockery --name 'BlockTracker' --dir="./engine/access/subscription/tracker" --case=underscore --output="./engine/access/subscription/tracker/mock" --outpkg="mock"
mockery --name 'ExecutionDataTracker' --dir="./engine/access/subscription/tracker" --case=underscore --output="./engine/access/subscription/tracker/mock" --outpkg="mock"
mockery --name 'DataProvider' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock"
mockery --name 'DataProviderFactory' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock"
mockery --name 'LinkGenerator' --dir="./engine/access/rest/common/models" --case=underscore --output="./engine/access/rest/common/models/mock" --outpkg="mock"
mockery --name 'WebsocketConnection' --dir="./engine/access/rest/websockets" --case=underscore --output="./engine/access/rest/websockets/mock" --outpkg="mock"
mockery --name 'ExecutionDataTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
mockery --name 'ConnectionFactory' --dir="./engine/access/rpc/connection" --case=underscore --output="./engine/access/rpc/connection/mock" --outpkg="mock"
mockery --name 'Communicator' --dir="./engine/access/rpc/backend" --case=underscore --output="./engine/access/rpc/backend/mock" --outpkg="mock"
mockery --name '.*' --dir=model/fingerprint --case=underscore --output="./model/fingerprint/mock" --outpkg="mock"
Expand Down
5 changes: 3 additions & 2 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/onflow/flow-go/engine/access/state_stream"
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
subscriptiontracker "github.com/onflow/flow-go/engine/access/subscription/tracker"
followereng "github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/requester"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
Expand Down Expand Up @@ -1068,7 +1069,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
useIndex := builder.executionDataIndexingEnabled &&
eventQueryMode != backend.IndexQueryModeExecutionNodesOnly

executionDataTracker := subscription.NewExecutionDataTracker(
executionDataTracker := subscriptiontracker.NewExecutionDataTracker(
builder.Logger,
node.State,
builder.executionDataConfig.InitialBlockHeight,
Expand Down Expand Up @@ -1981,7 +1982,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
broadcaster := engine.NewBroadcaster()
// create BlockTracker that will track for new blocks (finalized and sealed) and
// handles block-related operations.
blockTracker, err := subscription.NewBlockTracker(
blockTracker, err := subscriptiontracker.NewBlockTracker(
node.State,
builder.FinalizedRootBlock.Header.Height,
node.Storage.Headers,
Expand Down
5 changes: 3 additions & 2 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/onflow/flow-go/engine/access/state_stream"
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
subscriptiontracker "github.com/onflow/flow-go/engine/access/subscription/tracker"
"github.com/onflow/flow-go/engine/common/follower"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/stop"
Expand Down Expand Up @@ -1529,7 +1530,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
useIndex := builder.executionDataIndexingEnabled &&
eventQueryMode != backend.IndexQueryModeExecutionNodesOnly

executionDataTracker := subscription.NewExecutionDataTracker(
executionDataTracker := subscriptiontracker.NewExecutionDataTracker(
builder.Logger,
node.State,
builder.executionDataConfig.InitialBlockHeight,
Expand Down Expand Up @@ -1888,7 +1889,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
broadcaster := engine.NewBroadcaster()
// create BlockTracker that will track for new blocks (finalized and sealed) and
// handles block-related operations.
blockTracker, err := subscription.NewBlockTracker(
blockTracker, err := subscriptiontracker.NewBlockTracker(
node.State,
builder.FinalizedRootBlock.Header.Height,
node.Storage.Headers,
Expand Down
5 changes: 3 additions & 2 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/onflow/flow-go/engine/access/state_stream"
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/access/subscription/tracker"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/blobs"
"github.com/onflow/flow-go/module/execution"
Expand Down Expand Up @@ -62,7 +63,7 @@ type SameGRPCPortTestSuite struct {
metrics *metrics.NoopCollector
rpcEng *rpc.Engine
stateStreamEng *statestreambackend.Engine
executionDataTracker subscription.ExecutionDataTracker
executionDataTracker tracker.ExecutionDataTracker

// storage
blocks *storagemock.Blocks
Expand Down Expand Up @@ -253,7 +254,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {

eventIndexer := index.NewEventsIndex(index.NewReporter(), suite.events)

suite.executionDataTracker = subscription.NewExecutionDataTracker(
suite.executionDataTracker = tracker.NewExecutionDataTracker(
suite.log,
suite.state,
rootBlock.Header.Height,
Expand Down
5 changes: 3 additions & 2 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/access/rpc/connection"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/access/subscription/tracker"
"github.com/onflow/flow-go/engine/common/rpc"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/version"
Expand Down Expand Up @@ -74,7 +75,7 @@ type Backend struct {
executionReceipts storage.ExecutionReceipts
connFactory connection.ConnectionFactory

BlockTracker subscription.BlockTracker
BlockTracker tracker.BlockTracker
stateParams protocol.Params
versionControl *version.VersionControl
}
Expand Down Expand Up @@ -103,7 +104,7 @@ type Params struct {
ScriptExecutionMode IndexQueryMode
CheckPayerBalanceMode validator.PayerBalanceMode
EventQueryMode IndexQueryMode
BlockTracker subscription.BlockTracker
BlockTracker tracker.BlockTracker
SubscriptionHandler *subscription.SubscriptionHandler

EventsIndex *index.EventsIndex
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rpc/backend/backend_stream_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/access/subscription/tracker"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
Expand All @@ -22,7 +23,7 @@ type backendSubscribeBlocks struct {
headers storage.Headers

subscriptionHandler *subscription.SubscriptionHandler
blockTracker subscription.BlockTracker
blockTracker tracker.BlockTracker
}

// SubscribeBlocksFromStartBlockID subscribes to the finalized or sealed blocks starting at the requested
Expand Down
11 changes: 6 additions & 5 deletions engine/access/rpc/backend/backend_stream_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
"github.com/onflow/flow-go/engine"
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
"github.com/onflow/flow-go/engine/access/subscription"
subscriptionmock "github.com/onflow/flow-go/engine/access/subscription/mock"
"github.com/onflow/flow-go/engine/access/subscription/tracker"
trackermock "github.com/onflow/flow-go/engine/access/subscription/tracker/mock"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
protocol "github.com/onflow/flow-go/state/protocol/mock"
Expand All @@ -38,8 +39,8 @@ type BackendBlocksSuite struct {

blocks *storagemock.Blocks
headers *storagemock.Headers
blockTracker *subscriptionmock.BlockTracker
blockTrackerReal subscription.BlockTracker
blockTracker *trackermock.BlockTracker
blockTrackerReal tracker.BlockTracker

connectionFactory *connectionmock.ConnectionFactory

Expand Down Expand Up @@ -82,7 +83,7 @@ func (s *BackendBlocksSuite) SetupTest() {
s.headers = new(storagemock.Headers)
s.chainID = flow.Testnet
s.connectionFactory = connectionmock.NewConnectionFactory(s.T())
s.blockTracker = subscriptionmock.NewBlockTracker(s.T())
s.blockTracker = trackermock.NewBlockTracker(s.T())

s.broadcaster = engine.NewBroadcaster()

Expand Down Expand Up @@ -135,7 +136,7 @@ func (s *BackendBlocksSuite) SetupTest() {
require.NoError(s.T(), err)

// create real block tracker to use GetStartHeight from it, instead of mocking
s.blockTrackerReal, err = subscription.NewBlockTracker(
s.blockTrackerReal, err = tracker.NewBlockTracker(
s.state,
s.rootBlock.Header.Height,
s.headers,
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rpc/backend/backend_stream_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/access/subscription/tracker"
"github.com/onflow/flow-go/engine/common/rpc"
accessmodel "github.com/onflow/flow-go/model/access"
"github.com/onflow/flow-go/model/flow"
Expand All @@ -32,7 +33,7 @@ type backendSubscribeTransactions struct {
log zerolog.Logger

subscriptionHandler *subscription.SubscriptionHandler
blockTracker subscription.BlockTracker
blockTracker tracker.BlockTracker
sendTransaction sendTransaction
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
backendmock "github.com/onflow/flow-go/engine/access/rpc/backend/mock"
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
"github.com/onflow/flow-go/engine/access/subscription"
subscriptionmock "github.com/onflow/flow-go/engine/access/subscription/mock"
trackermock "github.com/onflow/flow-go/engine/access/subscription/tracker/mock"
"github.com/onflow/flow-go/engine/common/rpc/convert"
accessmodel "github.com/onflow/flow-go/model/access"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -67,7 +67,7 @@ type TransactionStatusSuite struct {

connectionFactory *connectionmock.ConnectionFactory
communicator *backendmock.Communicator
blockTracker *subscriptionmock.BlockTracker
blockTracker *trackermock.BlockTracker
reporter *syncmock.IndexReporter
indexReporter *index.Reporter

Expand Down Expand Up @@ -121,7 +121,7 @@ func (s *TransactionStatusSuite) SetupTest() {
s.connectionFactory = connectionmock.NewConnectionFactory(s.T())
s.communicator = backendmock.NewCommunicator(s.T())
s.broadcaster = engine.NewBroadcaster()
s.blockTracker = subscriptionmock.NewBlockTracker(s.T())
s.blockTracker = trackermock.NewBlockTracker(s.T())
s.resultsMap = map[flow.Identifier]*flow.ExecutionResult{}

s.colClient.On(
Expand Down
37 changes: 27 additions & 10 deletions engine/access/rpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ func (h *Handler) SubscribeBlocksFromStartBlockID(request *accessproto.Subscribe
}

sub := h.api.SubscribeBlocksFromStartBlockID(stream.Context(), startBlockID, blockStatus)
return subscription.HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
return HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
}

// SubscribeBlocksFromStartHeight handles subscription requests for blocks started from block height.
Expand All @@ -1096,7 +1096,7 @@ func (h *Handler) SubscribeBlocksFromStartHeight(request *accessproto.SubscribeB
}

sub := h.api.SubscribeBlocksFromStartHeight(stream.Context(), request.GetStartBlockHeight(), blockStatus)
return subscription.HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
return HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
}

// SubscribeBlocksFromLatest handles subscription requests for blocks started from latest sealed block.
Expand All @@ -1123,7 +1123,7 @@ func (h *Handler) SubscribeBlocksFromLatest(request *accessproto.SubscribeBlocks
}

sub := h.api.SubscribeBlocksFromLatest(stream.Context(), blockStatus)
return subscription.HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
return HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
}

// handleBlocksResponse handles the subscription to block updates and sends
Expand Down Expand Up @@ -1182,7 +1182,7 @@ func (h *Handler) SubscribeBlockHeadersFromStartBlockID(request *accessproto.Sub
}

sub := h.api.SubscribeBlockHeadersFromStartBlockID(stream.Context(), startBlockID, blockStatus)
return subscription.HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
return HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
}

// SubscribeBlockHeadersFromStartHeight handles subscription requests for block headers started from block height.
Expand All @@ -1209,7 +1209,7 @@ func (h *Handler) SubscribeBlockHeadersFromStartHeight(request *accessproto.Subs
}

sub := h.api.SubscribeBlockHeadersFromStartHeight(stream.Context(), request.GetStartBlockHeight(), blockStatus)
return subscription.HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
return HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
}

// SubscribeBlockHeadersFromLatest handles subscription requests for block headers started from latest sealed block.
Expand All @@ -1236,7 +1236,7 @@ func (h *Handler) SubscribeBlockHeadersFromLatest(request *accessproto.Subscribe
}

sub := h.api.SubscribeBlockHeadersFromLatest(stream.Context(), blockStatus)
return subscription.HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
return HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
}

// handleBlockHeadersResponse handles the subscription to block updates and sends
Expand Down Expand Up @@ -1296,7 +1296,7 @@ func (h *Handler) SubscribeBlockDigestsFromStartBlockID(request *accessproto.Sub
}

sub := h.api.SubscribeBlockDigestsFromStartBlockID(stream.Context(), startBlockID, blockStatus)
return subscription.HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
return HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
}

// SubscribeBlockDigestsFromStartHeight handles subscription requests for lightweight blocks started from block height.
Expand All @@ -1323,7 +1323,7 @@ func (h *Handler) SubscribeBlockDigestsFromStartHeight(request *accessproto.Subs
}

sub := h.api.SubscribeBlockDigestsFromStartHeight(stream.Context(), request.GetStartBlockHeight(), blockStatus)
return subscription.HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
return HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
}

// SubscribeBlockDigestsFromLatest handles subscription requests for lightweight block started from latest sealed block.
Expand All @@ -1350,7 +1350,7 @@ func (h *Handler) SubscribeBlockDigestsFromLatest(request *accessproto.Subscribe
}

sub := h.api.SubscribeBlockDigestsFromLatest(stream.Context(), blockStatus)
return subscription.HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
return HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
}

// handleBlockDigestsResponse handles the subscription to block updates and sends
Expand Down Expand Up @@ -1431,7 +1431,7 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
sub := h.api.SendAndSubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())

messageIndex := counters.NewMonotonicCounter(0)
return subscription.HandleRPCSubscription(sub, func(txResults []*accessmodel.TransactionResult) error {
return HandleRPCSubscription(sub, func(txResults []*accessmodel.TransactionResult) error {
for i := range txResults {
index := messageIndex.Value()
if ok := messageIndex.Set(index + 1); !ok {
Expand Down Expand Up @@ -1569,3 +1569,20 @@ func checkBlockStatus(blockStatus flow.BlockStatus) error {
}
return nil
}

// HandleRPCSubscription is a generic handler for subscriptions to a specific type for rpc calls.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is now duplicated in 2 places since the logic is very simple.

//
// Parameters:
// - sub: The subscription.
// - handleResponse: The function responsible for handling the response of the subscribed type.
//
// Expected errors during normal operation:
// - codes.Internal: If the subscription encounters an error or gets an unexpected response.
func HandleRPCSubscription[T any](sub subscription.Subscription, handleResponse func(resp T) error) error {
err := subscription.HandleSubscription(sub, handleResponse)
if err != nil {
return rpc.ConvertError(err, "handle subscription error", codes.Internal)
}

return nil
}
5 changes: 3 additions & 2 deletions engine/access/state_stream/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/access/subscription/tracker"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
Expand Down Expand Up @@ -62,7 +63,7 @@ type Config struct {
type GetExecutionDataFunc func(context.Context, uint64) (*execution_data.BlockExecutionDataEntity, error)

type StateStreamBackend struct {
subscription.ExecutionDataTracker
tracker.ExecutionDataTracker

ExecutionDataBackend
EventsBackend
Expand Down Expand Up @@ -92,7 +93,7 @@ func New(
useEventsIndex bool,
registerIDsRequestLimit int,
subscriptionHandler *subscription.SubscriptionHandler,
executionDataTracker subscription.ExecutionDataTracker,
executionDataTracker tracker.ExecutionDataTracker,
) (*StateStreamBackend, error) {
logger := log.With().Str("module", "state_stream_api").Logger()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/access/subscription/tracker"

"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
Expand All @@ -25,7 +26,7 @@ type AccountStatusesBackend struct {
log zerolog.Logger
subscriptionHandler *subscription.SubscriptionHandler

executionDataTracker subscription.ExecutionDataTracker
executionDataTracker tracker.ExecutionDataTracker
eventsRetriever EventsRetriever
}

Expand Down
3 changes: 2 additions & 1 deletion engine/access/state_stream/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/access/subscription/tracker"
"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
Expand All @@ -17,7 +18,7 @@ type EventsBackend struct {
log zerolog.Logger

subscriptionHandler *subscription.SubscriptionHandler
executionDataTracker subscription.ExecutionDataTracker
executionDataTracker tracker.ExecutionDataTracker
eventsRetriever EventsRetriever
}

Expand Down
Loading