Skip to content

Commit a149dc5

Browse files
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-error-propagation
2 parents 772c40b + 4cabd39 commit a149dc5

37 files changed

+485
-364
lines changed

engine/access/rest/common/models/block.go

+72
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,68 @@ import (
88
const ExpandableFieldPayload = "payload"
99
const ExpandableExecutionResult = "execution_result"
1010

11+
func NewBlock(
12+
block *flow.Block,
13+
execResult *flow.ExecutionResult,
14+
link LinkGenerator,
15+
blockStatus flow.BlockStatus,
16+
expand map[string]bool,
17+
) (*Block, error) {
18+
self, err := SelfLink(block.ID(), link.BlockLink)
19+
if err != nil {
20+
return nil, err
21+
}
22+
23+
var result Block
24+
result.Header = NewBlockHeader(block.Header)
25+
26+
// add the payload to the response if it is specified as an expandable field
27+
result.Expandable = &BlockExpandable{}
28+
if expand[ExpandableFieldPayload] {
29+
var payload BlockPayload
30+
err := payload.Build(block.Payload)
31+
if err != nil {
32+
return nil, err
33+
}
34+
result.Payload = &payload
35+
} else {
36+
// else add the payload expandable link
37+
payloadExpandable, err := link.PayloadLink(block.ID())
38+
if err != nil {
39+
return nil, err
40+
}
41+
result.Expandable.Payload = payloadExpandable
42+
}
43+
44+
// execution result might not yet exist
45+
if execResult != nil {
46+
// add the execution result to the response if it is specified as an expandable field
47+
if expand[ExpandableExecutionResult] {
48+
var exeResult ExecutionResult
49+
err := exeResult.Build(execResult, link)
50+
if err != nil {
51+
return nil, err
52+
}
53+
result.ExecutionResult = &exeResult
54+
} else {
55+
// else add the execution result expandable link
56+
executionResultExpandable, err := link.ExecutionResultLink(execResult.ID())
57+
if err != nil {
58+
return nil, err
59+
}
60+
result.Expandable.ExecutionResult = executionResultExpandable
61+
}
62+
}
63+
64+
result.Links = self
65+
66+
var status BlockStatus
67+
status.Build(blockStatus)
68+
result.BlockStatus = &status
69+
70+
return &result, nil
71+
}
72+
1173
func (b *Block) Build(
1274
block *flow.Block,
1375
execResult *flow.ExecutionResult,
@@ -99,6 +161,16 @@ func (b *BlockPayload) Build(payload *flow.Payload) error {
99161
return nil
100162
}
101163

164+
func NewBlockHeader(header *flow.Header) *BlockHeader {
165+
return &BlockHeader{
166+
Id: header.ID().String(),
167+
ParentId: header.ParentID.String(),
168+
Height: util.FromUint(header.Height),
169+
Timestamp: header.Timestamp,
170+
ParentVoterSignature: util.ToBase64(header.ParentVoterSigData),
171+
}
172+
}
173+
102174
func (b *BlockHeader) Build(header *flow.Header) {
103175
b.Id = header.ID().String()
104176
b.ParentId = header.ParentID.String()

engine/access/rest/websockets/controller.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ func (c *Controller) writeResponse(ctx context.Context, response interface{}) {
576576
func wrapErrorMessage(code int, message string, action string, subscriptionID string) models.BaseMessageResponse {
577577
return models.BaseMessageResponse{
578578
SubscriptionID: subscriptionID,
579-
Error: models.ErrorMessage{
579+
Error: &models.ErrorMessage{
580580
Code: code,
581581
Message: message,
582582
},

engine/access/rest/websockets/data_providers/account_statuses_provider.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import (
77
"github.com/rs/zerolog"
88

99
"github.com/onflow/flow-go/engine/access/rest/http/request"
10-
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
10+
"github.com/onflow/flow-go/engine/access/rest/websockets/data_providers/models"
11+
wsmodels "github.com/onflow/flow-go/engine/access/rest/websockets/models"
1112
"github.com/onflow/flow-go/engine/access/state_stream"
1213
"github.com/onflow/flow-go/engine/access/state_stream/backend"
1314
"github.com/onflow/flow-go/engine/access/subscription"
@@ -40,7 +41,7 @@ func NewAccountStatusesDataProvider(
4041
stateStreamApi state_stream.API,
4142
subscriptionID string,
4243
topic string,
43-
rawArguments models.Arguments,
44+
rawArguments wsmodels.Arguments,
4445
send chan<- interface{},
4546
chain flow.Chain,
4647
eventFilterConfig state_stream.EventFilterConfig,
@@ -114,14 +115,15 @@ func (p *AccountStatusesDataProvider) sendResponse(
114115
return nil
115116
}
116117

117-
var accountStatusesPayload models.AccountStatusesResponse
118-
accountStatusesPayload.Build(response, messageIndex.Value())
119-
messageIndex.Increment()
120-
121-
var resp models.BaseDataProvidersResponse
122-
resp.Build(p.ID(), p.Topic(), &accountStatusesPayload)
123-
118+
accountStatusesPayload := models.NewAccountStatusesResponse(response, messageIndex.Value())
119+
resp := models.BaseDataProvidersResponse{
120+
SubscriptionID: p.ID(),
121+
Topic: p.Topic(),
122+
Payload: accountStatusesPayload,
123+
}
124124
p.send <- &resp
125+
126+
messageIndex.Increment()
125127
*blocksSinceLastMessage = 0
126128

127129
return nil
@@ -142,7 +144,7 @@ func (p *AccountStatusesDataProvider) createAndStartSubscription(ctx context.Con
142144

143145
// parseAccountStatusesArguments validates and initializes the account statuses arguments.
144146
func parseAccountStatusesArguments(
145-
arguments models.Arguments,
147+
arguments wsmodels.Arguments,
146148
chain flow.Chain,
147149
eventFilterConfig state_stream.EventFilterConfig,
148150
defaultHeartbeatInterval uint64,

engine/access/rest/websockets/data_providers/account_statuses_provider_test.go

+9-14
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ import (
1212
"github.com/stretchr/testify/require"
1313
"github.com/stretchr/testify/suite"
1414

15-
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
15+
"github.com/onflow/flow-go/engine/access/rest/websockets/data_providers/models"
16+
wsmodels "github.com/onflow/flow-go/engine/access/rest/websockets/models"
1617
"github.com/onflow/flow-go/engine/access/state_stream"
1718
"github.com/onflow/flow-go/engine/access/state_stream/backend"
1819
ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock"
@@ -95,7 +96,7 @@ func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_StateStre
9596
nil,
9697
"dummy-id",
9798
topic,
98-
models.Arguments{},
99+
wsmodels.Arguments{},
99100
send,
100101
s.chain,
101102
state_stream.DefaultEventFilterConfig,
@@ -114,7 +115,7 @@ func (s *AccountStatusesProviderSuite) subscribeAccountStatusesDataProviderTestC
114115
return []testType{
115116
{
116117
name: "SubscribeAccountStatusesFromStartBlockID happy path",
117-
arguments: models.Arguments{
118+
arguments: wsmodels.Arguments{
118119
"start_block_id": s.rootBlock.ID().String(),
119120
"event_types": []string{string(flow.EventAccountCreated)},
120121
"account_addresses": []string{unittest.AddressFixture().String()},
@@ -131,7 +132,7 @@ func (s *AccountStatusesProviderSuite) subscribeAccountStatusesDataProviderTestC
131132
},
132133
{
133134
name: "SubscribeAccountStatusesFromStartHeight happy path",
134-
arguments: models.Arguments{
135+
arguments: wsmodels.Arguments{
135136
"start_block_height": strconv.FormatUint(s.rootBlock.Header.Height, 10),
136137
"event_types": []string{string(flow.EventAccountCreated)},
137138
"account_addresses": []string{unittest.AddressFixture().String()},
@@ -148,7 +149,7 @@ func (s *AccountStatusesProviderSuite) subscribeAccountStatusesDataProviderTestC
148149
},
149150
{
150151
name: "SubscribeAccountStatusesFromLatestBlock happy path",
151-
arguments: models.Arguments{
152+
arguments: wsmodels.Arguments{
152153
"event_types": []string{string(flow.EventAccountCreated)},
153154
"account_addresses": []string{unittest.AddressFixture().String()},
154155
},
@@ -188,12 +189,10 @@ func (s *AccountStatusesProviderSuite) expectedAccountStatusesResponses(backendR
188189
expectedResponses := make([]interface{}, len(backendResponses))
189190

190191
for i, resp := range backendResponses {
191-
var expectedResponsePayload models.AccountStatusesResponse
192-
expectedResponsePayload.Build(resp, uint64(i))
193-
192+
expectedResponsePayload := models.NewAccountStatusesResponse(resp, uint64(i))
194193
expectedResponses[i] = &models.BaseDataProvidersResponse{
195194
Topic: AccountStatusesTopic,
196-
Payload: &expectedResponsePayload,
195+
Payload: expectedResponsePayload,
197196
}
198197
}
199198

@@ -203,10 +202,6 @@ func (s *AccountStatusesProviderSuite) expectedAccountStatusesResponses(backendR
203202
// TestAccountStatusesDataProvider_InvalidArguments tests the behavior of the account statuses data provider
204203
// when invalid arguments are provided. It verifies that appropriate errors are returned
205204
// for missing or conflicting arguments.
206-
// This test covers the test cases:
207-
// 1. Providing both 'start_block_id' and 'start_block_height' simultaneously.
208-
// 2. Invalid 'start_block_id' argument.
209-
// 3. Invalid 'start_block_height' argument.
210205
func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_InvalidArguments() {
211206
send := make(chan interface{})
212207
topic := AccountStatusesTopic
@@ -335,7 +330,7 @@ func invalidAccountStatusesArgumentsTestCases() []testErrType {
335330
return []testErrType{
336331
{
337332
name: "provide both 'start_block_id' and 'start_block_height' arguments",
338-
arguments: models.Arguments{
333+
arguments: wsmodels.Arguments{
339334
"start_block_id": unittest.BlockFixture().ID().String(),
340335
"start_block_height": fmt.Sprintf("%d", unittest.BlockFixture().Header.Height),
341336
"event_types": []string{state_stream.CoreEventAccountCreated},

engine/access/rest/websockets/data_providers/base_provider.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/rs/zerolog"
99

1010
"github.com/onflow/flow-go/access"
11-
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
11+
wsmodels "github.com/onflow/flow-go/engine/access/rest/websockets/models"
1212
"github.com/onflow/flow-go/engine/access/subscription"
1313
)
1414

@@ -18,7 +18,7 @@ type baseDataProvider struct {
1818
api access.API
1919
subscriptionID string
2020
topic string
21-
rawArguments models.Arguments
21+
rawArguments wsmodels.Arguments
2222
doneOnce sync.Once
2323
done chan struct{}
2424
send chan<- interface{}
@@ -43,7 +43,7 @@ func newBaseDataProvider(
4343
api access.API,
4444
subscriptionID string,
4545
topic string,
46-
rawArguments models.Arguments,
46+
rawArguments wsmodels.Arguments,
4747
send chan<- interface{},
4848
) *baseDataProvider {
4949
return &baseDataProvider{
@@ -70,7 +70,7 @@ func (b *baseDataProvider) Topic() string {
7070
}
7171

7272
// Arguments returns the arguments associated with the data provider.
73-
func (b *baseDataProvider) Arguments() models.Arguments {
73+
func (b *baseDataProvider) Arguments() wsmodels.Arguments {
7474
return b.rawArguments
7575
}
7676

engine/access/rest/websockets/data_providers/block_digests_provider.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import (
88

99
"github.com/onflow/flow-go/access"
1010
"github.com/onflow/flow-go/engine/access/rest/http/request"
11-
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
11+
"github.com/onflow/flow-go/engine/access/rest/websockets/data_providers/models"
12+
wsmodels "github.com/onflow/flow-go/engine/access/rest/websockets/models"
1213
"github.com/onflow/flow-go/engine/access/subscription"
1314
"github.com/onflow/flow-go/model/flow"
1415
)
@@ -28,7 +29,7 @@ func NewBlockDigestsDataProvider(
2829
api access.API,
2930
subscriptionID string,
3031
topic string,
31-
rawArguments models.Arguments,
32+
rawArguments wsmodels.Arguments,
3233
send chan<- interface{},
3334
) (*BlockDigestsDataProvider, error) {
3435
args, err := parseBlocksArguments(rawArguments)
@@ -64,11 +65,12 @@ func (p *BlockDigestsDataProvider) Run(ctx context.Context) error {
6465
p.baseDataProvider.done,
6566
p.subscriptionState.subscription,
6667
func(b *flow.BlockDigest) error {
67-
var blockDigest models.BlockDigest
68-
blockDigest.Build(b)
69-
70-
var response models.BaseDataProvidersResponse
71-
response.Build(p.ID(), p.Topic(), &blockDigest)
68+
blockDigest := models.NewBlockDigest(b)
69+
response := models.BaseDataProvidersResponse{
70+
SubscriptionID: p.ID(),
71+
Topic: p.Topic(),
72+
Payload: blockDigest,
73+
}
7274
p.send <- &response
7375

7476
return nil

engine/access/rest/websockets/data_providers/block_digests_provider_test.go

+7-13
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import (
88
"github.com/stretchr/testify/suite"
99

1010
"github.com/onflow/flow-go/engine/access/rest/common/parser"
11-
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
11+
"github.com/onflow/flow-go/engine/access/rest/websockets/data_providers/models"
12+
wsmodels "github.com/onflow/flow-go/engine/access/rest/websockets/models"
1213
statestreamsmock "github.com/onflow/flow-go/engine/access/state_stream/mock"
1314
"github.com/onflow/flow-go/model/flow"
1415
)
@@ -51,20 +52,17 @@ func (s *BlockDigestsProviderSuite) validBlockDigestsArgumentsTestCases() []test
5152
expectedResponses := make([]interface{}, len(s.blocks))
5253
for i, b := range s.blocks {
5354
blockDigest := flow.NewBlockDigest(b.Header.ID(), b.Header.Height, b.Header.Timestamp)
54-
55-
var block models.BlockDigest
56-
block.Build(blockDigest)
57-
55+
blockDigestPayload := models.NewBlockDigest(blockDigest)
5856
expectedResponses[i] = &models.BaseDataProvidersResponse{
5957
Topic: BlockDigestsTopic,
60-
Payload: &block,
58+
Payload: blockDigestPayload,
6159
}
6260
}
6361

6462
return []testType{
6563
{
6664
name: "happy path with start_block_id argument",
67-
arguments: models.Arguments{
65+
arguments: wsmodels.Arguments{
6866
"start_block_id": s.rootBlock.ID().String(),
6967
"block_status": parser.Finalized,
7068
},
@@ -80,7 +78,7 @@ func (s *BlockDigestsProviderSuite) validBlockDigestsArgumentsTestCases() []test
8078
},
8179
{
8280
name: "happy path with start_block_height argument",
83-
arguments: models.Arguments{
81+
arguments: wsmodels.Arguments{
8482
"start_block_height": strconv.FormatUint(s.rootBlock.Header.Height, 10),
8583
"block_status": parser.Finalized,
8684
},
@@ -96,7 +94,7 @@ func (s *BlockDigestsProviderSuite) validBlockDigestsArgumentsTestCases() []test
9694
},
9795
{
9896
name: "happy path without any start argument",
99-
arguments: models.Arguments{
97+
arguments: wsmodels.Arguments{
10098
"block_status": parser.Finalized,
10199
},
102100
setupBackend: func(sub *statestreamsmock.Subscription) {
@@ -123,10 +121,6 @@ func (s *BlocksProviderSuite) requireBlockDigest(actual interface{}, expected in
123121
// TestBlockDigestsDataProvider_InvalidArguments tests the behavior of the block digests data provider
124122
// when invalid arguments are provided. It verifies that appropriate errors are returned
125123
// for missing or conflicting arguments.
126-
// This test covers the test cases:
127-
// 1. Missing 'block_status' argument.
128-
// 2. Invalid 'block_status' argument.
129-
// 3. Providing both 'start_block_id' and 'start_block_height' simultaneously.
130124
func (s *BlockDigestsProviderSuite) TestBlockDigestsDataProvider_InvalidArguments() {
131125
send := make(chan interface{})
132126

0 commit comments

Comments
 (0)