diff --git a/chain_events/listener.go b/chain_events/listener.go index 2bcea4fe..95da9d8f 100644 --- a/chain_events/listener.go +++ b/chain_events/listener.go @@ -6,6 +6,7 @@ import ( "strings" "time" + wallet_errors "github.com/flow-hydraulics/flow-wallet-api/errors" "github.com/flow-hydraulics/flow-wallet-api/system" "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go-sdk/client" @@ -116,13 +117,24 @@ func (l *Listener) Start() *Listener { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + entry := log.WithFields(log.Fields{ + "package": "chain_events", + "function": "Listener.Start.goroutine", + }) + for { select { case <-l.done: return case <-l.ticker.C: // Check for maintenance mode - if l.waitMaintenance() { + if halted, err := l.systemHalted(); err != nil { + entry. + WithFields(log.Fields{"error": err}). + Warn("Could not get system settings from DB") + continue + } else if halted { + entry.Debug("System halted") continue } @@ -150,12 +162,27 @@ func (l *Listener) Start() *Listener { }) if err != nil { - log. + if wallet_errors.IsChainConnectionError(err) { + // Unable to connect to chain, pause system. + if l.systemService != nil { + entry.Warn("Unable to connect to chain, pausing system") + if err := l.systemService.Pause(); err != nil { + entry. + WithFields(log.Fields{"error": err}). + Warn("Unable to pause system") + } + } else { + entry.Warn("Unable to connect to chain") + } + continue + } + + entry. WithFields(log.Fields{"error": err}). Warn("Error while handling Flow events") if strings.Contains(err.Error(), "key not found") { - log.Warn(`"key not found" error indicates data is not available at this height, please manually set correct starting height`) + entry.Warn(`"key not found" error indicates data is not available at this height, please manually set correct starting height`) } } } @@ -202,6 +229,9 @@ func (l *Listener) Stop() { l.ticker = nil } -func (l *Listener) waitMaintenance() bool { - return l.systemService != nil && l.systemService.IsMaintenanceMode() +func (l *Listener) systemHalted() (bool, error) { + if l.systemService != nil { + return l.systemService.IsHalted() + } + return false, nil } diff --git a/docker-compose.yml b/docker-compose.yml index 4bcfd856..7d85864c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,17 +51,21 @@ services: - redis emulator: - image: gcr.io/flow-container-registry/emulator:v0.23.0 + image: gcr.io/flow-container-registry/emulator:0.27.2 restart: unless-stopped - command: emulator -v + command: emulator -v --persist ports: - "3569:3569" + volumes: + - emulator_persist:/flowdb env_file: - ./.env environment: - FLOW_SERVICEPRIVATEKEY=${FLOW_WALLET_ADMIN_PRIVATE_KEY} - FLOW_SERVICEKEYSIGALGO=ECDSA_P256 - FLOW_SERVICEKEYHASHALGO=SHA3_256 + - FLOW_DBPATH=/flowdb volumes: redis_data: + emulator_persist: diff --git a/errors/errors.go b/errors/errors.go index 9f4f3e65..e441bec8 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -1,6 +1,13 @@ // Package errors provides an API for errors across the application. package errors +import ( + "net" + + "github.com/onflow/flow-go-sdk/client" + "google.golang.org/grpc/codes" +) + type RequestError struct { StatusCode int Err error @@ -9,3 +16,26 @@ type RequestError struct { func (e *RequestError) Error() string { return e.Err.Error() } + +var accessAPIConnectionErrors = []codes.Code{ + codes.DeadlineExceeded, + codes.ResourceExhausted, + codes.Internal, + codes.Unavailable, +} + +func IsChainConnectionError(err error) bool { + if _, ok := err.(net.Error); ok { + return true + } + + if err, ok := err.(client.RPCError); ok { + // Check for Flow Access API connection errors + for _, code := range accessAPIConnectionErrors { + if err.GRPCStatus().Code() == code { + return true + } + } + } + return false +} diff --git a/errors/errors_test.go b/errors/errors_test.go new file mode 100644 index 00000000..677720f4 --- /dev/null +++ b/errors/errors_test.go @@ -0,0 +1,70 @@ +package errors + +import ( + "context" + "fmt" + "testing" + + "github.com/onflow/flow-go-sdk/client" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type testNetError struct{} + +func (e *testNetError) Error() string { return "NetError" } +func (e *testNetError) Timeout() bool { return false } +func (e *testNetError) Temporary() bool { return false } + +func TestIsChainConnectionError(t *testing.T) { + t.Run("error cases", func(t *testing.T) { + var netErr error = &testNetError{} + + valid_errors := []error{ + netErr, + client.RPCError{ + GRPCErr: status.Error(codes.DeadlineExceeded, "DeadlineExceeded"), + }, + client.RPCError{ + GRPCErr: status.Error(codes.ResourceExhausted, "ResourceExhausted"), + }, + client.RPCError{ + GRPCErr: status.Error(codes.Internal, "Internal"), + }, + client.RPCError{ + GRPCErr: status.Error(codes.Unavailable, "Unavailable"), + }, + } + + invalid_errors := []error{ + fmt.Errorf("not a connection error"), + } + + for _, err := range valid_errors { + if !IsChainConnectionError(err) { + t.Fatalf("expected error to be a connection error, got \"%s\"", err) + } + } + + for _, err := range invalid_errors { + if IsChainConnectionError(err) { + t.Fatalf("expected error not to be a connection error, got \"%s\"", err) + } + } + }) + + t.Run("non existent gateway", func(t *testing.T) { + fc, err := client.New("non-existent-address", grpc.WithInsecure()) + if err != nil { + t.Fatal(err) + } + + if _, err := fc.GetLatestBlock(context.Background(), true); err == nil { + t.Fatal("expected an error") + } else if !IsChainConnectionError(err) { + t.Fatal("expected error to be a connection error") + } + }) + +} diff --git a/jobs/jobs_test.go b/jobs/jobs_test.go index 62a9e733..4d89abb9 100644 --- a/jobs/jobs_test.go +++ b/jobs/jobs_test.go @@ -65,7 +65,9 @@ func TestScheduleSendNotification(t *testing.T) { t.Fatal(err) } - wp.process(job) + if err := wp.process(job); err != nil { + t.Fatal(err) + } if len(wp.jobChan) == 0 { t.Fatal("expected job channel to contain a job") @@ -77,7 +79,9 @@ func TestScheduleSendNotification(t *testing.T) { t.Fatalf("expected pool to have a send_job_status job") } - wp.process(sendNotificationJob) + if err := wp.process(sendNotificationJob); err != nil { + t.Fatal(err) + } if !sendNotificationCalled { t.Fatalf("expected 'sendNotificationCalled' to equal true") @@ -124,8 +128,13 @@ func TestExecuteSendNotification(t *testing.T) { t.Fatal(err) } - wp.process(job) - wp.process(<-wp.jobChan) + if err := wp.process(job); err != nil { + t.Fatal(err) + } + + if err := wp.process(<-wp.jobChan); err != nil { + t.Fatal(err) + } if webhookJob.Type != "TestJobType" { t.Fatalf("expected webhook endpoint to have received a notification") @@ -175,8 +184,12 @@ func TestExecuteSendNotification(t *testing.T) { t.Fatal(err) } - wp.process(job) - wp.process(<-wp.jobChan) + if err := wp.process(job); err != nil { + t.Fatal(err) + } + if err := wp.process(<-wp.jobChan); err != nil { + t.Fatal(err) + } if webhookJob.Type != "TestJobType" { t.Fatalf("expected webhook endpoint to have received a notification") @@ -219,7 +232,9 @@ func TestExecuteSendNotification(t *testing.T) { t.Fatal(err) } - wp.process(job) + if err := wp.process(job); err != nil { + t.Fatal(err) + } if len(wp.jobChan) != 0 { t.Errorf("did not expect a job to be queued") @@ -259,9 +274,15 @@ func TestExecuteSendNotification(t *testing.T) { t.Fatal(err) } - wp.process(job) + if err := wp.process(job); err != nil { + t.Fatal() + } + sendNotificationJob := <-wp.jobChan - wp.process(sendNotificationJob) + + if err := wp.process(sendNotificationJob); err != nil { + t.Fatal(err) + } if len(hook.Entries) != 1 { t.Errorf("expected there to be one warning, got %d", len(hook.Entries)) @@ -329,12 +350,16 @@ func TestJobErrorMessages(t *testing.T) { // Explicitly retry to trigger n errors and a final successful execution, n = retryCount for n := 0; n < retryCount+1; n++ { - wp.process(job) + if err := wp.process(job); err != nil { + t.Fatal(err) + } } // Send the notification sendNotificationJob := <-wp.jobChan - wp.process(sendNotificationJob) + if err := wp.process(sendNotificationJob); err != nil { + t.Fatal(err) + } // Check log entries if len(hook.Entries) != retryCount { diff --git a/jobs/workerpool.go b/jobs/workerpool.go index 0c1036ed..20f7042f 100644 --- a/jobs/workerpool.go +++ b/jobs/workerpool.go @@ -12,6 +12,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/flow-hydraulics/flow-wallet-api/datastore" + wallet_errors "github.com/flow-hydraulics/flow-wallet-api/errors" "github.com/flow-hydraulics/flow-wallet-api/system" ) @@ -173,9 +174,11 @@ func (wp *WorkerPool) Schedule(j *Job) error { entry.Debug("Scheduling job") - if wp.waitMaintenance() { - entry.Debug("In maintenance mode") - // In maintenance mode; prevent immediate scheduling and let dbScheduler handle this job + if halted, err := wp.systemHalted(); err != nil { + return fmt.Errorf("error while getting system settings: %w", err) + } else if halted { + // System halted; prevent immediate scheduling and let dbScheduler handle this job + entry.Debug("System halted") return nil } @@ -227,8 +230,11 @@ func (wp *WorkerPool) accept(job *Job) bool { return true } -func (wp *WorkerPool) waitMaintenance() bool { - return wp.systemService != nil && wp.systemService.IsMaintenanceMode() +func (wp *WorkerPool) systemHalted() (bool, error) { + if wp.systemService != nil { + return wp.systemService.IsHalted() + } + return false, nil } func (wp *WorkerPool) startDBJobScheduler() { @@ -243,8 +249,13 @@ func (wp *WorkerPool) startDBJobScheduler() { break jobPoolLoop } - // Check for maintenance mode - if wp.waitMaintenance() { + if halted, err := wp.systemHalted(); err != nil { + wp.logger. + WithFields(log.Fields{"error": err}). + Warn("Could not get system settings from DB") + restTime = wp.dbJobPollInterval + continue + } else if halted { restTime = wp.dbJobPollInterval continue } @@ -280,7 +291,31 @@ func (wp *WorkerPool) startWorkers() { break } - wp.process(job) + if err := wp.process(job); err != nil { + // Handle critical processing errors + + entry := job.logEntry(wp.logger.WithFields(log.Fields{ + "package": "jobs", + "function": "WorkerPool.startWorkers.goroutine", + "error": err, + })) + + if wallet_errors.IsChainConnectionError(err) { + if wp.systemService != nil { + entry.Warn("Unable to connect to chain, pausing system") + // Unable to connect to chain, pause system. + if err := wp.systemService.Pause(); err != nil { + entry. + WithFields(log.Fields{"error": err}). + Warn("Unable to pause system") + } + } else { + entry.Warn("Unable to connect to chain") + } + } else { + entry.Warn("Critical error while processing job") + } + } } }() } @@ -306,7 +341,7 @@ func (wp *WorkerPool) tryEnqueue(job *Job, block bool) bool { } } -func (wp *WorkerPool) process(job *Job) { +func (wp *WorkerPool) process(job *Job) error { entry := job.logEntry(wp.logger.WithFields(log.Fields{ "package": "jobs", "function": "WorkerPool.process", @@ -314,42 +349,49 @@ func (wp *WorkerPool) process(job *Job) { if !wp.accept(job) { entry.Info("Failed to accept job") - return + return nil } executor, exists := wp.executors[job.Type] if !exists { entry.Warn("Could not process job, no registered executor for type") + job.State = NoAvailableWorkers + if err := wp.store.UpdateJob(job); err != nil { - entry. - WithFields(log.Fields{"error": err}). - Warn("Could not update DB entry for job") + return fmt.Errorf("error while updating database entry: %w", err) } - return + + return nil } - err := executor(wp.context, job) - if err != nil { + if err := executor(wp.context, job); err != nil { + // Check for chain connection errors + if wallet_errors.IsChainConnectionError(err) { + // Stop processing this job any further, returning it to the pool. + return err + } + if job.ExecCount > wp.maxJobErrorCount || errors.Is(err, ErrPermanentFailure) { job.State = Failed } else { job.State = Error } + job.Error = err.Error() job.Errors = append(job.Errors, err.Error()) + entry. WithFields(log.Fields{"error": err}). Warn("Job execution resulted with error") + } else { job.State = Complete job.Error = "" // Clear the error message for the final & successful execution } if err := wp.store.UpdateJob(job); err != nil { - entry. - WithFields(log.Fields{"error": err}). - Warn("Could not update DB entry for job") + return fmt.Errorf("error while updating database entry: %w", err) } if (job.State == Failed || job.State == Complete) && job.ShouldSendNotification && wp.notificationConfig.ShouldSendJobStatus() { @@ -359,6 +401,8 @@ func (wp *WorkerPool) process(job *Job) { Warn("Could not schedule a status update notification for job") } } + + return nil } func (wp *WorkerPool) executeSendJobStatus(ctx context.Context, j *Job) error { diff --git a/keys/basic/keys.go b/keys/basic/keys.go index dbbdf61b..9fb8a7fa 100644 --- a/keys/basic/keys.go +++ b/keys/basic/keys.go @@ -78,7 +78,7 @@ func (s *KeyManager) CheckAdminProposalKeyCount(ctx context.Context) error { } } - if onChainCount != int(s.cfg.AdminProposalKeyCount) { + if onChainCount < int(s.cfg.AdminProposalKeyCount) { return fmt.Errorf( "configured: %d, onchain: %d, %w", s.cfg.AdminProposalKeyCount, @@ -89,7 +89,7 @@ func (s *KeyManager) CheckAdminProposalKeyCount(ctx context.Context) error { if inDBCount, err := s.store.ProposalKeyCount(); err != nil { return fmt.Errorf("error while fetching admin proposal key count from database: %w", err) - } else if inDBCount != int64(s.cfg.AdminProposalKeyCount) { + } else if inDBCount < int64(s.cfg.AdminProposalKeyCount) { return fmt.Errorf( "configured: %d, in database: %d, %w", s.cfg.AdminProposalKeyCount, diff --git a/main_test.go b/main_test.go index 925a9588..bd5a8cfe 100644 --- a/main_test.go +++ b/main_test.go @@ -146,6 +146,9 @@ func getTestApp(t *testing.T, cfg *configs.Config, ignoreLeaks bool) TestApp { fc, err := client.New(cfg.AccessAPIHost, grpc.WithInsecure()) fatal(t, err) + + fatal(t, fc.Ping(context.Background())) + t.Cleanup(func() { fc.Close() }) diff --git a/migrations/internal/m20211130/migration.go b/migrations/internal/m20211130/migration.go index 1882bd4c..ec0b2749 100644 --- a/migrations/internal/m20211130/migration.go +++ b/migrations/internal/m20211130/migration.go @@ -4,6 +4,7 @@ import ( "gorm.io/gorm" ) +// Note: there is an 'm' here, it is a typo but it should not be removed const ID = "m20211130" type Settings struct { diff --git a/migrations/internal/m20211221_2/migration.go b/migrations/internal/m20211221_2/migration.go new file mode 100644 index 00000000..07cc2636 --- /dev/null +++ b/migrations/internal/m20211221_2/migration.go @@ -0,0 +1,36 @@ +// m20211221_2 handles Settings.PausedSince migration +package m20211221_2 + +import ( + "database/sql" + + "gorm.io/gorm" +) + +const ID = "20211221_2" + +type Settings struct { + gorm.Model + MaintenanceMode bool `gorm:"column:maintenance_mode;default:false"` + PausedSince sql.NullTime `gorm:"column:paused_since"` +} + +func (Settings) TableName() string { + return "system_settings" +} + +func Migrate(tx *gorm.DB) error { + if err := tx.AutoMigrate(&Settings{}); err != nil { + return err + } + + return nil +} + +func Rollback(tx *gorm.DB) error { + if err := tx.Migrator().DropColumn(&Settings{}, "paused_since"); err != nil { + return err + } + + return nil +} diff --git a/migrations/migrations.go b/migrations/migrations.go index 9484539c..3db6f938 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -9,6 +9,7 @@ import ( "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211202" "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211220" "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211221_1" + "github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211221_2" "github.com/go-gormigrate/gormigrate/v2" ) @@ -54,6 +55,11 @@ func List() []*gormigrate.Migration { Migrate: m20211221_1.Migrate, Rollback: m20211221_1.Rollback, }, + { + ID: m20211221_2.ID, + Migrate: m20211221_2.Migrate, + Rollback: m20211221_2.Rollback, + }, } return ms } diff --git a/system/options.go b/system/options.go new file mode 100644 index 00000000..d6a4820f --- /dev/null +++ b/system/options.go @@ -0,0 +1,13 @@ +package system + +import ( + "time" +) + +type ServiceOption func(*Service) + +func WithPauseDuration(duration time.Duration) ServiceOption { + return func(svc *Service) { + svc.pauseDuration = duration + } +} diff --git a/system/service.go b/system/service.go index de5cf052..bcfb8990 100644 --- a/system/service.go +++ b/system/service.go @@ -1,17 +1,32 @@ package system import ( + "database/sql" "fmt" + "time" log "github.com/sirupsen/logrus" ) type Service struct { - store Store + store Store + pauseDuration time.Duration } -func NewService(store Store) *Service { - return &Service{store} +const defaultPauseDuration = time.Minute + +func NewService(store Store, opts ...ServiceOption) *Service { + svc := &Service{ + store: store, + pauseDuration: defaultPauseDuration, + } + + // Go through options + for _, opt := range opts { + opt(svc) + } + + return svc } func (svc *Service) GetSettings() (*Settings, error) { @@ -26,16 +41,20 @@ func (svc *Service) SaveSettings(settings *Settings) error { return svc.store.SaveSettings(settings) } -func (svc *Service) IsMaintenanceMode() bool { +func (svc *Service) Pause() error { + log.Trace("Pause system") settings, err := svc.GetSettings() if err != nil { - log. - WithFields(log.Fields{ - "error": err, - "package": "system", - "function": "IsMaintenanceMode", - }). - Warn("Error while getting system settings") + return err + } + settings.PausedSince = sql.NullTime{Time: time.Now(), Valid: true} + return svc.SaveSettings(settings) +} + +func (svc *Service) IsHalted() (bool, error) { + s, err := svc.GetSettings() + if err != nil { + return false, err } - return err == nil && settings.MaintenanceMode + return s.IsMaintenanceMode() || s.IsPaused(svc.pauseDuration), nil } diff --git a/system/system.go b/system/system.go index db622464..31d37bd8 100644 --- a/system/system.go +++ b/system/system.go @@ -1,14 +1,17 @@ package system import ( + "database/sql" "fmt" + "time" "gorm.io/gorm" ) type Settings struct { gorm.Model - MaintenanceMode bool `gorm:"column:maintenance_mode;default:false"` + MaintenanceMode bool `gorm:"column:maintenance_mode;default:false"` + PausedSince sql.NullTime `gorm:"column:paused_since"` } func (s *Settings) String() string { @@ -26,6 +29,14 @@ func (s *Settings) ToJSON() SettingsJSON { } } +func (s *Settings) IsMaintenanceMode() bool { + return s.MaintenanceMode +} + +func (s *Settings) IsPaused(pauseDuration time.Duration) bool { + return s.PausedSince.Valid && s.PausedSince.Time.After(time.Now().Add(-pauseDuration)) +} + // Update fields according to JSON version func (s *Settings) FromJSON(j SettingsJSON) { s.MaintenanceMode = j.MaintenanceMode diff --git a/tests/internal/test/flow.go b/tests/internal/test/flow.go index dda33698..02a0449e 100644 --- a/tests/internal/test/flow.go +++ b/tests/internal/test/flow.go @@ -29,6 +29,10 @@ func NewFlowClient(t *testing.T, cfg *configs.Config) *client.Client { t.Fatal(err) } + if err := fc.Ping(context.Background()); err != nil { + t.Fatal(err) + } + close := func() { err := fc.Close() if err != nil { diff --git a/tests/system_test.go b/tests/system_test.go index 646c9c6b..a925b017 100644 --- a/tests/system_test.go +++ b/tests/system_test.go @@ -2,11 +2,13 @@ package tests import ( "bytes" + "database/sql" "io" "io/ioutil" "net/http" "strings" "testing" + "time" "github.com/flow-hydraulics/flow-wallet-api/handlers" "github.com/flow-hydraulics/flow-wallet-api/tests/internal/test" @@ -72,22 +74,77 @@ func TestIsMaintenanceMode(t *testing.T) { sysService := svcs.GetSystem() - if sysService.IsMaintenanceMode() { + settings, err := sysService.GetSettings() + if err != nil { + t.Fatal(err) + } + + if settings.IsMaintenanceMode() { t.Error("expected system not to be in maintenance mode") } + settings.MaintenanceMode = true + + if err := sysService.SaveSettings(settings); err != nil { + t.Fatal(err) + } + + settings, err = sysService.GetSettings() + if err != nil { + t.Fatal(err) + } + + if !settings.IsMaintenanceMode() { + t.Error("expected system to be in maintenance mode") + } +} + +func TestIsPaused(t *testing.T) { + cfg := test.LoadConfig(t, testConfigPath) + svcs := test.GetServices(t, cfg) + + sysService := svcs.GetSystem() + settings, err := sysService.GetSettings() if err != nil { t.Fatal(err) } - settings.MaintenanceMode = true + if settings.IsPaused(time.Minute) { + t.Error("expected system not to be paused") + } + + settings.PausedSince = sql.NullTime{Time: time.Now(), Valid: true} if err := sysService.SaveSettings(settings); err != nil { t.Fatal(err) } - if !sysService.IsMaintenanceMode() { - t.Error("expected system to be in maintenance mode") + settings, err = sysService.GetSettings() + if err != nil { + t.Fatal(err) + } + + if !settings.IsPaused(time.Minute) { + t.Error("expected system to be paused") + } +} + +func TestPausing(t *testing.T) { + cfg := test.LoadConfig(t, testConfigPath) + svcs := test.GetServices(t, cfg) + + sysService := svcs.GetSystem() + + if halted, err := sysService.IsHalted(); err != nil || halted { + t.Error("expected system not to be halted") + } + + if err := sysService.Pause(); err != nil { + t.Fatal(err) + } + + if halted, err := sysService.IsHalted(); err != nil || !halted { + t.Error("expected system to be halted") } }