From d99f36fbb5f3958ff086cf4f4d16adf4471df95c Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Thu, 2 Sep 2021 08:36:48 +0300 Subject: [PATCH 1/4] Clean up --- chain_events/lib.go | 8 ++++++++ chain_events/listener.go | 20 ++++++-------------- 2 files changed, 14 insertions(+), 14 deletions(-) create mode 100644 chain_events/lib.go diff --git a/chain_events/lib.go b/chain_events/lib.go new file mode 100644 index 00000000..696868c3 --- /dev/null +++ b/chain_events/lib.go @@ -0,0 +1,8 @@ +package chain_events + +func min(x, y uint64) uint64 { + if x > y { + return y + } + return x +} diff --git a/chain_events/listener.go b/chain_events/listener.go index 97e94db5..08fe2b88 100644 --- a/chain_events/listener.go +++ b/chain_events/listener.go @@ -73,14 +73,14 @@ func (l *Listener) handleError(err error) { func (l *Listener) Start() *Listener { if l.ticker != nil { + // Already started return l } l.ticker = time.NewTicker(l.interval) go func() { - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() status, err := l.db.GetListenerStatus() @@ -93,22 +93,21 @@ func (l *Listener) Start() *Listener { case <-l.done: return case <-l.ticker.C: - cur, err := l.fc.GetLatestBlock(ctx, true) + latestBlock, err := l.fc.GetLatestBlock(ctx, true) if err != nil { l.handleError(err) continue } - curHeight := cur.Height + curHeight := latestBlock.Height if curHeight > status.LatestHeight { - start := status.LatestHeight + 1 // latestHeight has already been checked, add 1 + start := status.LatestHeight + 1 // LatestHeight has already been checked, add 1 end := min(curHeight, start+l.maxDiff) // Limit maximum end if err := l.run(ctx, start, end); err != nil { l.handleError(err) continue } status.LatestHeight = end - err := l.db.UpdateListenerStatus(status) - if err != nil { + if err := l.db.UpdateListenerStatus(status); err != nil { l.handleError(err) continue } @@ -125,10 +124,3 @@ func (l *Listener) Stop() { l.done <- true l.ticker = nil } - -func min(x, y uint64) uint64 { - if x > y { - return y - } - return x -} From 6aa73281837504d534ff01be1b80345e5e1ba13c Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Thu, 2 Sep 2021 12:04:21 +0300 Subject: [PATCH 2/4] Refactor chain event polling Resolves #177 Locks the status row in database so only one poller can run at a time Makes starting height, max blocks to check and polling interval configurable --- Makefile | 2 +- chain_events/listener.go | 124 ++++++++++++++++++++++++++----------- chain_events/store.go | 3 +- chain_events/store_gorm.go | 27 ++++++-- configs/configs.go | 10 +++ docker-compose.yml | 13 +++- main.go | 17 +++-- 7 files changed, 140 insertions(+), 56 deletions(-) diff --git a/Makefile b/Makefile index c746921c..81088e98 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .PHONY: dev dev: - docker-compose up -d db emulator + docker-compose up -d db pgadmin emulator docker-compose logs -f .PHONY: stop diff --git a/chain_events/listener.go b/chain_events/listener.go index 08fe2b88..865a01a3 100644 --- a/chain_events/listener.go +++ b/chain_events/listener.go @@ -2,7 +2,9 @@ package chain_events import ( "context" - "fmt" + "log" + "os" + "strings" "time" "github.com/onflow/flow-go-sdk" @@ -13,13 +15,15 @@ import ( type GetEventTypes func() []string type Listener struct { - ticker *time.Ticker - done chan bool - fc *client.Client - db Store - maxDiff uint64 - interval time.Duration - getTypes GetEventTypes + ticker *time.Ticker + done chan bool + logger *log.Logger + fc *client.Client + db Store + getTypes GetEventTypes + maxBlocks uint64 + interval time.Duration + startingHeight uint64 } type ListenerStatus struct { @@ -32,35 +36,44 @@ func (ListenerStatus) TableName() string { } func NewListener( + logger *log.Logger, fc *client.Client, db Store, + getTypes GetEventTypes, maxDiff uint64, interval time.Duration, - getTypes GetEventTypes, + startingHeight uint64, ) *Listener { - return &Listener{nil, make(chan bool), fc, db, maxDiff, interval, getTypes} + if logger == nil { + logger = log.New(os.Stdout, "[EVENT-POLLER] ", log.LstdFlags|log.Lshortfile) + } + return &Listener{ + nil, make(chan bool), + logger, fc, db, getTypes, + maxDiff, interval, startingHeight, + } } func (l *Listener) run(ctx context.Context, start, end uint64) error { - ee := make([]flow.Event, 0) + events := make([]flow.Event, 0) - types := l.getTypes() + eventTypes := l.getTypes() - for _, t := range types { + for _, t := range eventTypes { r, err := l.fc.GetEventsForHeightRange(ctx, client.EventRangeQuery{ Type: t, StartHeight: start, EndHeight: end, }) if err != nil { - return fmt.Errorf("error while fetching events: %w", err) + return err } for _, b := range r { - ee = append(ee, b.Events...) + events = append(events, b.Events...) } } - for _, event := range ee { + for _, event := range events { Event.Trigger(event) } @@ -68,7 +81,10 @@ func (l *Listener) run(ctx context.Context, start, end uint64) error { } func (l *Listener) handleError(err error) { - fmt.Println(err) + l.logger.Println(err) + if strings.Contains(err.Error(), "key not found") { + l.logger.Println(`"key not found" error indicates data is not available at this height, please manually set correct starting height`) + } } func (l *Listener) Start() *Listener { @@ -77,49 +93,83 @@ func (l *Listener) Start() *Listener { return l } + if err := l.initHeight(); err != nil { + if strings.Contains(err.Error(), "could not obtain lock on row") { + // Skip as another listener is already handling this + } else { + panic(err) + } + } + + // TODO (latenssi): should use random intervals instead l.ticker = time.NewTicker(l.interval) go func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - status, err := l.db.GetListenerStatus() - if err != nil { - panic(err) - } - for { select { case <-l.done: return case <-l.ticker.C: - latestBlock, err := l.fc.GetLatestBlock(ctx, true) - if err != nil { - l.handleError(err) - continue - } - curHeight := latestBlock.Height - if curHeight > status.LatestHeight { - start := status.LatestHeight + 1 // LatestHeight has already been checked, add 1 - end := min(curHeight, start+l.maxDiff) // Limit maximum end - if err := l.run(ctx, start, end); err != nil { - l.handleError(err) - continue + lockErr := l.db.LockedStatus(func(status *ListenerStatus) error { + latestBlock, err := l.fc.GetLatestBlock(ctx, true) + if err != nil { + return err + } + + if latestBlock.Height > status.LatestHeight { + start := status.LatestHeight + 1 // LatestHeight has already been checked, add 1 + end := min(latestBlock.Height, start+l.maxBlocks) // Limit maximum end + if err := l.run(ctx, start, end); err != nil { + return err + } + status.LatestHeight = end } - status.LatestHeight = end - if err := l.db.UpdateListenerStatus(status); err != nil { - l.handleError(err) + + return nil + }) + + if lockErr != nil { + if strings.Contains(lockErr.Error(), "could not obtain lock on row") { + // Skip as another listener is already handling this round continue } + l.handleError(lockErr) } } } }() + l.logger.Println("started") + return l } +func (l *Listener) initHeight() error { + return l.db.LockedStatus(func(status *ListenerStatus) error { + if l.startingHeight > 0 && status.LatestHeight < l.startingHeight-1 { + status.LatestHeight = l.startingHeight - 1 + } + + if status.LatestHeight == 0 { + // If starting fresh, we need to start from the latest block as we can't + // know what is the root of the current spork. + // Data on Flow is only accessible for the current spork height. + latestBlock, err := l.fc.GetLatestBlock(context.Background(), true) + if err != nil { + return err + } + status.LatestHeight = latestBlock.Height + } + + return nil + }) +} + func (l *Listener) Stop() { + l.logger.Println("stopping...") l.ticker.Stop() l.done <- true l.ticker = nil diff --git a/chain_events/store.go b/chain_events/store.go index 778773d9..f6d7af12 100644 --- a/chain_events/store.go +++ b/chain_events/store.go @@ -2,6 +2,5 @@ package chain_events // Store manages data regarding tokens. type Store interface { - UpdateListenerStatus(s *ListenerStatus) error - GetListenerStatus() (*ListenerStatus, error) + LockedStatus(func(*ListenerStatus) error) error } diff --git a/chain_events/store_gorm.go b/chain_events/store_gorm.go index 33d26041..f1eb4f73 100644 --- a/chain_events/store_gorm.go +++ b/chain_events/store_gorm.go @@ -2,6 +2,7 @@ package chain_events import ( "gorm.io/gorm" + "gorm.io/gorm/clause" ) type GormStore struct { @@ -13,11 +14,25 @@ func NewGormStore(db *gorm.DB) *GormStore { return &GormStore{db} } -func (s *GormStore) GetListenerStatus() (t *ListenerStatus, err error) { - err = s.db.FirstOrCreate(&t).Error - return -} +// LockedStatus runs a transaction on the database manipulating 'status' of type ListenerStatus. +// It takes a function 'fn' as argument. In the context of 'fn' 'status' is locked. +// Uses NOWAIT modifier on the lock so simultaneous requests can be ignored. +func (s *GormStore) LockedStatus(fn func(status *ListenerStatus) error) error { + return s.db.Transaction(func(tx *gorm.DB) error { + status := ListenerStatus{} + + if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).FirstOrCreate(&status).Error; err != nil { + return err // rollback + } + + if err := fn(&status); err != nil { + return err // rollback + } + + if err := tx.Save(&status).Error; err != nil { + return err // rollback + } -func (s *GormStore) UpdateListenerStatus(t *ListenerStatus) error { - return s.db.Save(&t).Error + return nil // commit + }) } diff --git a/configs/configs.go b/configs/configs.go index 499951e7..4be1ae58 100644 --- a/configs/configs.go +++ b/configs/configs.go @@ -82,6 +82,16 @@ type Config struct { // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". // For more info: https://pkg.go.dev/time#ParseDuration TransactionTimeout time.Duration `env:"FLOW_WALLET_TRANSACTION_TIMEOUT" envDefault:"0"` + + // Set the starting height for event polling. This won't have any effect if the value in + // database (chain_event_status[0].latest_height) is greater. + // If 0 (default) use latest block height if starting fresh (no previous value in database). + ChainListenerStartingHeight uint64 `env:"FLOW_WALLET_EVENTS_STARTING_HEIGHT" envDefault:"0"` + // Maximum number of blocks to check at once. + ChainListenerMaxBlocks uint64 `env:"FLOW_WALLET_EVENTS_MAX_BLOCKS" envDefault:"100"` + // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + // For more info: https://pkg.go.dev/time#ParseDuration + ChainListenerInterval time.Duration `env:"FLOW_WALLET_EVENTS_INTERVAL" envDefault:"10s"` } type Options struct { diff --git a/docker-compose.yml b/docker-compose.yml index 42fcc5c5..6d63405e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,6 +2,7 @@ version: "3.9" services: db: image: postgres:13-alpine + restart: unless-stopped ports: - "5432:5432" environment: @@ -9,14 +10,23 @@ services: - POSTGRES_USER=wallet - POSTGRES_PASSWORD=wallet + pgadmin: + image: dpage/pgadmin4 + restart: unless-stopped + environment: + PGADMIN_DEFAULT_EMAIL: admin@example.com + PGADMIN_DEFAULT_PASSWORD: admin + ports: + - "5050:80" + wallet: build: context: . dockerfile: ./docker/wallet/Dockerfile network: host # docker build sometimes has problems fetching from alpine's CDN + restart: unless-stopped ports: - "3000:3000" - restart: unless-stopped env_file: - ./.env environment: @@ -30,6 +40,7 @@ services: emulator: image: gcr.io/flow-container-registry/emulator:v0.23.0 + restart: unless-stopped command: emulator -v ports: - "3569:3569" diff --git a/main.go b/main.go index 0f681a0b..e40cdd3e 100644 --- a/main.go +++ b/main.go @@ -69,6 +69,7 @@ func runServer(cfg *configs.Config) { // Application wide logger ls := log.New(os.Stdout, "[SERVER] ", log.LstdFlags|log.Lshortfile) lj := log.New(os.Stdout, "[JOBS] ", log.LstdFlags|log.Lshortfile) + le := log.New(os.Stdout, "[EVENT-POLLER] ", log.LstdFlags|log.Lshortfile) ls.Printf("Starting server (v%s)...\n", version) @@ -235,11 +236,7 @@ func runServer(cfg *configs.Config) { // Chain event listener if !cfg.DisableChainEvents { - ls.Println("Starting chain event listener..") - store := chain_events.NewGormStore(db) - maxDiff := uint64(100) // maximum number of blocks to check each iteration, TODO: make this configurable - interval := 10 * time.Second // TODO: make this configurable getTypes := func() []string { // Get all enabled tokens tt, err := templateService.ListTokens(templates.NotSpecified) @@ -258,12 +255,14 @@ func runServer(cfg *configs.Config) { return event_types } - listener := chain_events.NewListener(fc, store, maxDiff, interval, getTypes) + listener := chain_events.NewListener( + le, fc, store, getTypes, + cfg.ChainListenerMaxBlocks, + cfg.ChainListenerInterval, + cfg.ChainListenerStartingHeight, + ) - defer func() { - ls.Println("Stopping event listener..") - listener.Stop() - }() + defer listener.Stop() // Register a handler for chain events chain_events.Event.Register(&tokens.ChainEventHandler{ From 7f703c598559eb9c446f8cface2e515baa8bc006 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Thu, 2 Sep 2021 12:31:05 +0300 Subject: [PATCH 3/4] More generalized handling of lock errors --- chain_events/listener.go | 10 ++++++---- chain_events/store.go | 8 ++++++++ chain_events/store_gorm.go | 11 ++++++++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/chain_events/listener.go b/chain_events/listener.go index 865a01a3..b9659708 100644 --- a/chain_events/listener.go +++ b/chain_events/listener.go @@ -94,8 +94,9 @@ func (l *Listener) Start() *Listener { } if err := l.initHeight(); err != nil { - if strings.Contains(err.Error(), "could not obtain lock on row") { - // Skip as another listener is already handling this + _, ok := err.(*LockError) + if ok { + // Skip LockError as it means another listener is already handling this } else { panic(err) } @@ -132,8 +133,9 @@ func (l *Listener) Start() *Listener { }) if lockErr != nil { - if strings.Contains(lockErr.Error(), "could not obtain lock on row") { - // Skip as another listener is already handling this round + _, ok := lockErr.(*LockError) + if ok { + // Skip on LockError as it means another listener is already handling this round continue } l.handleError(lockErr) diff --git a/chain_events/store.go b/chain_events/store.go index f6d7af12..25fd0cfc 100644 --- a/chain_events/store.go +++ b/chain_events/store.go @@ -4,3 +4,11 @@ package chain_events type Store interface { LockedStatus(func(*ListenerStatus) error) error } + +type LockError struct { + Err error +} + +func (e *LockError) Error() string { + return e.Err.Error() +} diff --git a/chain_events/store_gorm.go b/chain_events/store_gorm.go index f1eb4f73..f5fec8a3 100644 --- a/chain_events/store_gorm.go +++ b/chain_events/store_gorm.go @@ -1,6 +1,8 @@ package chain_events import ( + "strings" + "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -18,7 +20,7 @@ func NewGormStore(db *gorm.DB) *GormStore { // It takes a function 'fn' as argument. In the context of 'fn' 'status' is locked. // Uses NOWAIT modifier on the lock so simultaneous requests can be ignored. func (s *GormStore) LockedStatus(fn func(status *ListenerStatus) error) error { - return s.db.Transaction(func(tx *gorm.DB) error { + txErr := s.db.Transaction(func(tx *gorm.DB) error { status := ListenerStatus{} if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).FirstOrCreate(&status).Error; err != nil { @@ -35,4 +37,11 @@ func (s *GormStore) LockedStatus(fn func(status *ListenerStatus) error) error { return nil // commit }) + + // Need to handle implementation specific error message + if txErr != nil && strings.Contains(txErr.Error(), "could not obtain lock on row") { + return &LockError{Err: txErr} + } + + return txErr } From 6c574dfbf7d67fbe550c746cf1c65e6ca74e8489 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Thu, 2 Sep 2021 12:34:22 +0300 Subject: [PATCH 4/4] Clean up --- chain_events/listener.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/chain_events/listener.go b/chain_events/listener.go index b9659708..a88aaea1 100644 --- a/chain_events/listener.go +++ b/chain_events/listener.go @@ -95,11 +95,10 @@ func (l *Listener) Start() *Listener { if err := l.initHeight(); err != nil { _, ok := err.(*LockError) - if ok { - // Skip LockError as it means another listener is already handling this - } else { + if !ok { panic(err) } + // Skip LockError as it means another listener is already handling this } // TODO (latenssi): should use random intervals instead @@ -134,11 +133,10 @@ func (l *Listener) Start() *Listener { if lockErr != nil { _, ok := lockErr.(*LockError) - if ok { - // Skip on LockError as it means another listener is already handling this round - continue + if !ok { + l.handleError(lockErr) } - l.handleError(lockErr) + // Skip on LockError as it means another listener is already handling this round } } }