From 2bf7280f3fca36df59cbe68d683390c85ba3997b Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 13 Dec 2021 16:04:09 +0200 Subject: [PATCH 01/12] Make dev emulator persist its state, update emulator --- docker-compose.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 4bcfd856..7d85864c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,17 +51,21 @@ services: - redis emulator: - image: gcr.io/flow-container-registry/emulator:v0.23.0 + image: gcr.io/flow-container-registry/emulator:0.27.2 restart: unless-stopped - command: emulator -v + command: emulator -v --persist ports: - "3569:3569" + volumes: + - emulator_persist:/flowdb env_file: - ./.env environment: - FLOW_SERVICEPRIVATEKEY=${FLOW_WALLET_ADMIN_PRIVATE_KEY} - FLOW_SERVICEKEYSIGALGO=ECDSA_P256 - FLOW_SERVICEKEYHASHALGO=SHA3_256 + - FLOW_DBPATH=/flowdb volumes: redis_data: + emulator_persist: From b106c9db8a5b6fb26ff26727790daa3a210d69b2 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 13 Dec 2021 16:15:38 +0200 Subject: [PATCH 02/12] Pause system if connection refused by Flow gateway --- chain_events/listener.go | 40 +++++++++-- errors/errors.go | 7 ++ jobs/workerpool.go | 82 +++++++++++++++++----- migrations/internal/m20211213/migration.go | 35 +++++++++ migrations/migrations.go | 6 ++ system/service.go | 16 ++--- system/system.go | 15 +++- tests/system_test.go | 46 ++++++++++-- 8 files changed, 209 insertions(+), 38 deletions(-) create mode 100644 migrations/internal/m20211213/migration.go diff --git a/chain_events/listener.go b/chain_events/listener.go index 2bcea4fe..49412327 100644 --- a/chain_events/listener.go +++ b/chain_events/listener.go @@ -6,6 +6,7 @@ import ( "strings" "time" + wallet_errors "github.com/flow-hydraulics/flow-wallet-api/errors" "github.com/flow-hydraulics/flow-wallet-api/system" "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go-sdk/client" @@ -116,13 +117,24 @@ func (l *Listener) Start() *Listener { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + entry := log.WithFields(log.Fields{ + "package": "chain_events", + "function": "Listener.Start.goroutine", + }) + for { select { case <-l.done: return case <-l.ticker.C: // Check for maintenance mode - if l.waitMaintenance() { + if halted, err := l.systemHalted(); err != nil { + entry. + WithFields(log.Fields{"error": err}). + Warn("Could not get system settings from DB") + continue + } else if halted { + entry.Debug("System halted") continue } @@ -150,12 +162,23 @@ func (l *Listener) Start() *Listener { }) if err != nil { - log. + if wallet_errors.IsChainConnectionError(err) { + // Unable to connect to chain, pause system. + if l.systemService != nil { + entry.Warn("Unable to connect to chain, pausing system") + l.systemService.Pause() + } else { + entry.Warn("Unable to connect to chain") + } + continue + } + + entry. WithFields(log.Fields{"error": err}). Warn("Error while handling Flow events") if strings.Contains(err.Error(), "key not found") { - log.Warn(`"key not found" error indicates data is not available at this height, please manually set correct starting height`) + entry.Warn(`"key not found" error indicates data is not available at this height, please manually set correct starting height`) } } } @@ -202,6 +225,13 @@ func (l *Listener) Stop() { l.ticker = nil } -func (l *Listener) waitMaintenance() bool { - return l.systemService != nil && l.systemService.IsMaintenanceMode() +func (l *Listener) systemHalted() (bool, error) { + if l.systemService != nil { + s, err := l.systemService.GetSettings() + if err != nil { + return false, err + } + return s.IsMaintenanceMode() || s.IsPaused(), nil + } + return false, nil } diff --git a/errors/errors.go b/errors/errors.go index 9f4f3e65..02557c9d 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -1,6 +1,8 @@ // Package errors provides an API for errors across the application. package errors +import "strings" + type RequestError struct { StatusCode int Err error @@ -9,3 +11,8 @@ type RequestError struct { func (e *RequestError) Error() string { return e.Err.Error() } + +func IsChainConnectionError(err error) bool { + // TODO: check this properly + return strings.Contains(err.Error(), "connection refused") +} diff --git a/jobs/workerpool.go b/jobs/workerpool.go index 0c1036ed..602175cc 100644 --- a/jobs/workerpool.go +++ b/jobs/workerpool.go @@ -12,6 +12,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/flow-hydraulics/flow-wallet-api/datastore" + wallet_errors "github.com/flow-hydraulics/flow-wallet-api/errors" "github.com/flow-hydraulics/flow-wallet-api/system" ) @@ -173,9 +174,11 @@ func (wp *WorkerPool) Schedule(j *Job) error { entry.Debug("Scheduling job") - if wp.waitMaintenance() { - entry.Debug("In maintenance mode") - // In maintenance mode; prevent immediate scheduling and let dbScheduler handle this job + if halted, err := wp.systemHalted(); err != nil { + return fmt.Errorf("error while getting system settings: %w", err) + } else if halted { + // System halted; prevent immediate scheduling and let dbScheduler handle this job + entry.Debug("System halted") return nil } @@ -227,8 +230,15 @@ func (wp *WorkerPool) accept(job *Job) bool { return true } -func (wp *WorkerPool) waitMaintenance() bool { - return wp.systemService != nil && wp.systemService.IsMaintenanceMode() +func (wp *WorkerPool) systemHalted() (bool, error) { + if wp.systemService != nil { + s, err := wp.systemService.GetSettings() + if err != nil { + return false, err + } + return s.IsMaintenanceMode() || s.IsPaused(), nil + } + return false, nil } func (wp *WorkerPool) startDBJobScheduler() { @@ -243,8 +253,13 @@ func (wp *WorkerPool) startDBJobScheduler() { break jobPoolLoop } - // Check for maintenance mode - if wp.waitMaintenance() { + if halted, err := wp.systemHalted(); err != nil { + wp.logger. + WithFields(log.Fields{"error": err}). + Warn("Could not get system settings from DB") + restTime = wp.dbJobPollInterval + continue + } else if halted { restTime = wp.dbJobPollInterval continue } @@ -280,7 +295,27 @@ func (wp *WorkerPool) startWorkers() { break } - wp.process(job) + if err := wp.process(job); err != nil { + // Handle critical processing errors + + entry := job.logEntry(wp.logger.WithFields(log.Fields{ + "package": "jobs", + "function": "WorkerPool.startWorkers.goroutine", + "error": err, + })) + + if wallet_errors.IsChainConnectionError(err) { + if wp.systemService != nil { + entry.Warn("Unable to connect to chain, pausing system") + // Unable to connect to chain, pause system. + wp.systemService.Pause() + } else { + entry.Warn("Unable to connect to chain") + } + } else { + entry.Warn("Critical error while processing job") + } + } } }() } @@ -306,7 +341,7 @@ func (wp *WorkerPool) tryEnqueue(job *Job, block bool) bool { } } -func (wp *WorkerPool) process(job *Job) { +func (wp *WorkerPool) process(job *Job) error { entry := job.logEntry(wp.logger.WithFields(log.Fields{ "package": "jobs", "function": "WorkerPool.process", @@ -314,42 +349,49 @@ func (wp *WorkerPool) process(job *Job) { if !wp.accept(job) { entry.Info("Failed to accept job") - return + return nil } executor, exists := wp.executors[job.Type] if !exists { entry.Warn("Could not process job, no registered executor for type") + job.State = NoAvailableWorkers + if err := wp.store.UpdateJob(job); err != nil { - entry. - WithFields(log.Fields{"error": err}). - Warn("Could not update DB entry for job") + return fmt.Errorf("error while updating database entry: %w", err) } - return + + return nil } - err := executor(wp.context, job) - if err != nil { + if err := executor(wp.context, job); err != nil { + // Check for chain connection errors + if wallet_errors.IsChainConnectionError(err) { + // Stop processing this job any further, returning it to the pool. + return err + } + if job.ExecCount > wp.maxJobErrorCount || errors.Is(err, ErrPermanentFailure) { job.State = Failed } else { job.State = Error } + job.Error = err.Error() job.Errors = append(job.Errors, err.Error()) + entry. WithFields(log.Fields{"error": err}). Warn("Job execution resulted with error") + } else { job.State = Complete job.Error = "" // Clear the error message for the final & successful execution } if err := wp.store.UpdateJob(job); err != nil { - entry. - WithFields(log.Fields{"error": err}). - Warn("Could not update DB entry for job") + return fmt.Errorf("error while updating database entry: %w", err) } if (job.State == Failed || job.State == Complete) && job.ShouldSendNotification && wp.notificationConfig.ShouldSendJobStatus() { @@ -359,6 +401,8 @@ func (wp *WorkerPool) process(job *Job) { Warn("Could not schedule a status update notification for job") } } + + return nil } func (wp *WorkerPool) executeSendJobStatus(ctx context.Context, j *Job) error { diff --git a/migrations/internal/m20211213/migration.go b/migrations/internal/m20211213/migration.go new file mode 100644 index 00000000..8e8a685b --- /dev/null +++ b/migrations/internal/m20211213/migration.go @@ -0,0 +1,35 @@ +package m20211213 + +import ( + "database/sql" + + "gorm.io/gorm" +) + +const ID = "m20211213" + +type Settings struct { + gorm.Model + MaintenanceMode bool `gorm:"column:maintenance_mode;default:false"` + PausedSince sql.NullTime `gorm:"column:paused_since"` +} + +func (Settings) TableName() string { + return "system_settings" +} + +func Migrate(tx *gorm.DB) error { + if err := tx.AutoMigrate(&Settings{}); err != nil { + return err + } + + return nil +} + +func Rollback(tx *gorm.DB) error { + if err := tx.Migrator().DropColumn(&Settings{}, "paused_since"); err != nil { + return err + } + + return nil +} diff --git a/migrations/migrations.go b/migrations/migrations.go index 9484539c..83394c93 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -7,6 +7,7 @@ import ( "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211118" "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211130" "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211202" + "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211213" "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211220" "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211221_1" "github.com/go-gormigrate/gormigrate/v2" @@ -44,6 +45,11 @@ func List() []*gormigrate.Migration { Migrate: m20211202.Migrate, Rollback: m20211202.Rollback, }, + { + ID: m20211213.ID, + Migrate: m20211213.Migrate, + Rollback: m20211213.Rollback, + }, { ID: m20211220.ID, Migrate: m20211220.Migrate, diff --git a/system/service.go b/system/service.go index de5cf052..eeb89ee9 100644 --- a/system/service.go +++ b/system/service.go @@ -1,7 +1,9 @@ package system import ( + "database/sql" "fmt" + "time" log "github.com/sirupsen/logrus" ) @@ -26,16 +28,12 @@ func (svc *Service) SaveSettings(settings *Settings) error { return svc.store.SaveSettings(settings) } -func (svc *Service) IsMaintenanceMode() bool { +func (svc *Service) Pause() error { + log.Trace("Pause system") settings, err := svc.GetSettings() if err != nil { - log. - WithFields(log.Fields{ - "error": err, - "package": "system", - "function": "IsMaintenanceMode", - }). - Warn("Error while getting system settings") + return err } - return err == nil && settings.MaintenanceMode + settings.PausedSince = sql.NullTime{Time: time.Now(), Valid: true} + return svc.SaveSettings(settings) } diff --git a/system/system.go b/system/system.go index db622464..aa14d91a 100644 --- a/system/system.go +++ b/system/system.go @@ -1,14 +1,19 @@ package system import ( + "database/sql" "fmt" + "time" "gorm.io/gorm" ) +const pauseDuration = time.Minute + type Settings struct { gorm.Model - MaintenanceMode bool `gorm:"column:maintenance_mode;default:false"` + MaintenanceMode bool `gorm:"column:maintenance_mode;default:false"` + PausedSince sql.NullTime `gorm:"column:paused_since"` } func (s *Settings) String() string { @@ -26,6 +31,14 @@ func (s *Settings) ToJSON() SettingsJSON { } } +func (s *Settings) IsMaintenanceMode() bool { + return s.MaintenanceMode +} + +func (s *Settings) IsPaused() bool { + return s.PausedSince.Valid && s.PausedSince.Time.After(time.Now().Add(-pauseDuration)) +} + // Update fields according to JSON version func (s *Settings) FromJSON(j SettingsJSON) { s.MaintenanceMode = j.MaintenanceMode diff --git a/tests/system_test.go b/tests/system_test.go index 646c9c6b..e18b713b 100644 --- a/tests/system_test.go +++ b/tests/system_test.go @@ -2,11 +2,13 @@ package tests import ( "bytes" + "database/sql" "io" "io/ioutil" "net/http" "strings" "testing" + "time" "github.com/flow-hydraulics/flow-wallet-api/handlers" "github.com/flow-hydraulics/flow-wallet-api/tests/internal/test" @@ -72,22 +74,58 @@ func TestIsMaintenanceMode(t *testing.T) { sysService := svcs.GetSystem() - if sysService.IsMaintenanceMode() { + settings, err := sysService.GetSettings() + if err != nil { + t.Fatal(err) + } + + if settings.IsMaintenanceMode() { t.Error("expected system not to be in maintenance mode") } + settings.MaintenanceMode = true + + if err := sysService.SaveSettings(settings); err != nil { + t.Fatal(err) + } + + settings, err = sysService.GetSettings() + if err != nil { + t.Fatal(err) + } + + if !settings.IsMaintenanceMode() { + t.Error("expected system to be in maintenance mode") + } +} + +func TestIsPaused(t *testing.T) { + cfg := test.LoadConfig(t, testConfigPath) + svcs := test.GetServices(t, cfg) + + sysService := svcs.GetSystem() + settings, err := sysService.GetSettings() if err != nil { t.Fatal(err) } - settings.MaintenanceMode = true + if settings.IsPaused() { + t.Error("expected system not to be paused") + } + + settings.PausedSince = sql.NullTime{Time: time.Now(), Valid: true} if err := sysService.SaveSettings(settings); err != nil { t.Fatal(err) } - if !sysService.IsMaintenanceMode() { - t.Error("expected system to be in maintenance mode") + settings, err = sysService.GetSettings() + if err != nil { + t.Fatal(err) + } + + if !settings.IsPaused() { + t.Error("expected system to be paused") } } From 9ee9006a72fe3931e8c90e2b8c65e99025169760 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 13 Dec 2021 16:40:34 +0200 Subject: [PATCH 03/12] Fix unhandled errors --- chain_events/listener.go | 6 +++++- jobs/jobs_test.go | 39 ++++++++++++++++++++++++++++++--------- jobs/workerpool.go | 6 +++++- 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/chain_events/listener.go b/chain_events/listener.go index 49412327..469dd4a4 100644 --- a/chain_events/listener.go +++ b/chain_events/listener.go @@ -166,7 +166,11 @@ func (l *Listener) Start() *Listener { // Unable to connect to chain, pause system. if l.systemService != nil { entry.Warn("Unable to connect to chain, pausing system") - l.systemService.Pause() + if err := l.systemService.Pause(); err != nil { + entry. + WithFields(log.Fields{"error": err}). + Warn("Unable to pause system") + } } else { entry.Warn("Unable to connect to chain") } diff --git a/jobs/jobs_test.go b/jobs/jobs_test.go index 62a9e733..4aa79d4a 100644 --- a/jobs/jobs_test.go +++ b/jobs/jobs_test.go @@ -65,7 +65,9 @@ func TestScheduleSendNotification(t *testing.T) { t.Fatal(err) } - wp.process(job) + if err := wp.process(job); err != nil { + t.Fatal(err) + } if len(wp.jobChan) == 0 { t.Fatal("expected job channel to contain a job") @@ -77,7 +79,9 @@ func TestScheduleSendNotification(t *testing.T) { t.Fatalf("expected pool to have a send_job_status job") } - wp.process(sendNotificationJob) + if err := wp.process(sendNotificationJob); err != nil { + t.Fatal(err) + } if !sendNotificationCalled { t.Fatalf("expected 'sendNotificationCalled' to equal true") @@ -124,8 +128,13 @@ func TestExecuteSendNotification(t *testing.T) { t.Fatal(err) } - wp.process(job) - wp.process(<-wp.jobChan) + if err := wp.process(job); err != nil { + t.Fatal(err) + } + + if err := wp.process(<-wp.jobChan); err != nil { + t.Fatal(err) + } if webhookJob.Type != "TestJobType" { t.Fatalf("expected webhook endpoint to have received a notification") @@ -175,8 +184,12 @@ func TestExecuteSendNotification(t *testing.T) { t.Fatal(err) } - wp.process(job) - wp.process(<-wp.jobChan) + if err := wp.process(job); err != nil { + t.Fatal(err) + } + if err := wp.process(<-wp.jobChan); err != nil { + t.Fatal(err) + } if webhookJob.Type != "TestJobType" { t.Fatalf("expected webhook endpoint to have received a notification") @@ -219,7 +232,9 @@ func TestExecuteSendNotification(t *testing.T) { t.Fatal(err) } - wp.process(job) + if err := wp.process(job); err != nil { + t.Fatal(err) + } if len(wp.jobChan) != 0 { t.Errorf("did not expect a job to be queued") @@ -259,9 +274,15 @@ func TestExecuteSendNotification(t *testing.T) { t.Fatal(err) } - wp.process(job) + if err := wp.process(job); err != nil { + t.Fatal() + } + sendNotificationJob := <-wp.jobChan - wp.process(sendNotificationJob) + + if err := wp.process(sendNotificationJob); err != nil { + t.Fatal(err) + } if len(hook.Entries) != 1 { t.Errorf("expected there to be one warning, got %d", len(hook.Entries)) diff --git a/jobs/workerpool.go b/jobs/workerpool.go index 602175cc..2b60ea06 100644 --- a/jobs/workerpool.go +++ b/jobs/workerpool.go @@ -308,7 +308,11 @@ func (wp *WorkerPool) startWorkers() { if wp.systemService != nil { entry.Warn("Unable to connect to chain, pausing system") // Unable to connect to chain, pause system. - wp.systemService.Pause() + if err := wp.systemService.Pause(); err != nil { + entry. + WithFields(log.Fields{"error": err}). + Warn("Unable to pause system") + } } else { entry.Warn("Unable to connect to chain") } From 7fa99b7cf1fd375ccab8e45a8ef23fcfb570ed5d Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Tue, 14 Dec 2021 13:15:30 +0200 Subject: [PATCH 04/12] More elaborate connection error definition --- errors/errors.go | 22 +++++++++++++++++++--- errors/errors_test.go | 22 ++++++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) create mode 100644 errors/errors_test.go diff --git a/errors/errors.go b/errors/errors.go index 02557c9d..777c422b 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -1,7 +1,10 @@ // Package errors provides an API for errors across the application. package errors -import "strings" +import ( + "github.com/onflow/flow-go-sdk/client" + "google.golang.org/grpc/codes" +) type RequestError struct { StatusCode int @@ -12,7 +15,20 @@ func (e *RequestError) Error() string { return e.Err.Error() } +var accessAPIConnectionErrors = []codes.Code{ + codes.ResourceExhausted, + codes.Internal, + codes.Unavailable, +} + func IsChainConnectionError(err error) bool { - // TODO: check this properly - return strings.Contains(err.Error(), "connection refused") + if err, ok := err.(client.RPCError); ok { + // Check for Flow Access API connection errors + for _, code := range accessAPIConnectionErrors { + if err.GRPCStatus().Code() == code { + return true + } + } + } + return false } diff --git a/errors/errors_test.go b/errors/errors_test.go new file mode 100644 index 00000000..3d69f6d0 --- /dev/null +++ b/errors/errors_test.go @@ -0,0 +1,22 @@ +package errors + +import ( + "context" + "testing" + + "github.com/onflow/flow-go-sdk/client" + "google.golang.org/grpc" +) + +func TestIsChainConnectionError(t *testing.T) { + fc, err := client.New("non-existent-address", grpc.WithInsecure()) + if err != nil { + t.Fatal(err) + } + + if _, err := fc.GetLatestBlock(context.Background(), true); err == nil { + t.Fatal("expected an error") + } else if !IsChainConnectionError(err) { + t.Fatal("expected error to be a connection error") + } +} From d45f0ee9ffe1fd2d004588ef408a2e7a8f322031 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Wed, 15 Dec 2021 13:49:29 +0200 Subject: [PATCH 05/12] Add net.Error --- errors/errors.go | 7 +++++ errors/errors_test.go | 68 ++++++++++++++++++++++++++++++++++++------- 2 files changed, 65 insertions(+), 10 deletions(-) diff --git a/errors/errors.go b/errors/errors.go index 777c422b..e441bec8 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -2,6 +2,8 @@ package errors import ( + "net" + "github.com/onflow/flow-go-sdk/client" "google.golang.org/grpc/codes" ) @@ -16,12 +18,17 @@ func (e *RequestError) Error() string { } var accessAPIConnectionErrors = []codes.Code{ + codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal, codes.Unavailable, } func IsChainConnectionError(err error) bool { + if _, ok := err.(net.Error); ok { + return true + } + if err, ok := err.(client.RPCError); ok { // Check for Flow Access API connection errors for _, code := range accessAPIConnectionErrors { diff --git a/errors/errors_test.go b/errors/errors_test.go index 3d69f6d0..677720f4 100644 --- a/errors/errors_test.go +++ b/errors/errors_test.go @@ -2,21 +2,69 @@ package errors import ( "context" + "fmt" "testing" "github.com/onflow/flow-go-sdk/client" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +type testNetError struct{} + +func (e *testNetError) Error() string { return "NetError" } +func (e *testNetError) Timeout() bool { return false } +func (e *testNetError) Temporary() bool { return false } + func TestIsChainConnectionError(t *testing.T) { - fc, err := client.New("non-existent-address", grpc.WithInsecure()) - if err != nil { - t.Fatal(err) - } - - if _, err := fc.GetLatestBlock(context.Background(), true); err == nil { - t.Fatal("expected an error") - } else if !IsChainConnectionError(err) { - t.Fatal("expected error to be a connection error") - } + t.Run("error cases", func(t *testing.T) { + var netErr error = &testNetError{} + + valid_errors := []error{ + netErr, + client.RPCError{ + GRPCErr: status.Error(codes.DeadlineExceeded, "DeadlineExceeded"), + }, + client.RPCError{ + GRPCErr: status.Error(codes.ResourceExhausted, "ResourceExhausted"), + }, + client.RPCError{ + GRPCErr: status.Error(codes.Internal, "Internal"), + }, + client.RPCError{ + GRPCErr: status.Error(codes.Unavailable, "Unavailable"), + }, + } + + invalid_errors := []error{ + fmt.Errorf("not a connection error"), + } + + for _, err := range valid_errors { + if !IsChainConnectionError(err) { + t.Fatalf("expected error to be a connection error, got \"%s\"", err) + } + } + + for _, err := range invalid_errors { + if IsChainConnectionError(err) { + t.Fatalf("expected error not to be a connection error, got \"%s\"", err) + } + } + }) + + t.Run("non existent gateway", func(t *testing.T) { + fc, err := client.New("non-existent-address", grpc.WithInsecure()) + if err != nil { + t.Fatal(err) + } + + if _, err := fc.GetLatestBlock(context.Background(), true); err == nil { + t.Fatal("expected an error") + } else if !IsChainConnectionError(err) { + t.Fatal("expected error to be a connection error") + } + }) + } From a659a6628b7eb12722f52c2af25c03b888ff9fe2 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Wed, 15 Dec 2021 14:17:19 +0200 Subject: [PATCH 06/12] Ping the emulator in tests to fail early --- tests/internal/test/flow.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/internal/test/flow.go b/tests/internal/test/flow.go index dda33698..02a0449e 100644 --- a/tests/internal/test/flow.go +++ b/tests/internal/test/flow.go @@ -29,6 +29,10 @@ func NewFlowClient(t *testing.T, cfg *configs.Config) *client.Client { t.Fatal(err) } + if err := fc.Ping(context.Background()); err != nil { + t.Fatal(err) + } + close := func() { err := fc.Close() if err != nil { From 40a3b85a13aa2c581fa80a2a5a8612ac1b8e11e2 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Wed, 15 Dec 2021 14:17:59 +0200 Subject: [PATCH 07/12] Allow configuration of pause duration --- chain_events/listener.go | 6 +----- jobs/workerpool.go | 6 +----- system/options.go | 13 +++++++++++++ system/service.go | 27 ++++++++++++++++++++++++--- system/system.go | 4 +--- tests/system_test.go | 23 +++++++++++++++++++++-- 6 files changed, 61 insertions(+), 18 deletions(-) create mode 100644 system/options.go diff --git a/chain_events/listener.go b/chain_events/listener.go index 469dd4a4..95da9d8f 100644 --- a/chain_events/listener.go +++ b/chain_events/listener.go @@ -231,11 +231,7 @@ func (l *Listener) Stop() { func (l *Listener) systemHalted() (bool, error) { if l.systemService != nil { - s, err := l.systemService.GetSettings() - if err != nil { - return false, err - } - return s.IsMaintenanceMode() || s.IsPaused(), nil + return l.systemService.IsHalted() } return false, nil } diff --git a/jobs/workerpool.go b/jobs/workerpool.go index 2b60ea06..20f7042f 100644 --- a/jobs/workerpool.go +++ b/jobs/workerpool.go @@ -232,11 +232,7 @@ func (wp *WorkerPool) accept(job *Job) bool { func (wp *WorkerPool) systemHalted() (bool, error) { if wp.systemService != nil { - s, err := wp.systemService.GetSettings() - if err != nil { - return false, err - } - return s.IsMaintenanceMode() || s.IsPaused(), nil + return wp.systemService.IsHalted() } return false, nil } diff --git a/system/options.go b/system/options.go new file mode 100644 index 00000000..d6a4820f --- /dev/null +++ b/system/options.go @@ -0,0 +1,13 @@ +package system + +import ( + "time" +) + +type ServiceOption func(*Service) + +func WithPauseDuration(duration time.Duration) ServiceOption { + return func(svc *Service) { + svc.pauseDuration = duration + } +} diff --git a/system/service.go b/system/service.go index eeb89ee9..bcfb8990 100644 --- a/system/service.go +++ b/system/service.go @@ -9,11 +9,24 @@ import ( ) type Service struct { - store Store + store Store + pauseDuration time.Duration } -func NewService(store Store) *Service { - return &Service{store} +const defaultPauseDuration = time.Minute + +func NewService(store Store, opts ...ServiceOption) *Service { + svc := &Service{ + store: store, + pauseDuration: defaultPauseDuration, + } + + // Go through options + for _, opt := range opts { + opt(svc) + } + + return svc } func (svc *Service) GetSettings() (*Settings, error) { @@ -37,3 +50,11 @@ func (svc *Service) Pause() error { settings.PausedSince = sql.NullTime{Time: time.Now(), Valid: true} return svc.SaveSettings(settings) } + +func (svc *Service) IsHalted() (bool, error) { + s, err := svc.GetSettings() + if err != nil { + return false, err + } + return s.IsMaintenanceMode() || s.IsPaused(svc.pauseDuration), nil +} diff --git a/system/system.go b/system/system.go index aa14d91a..31d37bd8 100644 --- a/system/system.go +++ b/system/system.go @@ -8,8 +8,6 @@ import ( "gorm.io/gorm" ) -const pauseDuration = time.Minute - type Settings struct { gorm.Model MaintenanceMode bool `gorm:"column:maintenance_mode;default:false"` @@ -35,7 +33,7 @@ func (s *Settings) IsMaintenanceMode() bool { return s.MaintenanceMode } -func (s *Settings) IsPaused() bool { +func (s *Settings) IsPaused(pauseDuration time.Duration) bool { return s.PausedSince.Valid && s.PausedSince.Time.After(time.Now().Add(-pauseDuration)) } diff --git a/tests/system_test.go b/tests/system_test.go index e18b713b..a925b017 100644 --- a/tests/system_test.go +++ b/tests/system_test.go @@ -110,7 +110,7 @@ func TestIsPaused(t *testing.T) { t.Fatal(err) } - if settings.IsPaused() { + if settings.IsPaused(time.Minute) { t.Error("expected system not to be paused") } @@ -125,7 +125,26 @@ func TestIsPaused(t *testing.T) { t.Fatal(err) } - if !settings.IsPaused() { + if !settings.IsPaused(time.Minute) { t.Error("expected system to be paused") } } + +func TestPausing(t *testing.T) { + cfg := test.LoadConfig(t, testConfigPath) + svcs := test.GetServices(t, cfg) + + sysService := svcs.GetSystem() + + if halted, err := sysService.IsHalted(); err != nil || halted { + t.Error("expected system not to be halted") + } + + if err := sysService.Pause(); err != nil { + t.Fatal(err) + } + + if halted, err := sysService.IsHalted(); err != nil || !halted { + t.Error("expected system to be halted") + } +} From 1acaa6b4b92189f7d700051ce0423fb1bacdf24a Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Wed, 15 Dec 2021 15:03:36 +0200 Subject: [PATCH 08/12] Ping Flow client in main_test --- main_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/main_test.go b/main_test.go index 925a9588..bd5a8cfe 100644 --- a/main_test.go +++ b/main_test.go @@ -146,6 +146,9 @@ func getTestApp(t *testing.T, cfg *configs.Config, ignoreLeaks bool) TestApp { fc, err := client.New(cfg.AccessAPIHost, grpc.WithInsecure()) fatal(t, err) + + fatal(t, fc.Ping(context.Background())) + t.Cleanup(func() { fc.Close() }) From 3cb383ffb1a88c0e56a4fc083e4e98373245b873 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Tue, 21 Dec 2021 12:37:43 +0200 Subject: [PATCH 09/12] Fix migrations --- migrations/internal/m20211130/migration.go | 1 + .../internal/{m20211213 => m20211221_2}/migration.go | 4 ++-- migrations/migrations.go | 12 ++++++------ 3 files changed, 9 insertions(+), 8 deletions(-) rename migrations/internal/{m20211213 => m20211221_2}/migration.go (92%) diff --git a/migrations/internal/m20211130/migration.go b/migrations/internal/m20211130/migration.go index 1882bd4c..ec0b2749 100644 --- a/migrations/internal/m20211130/migration.go +++ b/migrations/internal/m20211130/migration.go @@ -4,6 +4,7 @@ import ( "gorm.io/gorm" ) +// Note: there is an 'm' here, it is a typo but it should not be removed const ID = "m20211130" type Settings struct { diff --git a/migrations/internal/m20211213/migration.go b/migrations/internal/m20211221_2/migration.go similarity index 92% rename from migrations/internal/m20211213/migration.go rename to migrations/internal/m20211221_2/migration.go index 8e8a685b..c7e8b16c 100644 --- a/migrations/internal/m20211213/migration.go +++ b/migrations/internal/m20211221_2/migration.go @@ -1,4 +1,4 @@ -package m20211213 +package m20211221_2 import ( "database/sql" @@ -6,7 +6,7 @@ import ( "gorm.io/gorm" ) -const ID = "m20211213" +const ID = "20211221_2" type Settings struct { gorm.Model diff --git a/migrations/migrations.go b/migrations/migrations.go index 83394c93..3db6f938 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -7,9 +7,9 @@ import ( "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211118" "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211130" "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211202" - "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211213" "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211220" "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211221_1" + "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211221_2" "github.com/go-gormigrate/gormigrate/v2" ) @@ -45,11 +45,6 @@ func List() []*gormigrate.Migration { Migrate: m20211202.Migrate, Rollback: m20211202.Rollback, }, - { - ID: m20211213.ID, - Migrate: m20211213.Migrate, - Rollback: m20211213.Rollback, - }, { ID: m20211220.ID, Migrate: m20211220.Migrate, @@ -60,6 +55,11 @@ func List() []*gormigrate.Migration { Migrate: m20211221_1.Migrate, Rollback: m20211221_1.Rollback, }, + { + ID: m20211221_2.ID, + Migrate: m20211221_2.Migrate, + Rollback: m20211221_2.Rollback, + }, } return ms } From f2d118b71518685b1abb45ce1f1c3e95c1ce8404 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Tue, 21 Dec 2021 13:16:08 +0200 Subject: [PATCH 10/12] Add migration comment [skip ci] --- migrations/internal/m20211221_2/migration.go | 1 + 1 file changed, 1 insertion(+) diff --git a/migrations/internal/m20211221_2/migration.go b/migrations/internal/m20211221_2/migration.go index c7e8b16c..07cc2636 100644 --- a/migrations/internal/m20211221_2/migration.go +++ b/migrations/internal/m20211221_2/migration.go @@ -1,3 +1,4 @@ +// m20211221_2 handles Settings.PausedSince migration package m20211221_2 import ( From f9517ffdc74aed427ce0c9a5332f502fee58bfde Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Tue, 21 Dec 2021 13:24:57 +0200 Subject: [PATCH 11/12] Handle wp.process errors --- jobs/jobs_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/jobs/jobs_test.go b/jobs/jobs_test.go index 4aa79d4a..4d89abb9 100644 --- a/jobs/jobs_test.go +++ b/jobs/jobs_test.go @@ -350,12 +350,16 @@ func TestJobErrorMessages(t *testing.T) { // Explicitly retry to trigger n errors and a final successful execution, n = retryCount for n := 0; n < retryCount+1; n++ { - wp.process(job) + if err := wp.process(job); err != nil { + t.Fatal(err) + } } // Send the notification sendNotificationJob := <-wp.jobChan - wp.process(sendNotificationJob) + if err := wp.process(sendNotificationJob); err != nil { + t.Fatal(err) + } // Check log entries if len(hook.Entries) != retryCount { From 6767dce33161d2dc4019f7504fc81392f7a16fdc Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Tue, 21 Dec 2021 13:38:42 +0200 Subject: [PATCH 12/12] Fix logic in CheckAdminProposalKeyCount --- keys/basic/keys.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/keys/basic/keys.go b/keys/basic/keys.go index dbbdf61b..9fb8a7fa 100644 --- a/keys/basic/keys.go +++ b/keys/basic/keys.go @@ -78,7 +78,7 @@ func (s *KeyManager) CheckAdminProposalKeyCount(ctx context.Context) error { } } - if onChainCount != int(s.cfg.AdminProposalKeyCount) { + if onChainCount < int(s.cfg.AdminProposalKeyCount) { return fmt.Errorf( "configured: %d, onchain: %d, %w", s.cfg.AdminProposalKeyCount, @@ -89,7 +89,7 @@ func (s *KeyManager) CheckAdminProposalKeyCount(ctx context.Context) error { if inDBCount, err := s.store.ProposalKeyCount(); err != nil { return fmt.Errorf("error while fetching admin proposal key count from database: %w", err) - } else if inDBCount != int64(s.cfg.AdminProposalKeyCount) { + } else if inDBCount < int64(s.cfg.AdminProposalKeyCount) { return fmt.Errorf( "configured: %d, in database: %d, %w", s.cfg.AdminProposalKeyCount,