Skip to content

Commit 93369df

Browse files
authored
Merge pull request #6737 from The-K-R-O-K/AndriiSlisarchuk/6573-pending-should-return-immediately
[Access] Implement subscribe transaction statuses by transaction ID
2 parents db2177b + c03a846 commit 93369df

File tree

12 files changed

+883
-693
lines changed

12 files changed

+883
-693
lines changed

access/api.go

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -203,41 +203,20 @@ type API interface {
203203
//
204204
// If invalid parameters will be supplied SubscribeBlockDigestsFromLatest will return a failed subscription.
205205
SubscribeBlockDigestsFromLatest(ctx context.Context, blockStatus flow.BlockStatus) subscription.Subscription
206-
// SubscribeTransactionStatusesFromStartBlockID subscribes to transaction status updates for a given transaction ID.
207-
// Monitoring begins from the specified block ID. The subscription streams status updates until the transaction
208-
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
209-
// these final statuses, the subscription will automatically terminate.
206+
// SubscribeTransactionStatuses subscribes to transaction status updates for a given transaction ID. Monitoring starts
207+
// from the latest block to obtain the current transaction status. If the transaction is already in the final state
208+
// ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]), all statuses will be prepared and sent to the client
209+
// sequentially. If the transaction is not in the final state, the subscription will stream status updates until the transaction
210+
// reaches the final state. Once a final state is reached, the subscription will automatically terminate.
210211
//
211212
// Parameters:
212-
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
213-
// - txID: The identifier of the transaction to monitor.
214-
// - startBlockID: The block ID from which to start monitoring.
215-
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
216-
SubscribeTransactionStatusesFromStartBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
217-
// SubscribeTransactionStatusesFromStartHeight subscribes to transaction status updates for a given transaction ID.
218-
// Monitoring begins from the specified block height. The subscription streams status updates until the transaction
219-
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
220-
// these final statuses, the subscription will automatically terminate.
221-
//
222-
// Parameters:
223-
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
213+
// - ctx: Context to manage the subscription's lifecycle, including cancellation.
224214
// - txID: The unique identifier of the transaction to monitor.
225-
// - startHeight: The block height from which to start monitoring.
226215
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
227-
SubscribeTransactionStatusesFromStartHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
228-
// SubscribeTransactionStatusesFromLatest subscribes to transaction status updates for a given transaction ID.
229-
// Monitoring begins from the latest block. The subscription streams status updates until the transaction
230-
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
231-
// these final statuses, the subscription will automatically terminate.
232-
//
233-
// Parameters:
234-
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
235-
// - txID: The unique identifier of the transaction to monitor.
236-
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
237-
SubscribeTransactionStatusesFromLatest(ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
238-
// SendAndSubscribeTransactionStatuses sends a transaction to the network and subscribes to its status updates.
216+
SubscribeTransactionStatuses(ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
217+
// SendAndSubscribeTransactionStatuses sends a transaction to the execution node and subscribes to its status updates.
239218
// Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction
240-
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). Once a final status is reached, the subscription
219+
// reaches the final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). Once the final status has been reached, the subscription
241220
// automatically terminates.
242221
//
243222
// Parameters:
@@ -261,6 +240,14 @@ type TransactionResult struct {
261240
BlockHeight uint64
262241
}
263242

243+
func (r *TransactionResult) IsExecuted() bool {
244+
return r.Status == flow.TransactionStatusExecuted || r.Status == flow.TransactionStatusSealed
245+
}
246+
247+
func (r *TransactionResult) IsFinal() bool {
248+
return r.Status == flow.TransactionStatusSealed || r.Status == flow.TransactionStatusExpired
249+
}
250+
264251
func TransactionResultToMessage(result *TransactionResult) *access.TransactionResultResponse {
265252
return &access.TransactionResultResponse{
266253
Status: entities.TransactionStatus(result.Status),

access/mock/api.go

Lines changed: 3 additions & 43 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/util/cmd/run-script/cmd.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"github.com/onflow/flow-go/module/metrics"
2828
)
2929

30+
var ErrNotImplemented = errors.New("not implemented")
31+
3032
var (
3133
flagPayloads string
3234
flagState string
@@ -533,35 +535,18 @@ func (*api) SubscribeBlockDigestsFromLatest(
533535
return nil
534536
}
535537

536-
func (a *api) SubscribeTransactionStatusesFromStartBlockID(
537-
_ context.Context,
538-
_ flow.Identifier,
539-
_ flow.Identifier,
540-
_ entities.EventEncodingVersion,
541-
) subscription.Subscription {
542-
return nil
543-
}
544-
545-
func (a *api) SubscribeTransactionStatusesFromStartHeight(
538+
func (a *api) SubscribeTransactionStatuses(
546539
_ context.Context,
547540
_ flow.Identifier,
548-
_ uint64,
549541
_ entities.EventEncodingVersion,
550542
) subscription.Subscription {
551-
return nil
543+
return subscription.NewFailedSubscription(ErrNotImplemented, "failed to call SubscribeTransactionStatuses")
552544
}
553545

554-
func (a *api) SubscribeTransactionStatusesFromLatest(
555-
_ context.Context,
556-
_ flow.Identifier,
557-
_ entities.EventEncodingVersion,
558-
) subscription.Subscription {
559-
return nil
560-
}
561546
func (a *api) SendAndSubscribeTransactionStatuses(
562547
_ context.Context,
563548
_ *flow.TransactionBody,
564549
_ entities.EventEncodingVersion,
565550
) subscription.Subscription {
566-
return nil
551+
return subscription.NewFailedSubscription(ErrNotImplemented, "failed to call SendAndSubscribeTransactionStatuses")
567552
}

engine/access/rest/websockets/data_providers/factory_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,9 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() {
134134
{
135135
name: "transaction statuses topic",
136136
topic: TransactionStatusesTopic,
137-
arguments: models.Arguments{},
137+
arguments: models.Arguments{"tx_id": unittest.IdentifierFixture().String()},
138138
setupSubscription: func() {
139-
s.setupSubscription(s.accessApi.On("SubscribeTransactionStatusesFromLatest", mock.Anything, mock.Anything, mock.Anything))
139+
s.setupSubscription(s.accessApi.On("SubscribeTransactionStatuses", mock.Anything, mock.Anything, mock.Anything))
140140
},
141141
assertExpectations: func() {
142142
s.stateStreamApi.AssertExpectations(s.T())

engine/access/rest/websockets/data_providers/transaction_statuses_provider.go

Lines changed: 15 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/onflow/flow-go/access"
1212
commonmodels "github.com/onflow/flow-go/engine/access/rest/common/models"
1313
"github.com/onflow/flow-go/engine/access/rest/common/parser"
14-
"github.com/onflow/flow-go/engine/access/rest/http/request"
1514
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
1615
"github.com/onflow/flow-go/engine/access/subscription"
1716
"github.com/onflow/flow-go/model/flow"
@@ -20,13 +19,6 @@ import (
2019
"github.com/onflow/flow/protobuf/go/flow/entities"
2120
)
2221

23-
// transactionStatusesArguments contains the arguments required for subscribing to transaction statuses
24-
type transactionStatusesArguments struct {
25-
TxID flow.Identifier // ID of the transaction to monitor.
26-
StartBlockID flow.Identifier // ID of the block to start subscription from
27-
StartBlockHeight uint64 // Height of the block to start subscription from
28-
}
29-
3022
// TransactionStatusesDataProvider is responsible for providing tx statuses
3123
type TransactionStatusesDataProvider struct {
3224
*baseDataProvider
@@ -54,8 +46,8 @@ func NewTransactionStatusesDataProvider(
5446
linkGenerator: linkGenerator,
5547
}
5648

57-
// Initialize arguments passed to the provider.
58-
txStatusesArgs, err := parseTransactionStatusesArguments(arguments)
49+
// Initialize txID passed to the provider.
50+
txID, err := parseTransactionID(arguments)
5951
if err != nil {
6052
return nil, fmt.Errorf("invalid arguments for tx statuses data provider: %w", err)
6153
}
@@ -68,7 +60,7 @@ func NewTransactionStatusesDataProvider(
6860
arguments,
6961
cancel,
7062
send,
71-
p.createSubscription(subCtx, txStatusesArgs), // Set up a subscription to tx statuses based on arguments.
63+
p.createSubscription(subCtx, txID), // Set up a subscription to tx statuses based on arguments.
7264
)
7365

7466
return p, nil
@@ -84,17 +76,9 @@ func (p *TransactionStatusesDataProvider) Run() error {
8476
// createSubscription creates a new subscription using the specified input arguments.
8577
func (p *TransactionStatusesDataProvider) createSubscription(
8678
ctx context.Context,
87-
args transactionStatusesArguments,
79+
txID flow.Identifier,
8880
) subscription.Subscription {
89-
if args.StartBlockID != flow.ZeroID {
90-
return p.api.SubscribeTransactionStatusesFromStartBlockID(ctx, args.TxID, args.StartBlockID, entities.EventEncodingVersion_JSON_CDC_V0)
91-
}
92-
93-
if args.StartBlockHeight != request.EmptyHeight {
94-
return p.api.SubscribeTransactionStatusesFromStartHeight(ctx, args.TxID, args.StartBlockHeight, entities.EventEncodingVersion_JSON_CDC_V0)
95-
}
96-
97-
return p.api.SubscribeTransactionStatusesFromLatest(ctx, args.TxID, entities.EventEncodingVersion_JSON_CDC_V0)
81+
return p.api.SubscribeTransactionStatuses(ctx, txID, entities.EventEncodingVersion_JSON_CDC_V0)
9882
}
9983

10084
// handleResponse processes a tx statuses and sends the formatted response.
@@ -123,42 +107,30 @@ func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*acc
123107
}
124108
}
125109

126-
// parseAccountStatusesArguments validates and initializes the account statuses arguments.
127-
func parseTransactionStatusesArguments(
110+
// parseTransactionID validates and initializes the transaction ID argument.
111+
func parseTransactionID(
128112
arguments models.Arguments,
129-
) (transactionStatusesArguments, error) {
113+
) (flow.Identifier, error) {
130114
allowedFields := []string{
131-
"start_block_id",
132-
"start_block_height",
133115
"tx_id",
134116
}
135117
err := ensureAllowedFields(arguments, allowedFields)
136118
if err != nil {
137-
return transactionStatusesArguments{}, err
138-
}
139-
140-
var args transactionStatusesArguments
141-
142-
// Parse block arguments
143-
startBlockID, startBlockHeight, err := parseStartBlock(arguments)
144-
if err != nil {
145-
return transactionStatusesArguments{}, err
119+
return flow.ZeroID, err
146120
}
147-
args.StartBlockID = startBlockID
148-
args.StartBlockHeight = startBlockHeight
149121

150122
if txIDIn, ok := arguments["tx_id"]; ok && txIDIn != "" {
151123
result, ok := txIDIn.(string)
152124
if !ok {
153-
return transactionStatusesArguments{}, fmt.Errorf("'tx_id' must be a string")
125+
return flow.ZeroID, fmt.Errorf("'tx_id' must be a string")
154126
}
155-
var txID parser.ID
156-
err := txID.Parse(result)
127+
var txIDParsed parser.ID
128+
err := txIDParsed.Parse(result)
157129
if err != nil {
158-
return transactionStatusesArguments{}, fmt.Errorf("invalid 'tx_id': %w", err)
130+
return flow.ZeroID, fmt.Errorf("invalid 'tx_id': %w", err)
159131
}
160-
args.TxID = txID.Flow()
132+
return txIDParsed.Flow(), nil
161133
}
162134

163-
return args, nil
135+
return flow.ZeroID, fmt.Errorf("arguments are invalid")
164136
}

0 commit comments

Comments
 (0)