Skip to content

Commit

Permalink
Merge pull request #245 from flow-hydraulics/latenssi/chain-offline-2
Browse files Browse the repository at this point in the history
Handle situation when chain is unreachable
  • Loading branch information
latenssi authored Dec 21, 2021
2 parents 9b20b3c + a530817 commit d4d172e
Show file tree
Hide file tree
Showing 16 changed files with 409 additions and 56 deletions.
40 changes: 35 additions & 5 deletions chain_events/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"time"

wallet_errors "github.com/flow-hydraulics/flow-wallet-api/errors"
"github.com/flow-hydraulics/flow-wallet-api/system"
"github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go-sdk/client"
Expand Down Expand Up @@ -116,13 +117,24 @@ func (l *Listener) Start() *Listener {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

entry := log.WithFields(log.Fields{
"package": "chain_events",
"function": "Listener.Start.goroutine",
})

for {
select {
case <-l.done:
return
case <-l.ticker.C:
// Check for maintenance mode
if l.waitMaintenance() {
if halted, err := l.systemHalted(); err != nil {
entry.
WithFields(log.Fields{"error": err}).
Warn("Could not get system settings from DB")
continue
} else if halted {
entry.Debug("System halted")
continue
}

Expand Down Expand Up @@ -150,12 +162,27 @@ func (l *Listener) Start() *Listener {
})

if err != nil {
log.
if wallet_errors.IsChainConnectionError(err) {
// Unable to connect to chain, pause system.
if l.systemService != nil {
entry.Warn("Unable to connect to chain, pausing system")
if err := l.systemService.Pause(); err != nil {
entry.
WithFields(log.Fields{"error": err}).
Warn("Unable to pause system")
}
} else {
entry.Warn("Unable to connect to chain")
}
continue
}

entry.
WithFields(log.Fields{"error": err}).
Warn("Error while handling Flow events")

if strings.Contains(err.Error(), "key not found") {
log.Warn(`"key not found" error indicates data is not available at this height, please manually set correct starting height`)
entry.Warn(`"key not found" error indicates data is not available at this height, please manually set correct starting height`)
}
}
}
Expand Down Expand Up @@ -202,6 +229,9 @@ func (l *Listener) Stop() {
l.ticker = nil
}

func (l *Listener) waitMaintenance() bool {
return l.systemService != nil && l.systemService.IsMaintenanceMode()
func (l *Listener) systemHalted() (bool, error) {
if l.systemService != nil {
return l.systemService.IsHalted()
}
return false, nil
}
8 changes: 6 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,21 @@ services:
- redis

emulator:
image: gcr.io/flow-container-registry/emulator:v0.23.0
image: gcr.io/flow-container-registry/emulator:0.27.2
restart: unless-stopped
command: emulator -v
command: emulator -v --persist
ports:
- "3569:3569"
volumes:
- emulator_persist:/flowdb
env_file:
- ./.env
environment:
- FLOW_SERVICEPRIVATEKEY=${FLOW_WALLET_ADMIN_PRIVATE_KEY}
- FLOW_SERVICEKEYSIGALGO=ECDSA_P256
- FLOW_SERVICEKEYHASHALGO=SHA3_256
- FLOW_DBPATH=/flowdb

volumes:
redis_data:
emulator_persist:
30 changes: 30 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
// Package errors provides an API for errors across the application.
package errors

import (
"net"

"github.com/onflow/flow-go-sdk/client"
"google.golang.org/grpc/codes"
)

type RequestError struct {
StatusCode int
Err error
Expand All @@ -9,3 +16,26 @@ type RequestError struct {
func (e *RequestError) Error() string {
return e.Err.Error()
}

var accessAPIConnectionErrors = []codes.Code{
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Internal,
codes.Unavailable,
}

func IsChainConnectionError(err error) bool {
if _, ok := err.(net.Error); ok {
return true
}

if err, ok := err.(client.RPCError); ok {
// Check for Flow Access API connection errors
for _, code := range accessAPIConnectionErrors {
if err.GRPCStatus().Code() == code {
return true
}
}
}
return false
}
70 changes: 70 additions & 0 deletions errors/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package errors

import (
"context"
"fmt"
"testing"

"github.com/onflow/flow-go-sdk/client"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type testNetError struct{}

func (e *testNetError) Error() string { return "NetError" }
func (e *testNetError) Timeout() bool { return false }
func (e *testNetError) Temporary() bool { return false }

func TestIsChainConnectionError(t *testing.T) {
t.Run("error cases", func(t *testing.T) {
var netErr error = &testNetError{}

valid_errors := []error{
netErr,
client.RPCError{
GRPCErr: status.Error(codes.DeadlineExceeded, "DeadlineExceeded"),
},
client.RPCError{
GRPCErr: status.Error(codes.ResourceExhausted, "ResourceExhausted"),
},
client.RPCError{
GRPCErr: status.Error(codes.Internal, "Internal"),
},
client.RPCError{
GRPCErr: status.Error(codes.Unavailable, "Unavailable"),
},
}

invalid_errors := []error{
fmt.Errorf("not a connection error"),
}

for _, err := range valid_errors {
if !IsChainConnectionError(err) {
t.Fatalf("expected error to be a connection error, got \"%s\"", err)
}
}

for _, err := range invalid_errors {
if IsChainConnectionError(err) {
t.Fatalf("expected error not to be a connection error, got \"%s\"", err)
}
}
})

t.Run("non existent gateway", func(t *testing.T) {
fc, err := client.New("non-existent-address", grpc.WithInsecure())
if err != nil {
t.Fatal(err)
}

if _, err := fc.GetLatestBlock(context.Background(), true); err == nil {
t.Fatal("expected an error")
} else if !IsChainConnectionError(err) {
t.Fatal("expected error to be a connection error")
}
})

}
47 changes: 36 additions & 11 deletions jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ func TestScheduleSendNotification(t *testing.T) {
t.Fatal(err)
}

wp.process(job)
if err := wp.process(job); err != nil {
t.Fatal(err)
}

if len(wp.jobChan) == 0 {
t.Fatal("expected job channel to contain a job")
Expand All @@ -77,7 +79,9 @@ func TestScheduleSendNotification(t *testing.T) {
t.Fatalf("expected pool to have a send_job_status job")
}

wp.process(sendNotificationJob)
if err := wp.process(sendNotificationJob); err != nil {
t.Fatal(err)
}

if !sendNotificationCalled {
t.Fatalf("expected 'sendNotificationCalled' to equal true")
Expand Down Expand Up @@ -124,8 +128,13 @@ func TestExecuteSendNotification(t *testing.T) {
t.Fatal(err)
}

wp.process(job)
wp.process(<-wp.jobChan)
if err := wp.process(job); err != nil {
t.Fatal(err)
}

if err := wp.process(<-wp.jobChan); err != nil {
t.Fatal(err)
}

if webhookJob.Type != "TestJobType" {
t.Fatalf("expected webhook endpoint to have received a notification")
Expand Down Expand Up @@ -175,8 +184,12 @@ func TestExecuteSendNotification(t *testing.T) {
t.Fatal(err)
}

wp.process(job)
wp.process(<-wp.jobChan)
if err := wp.process(job); err != nil {
t.Fatal(err)
}
if err := wp.process(<-wp.jobChan); err != nil {
t.Fatal(err)
}

if webhookJob.Type != "TestJobType" {
t.Fatalf("expected webhook endpoint to have received a notification")
Expand Down Expand Up @@ -219,7 +232,9 @@ func TestExecuteSendNotification(t *testing.T) {
t.Fatal(err)
}

wp.process(job)
if err := wp.process(job); err != nil {
t.Fatal(err)
}

if len(wp.jobChan) != 0 {
t.Errorf("did not expect a job to be queued")
Expand Down Expand Up @@ -259,9 +274,15 @@ func TestExecuteSendNotification(t *testing.T) {
t.Fatal(err)
}

wp.process(job)
if err := wp.process(job); err != nil {
t.Fatal()
}

sendNotificationJob := <-wp.jobChan
wp.process(sendNotificationJob)

if err := wp.process(sendNotificationJob); err != nil {
t.Fatal(err)
}

if len(hook.Entries) != 1 {
t.Errorf("expected there to be one warning, got %d", len(hook.Entries))
Expand Down Expand Up @@ -329,12 +350,16 @@ func TestJobErrorMessages(t *testing.T) {

// Explicitly retry to trigger n errors and a final successful execution, n = retryCount
for n := 0; n < retryCount+1; n++ {
wp.process(job)
if err := wp.process(job); err != nil {
t.Fatal(err)
}
}

// Send the notification
sendNotificationJob := <-wp.jobChan
wp.process(sendNotificationJob)
if err := wp.process(sendNotificationJob); err != nil {
t.Fatal(err)
}

// Check log entries
if len(hook.Entries) != retryCount {
Expand Down
Loading

0 comments on commit d4d172e

Please sign in to comment.