diff --git a/.env.example b/.env.example index 0efb6611..1837071f 100644 --- a/.env.example +++ b/.env.example @@ -29,4 +29,13 @@ ENABLED_TOKENS=FUSD:0xf8d6e0586b0a20c7:fusd,FlowToken:0x0ae53cb6e3f42a79:flowTok # This sets the number of proposal keys to be used on the admin account. # You can increase transaction throughput by using multiple proposal keys for # parallel transaction execution. -ADMIN_PROPOSAL_KEY_COUNT=50 \ No newline at end of file +ADMIN_PROPOSAL_KEY_COUNT=50 + +# Defines the maximum number of active jobs that can be queued before +# new jobs are rejected. +# WORKER_QUEUE_CAPACITY=1000 (default) + +# Number of concurrent workers handling incoming jobs. +# You can increase the number of workers if you're sending +# too many transactions and find that the queue is often backlogged. +# WORKER_COUNT=100 (default) diff --git a/jobs/jobs.go b/jobs/jobs.go index 253bf2ac..fae910ec 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -12,15 +12,12 @@ import ( ) type WorkerPool struct { - log *log.Logger - wg *sync.WaitGroup - workers []*Worker - store Store -} - -type Worker struct { - pool *WorkerPool - jobChan *chan *Job + log *log.Logger + wg *sync.WaitGroup + store Store + jobChan chan *Job + capacity uint + workerCount uint } type Job struct { @@ -52,19 +49,30 @@ func (j *Job) Wait(wait bool) error { return nil } -func NewWorkerPool(l *log.Logger, db Store) *WorkerPool { - return &WorkerPool{l, &sync.WaitGroup{}, []*Worker{}, db} +func NewWorkerPool(l *log.Logger, db Store, capacity uint, workerCount uint) *WorkerPool { + wg := &sync.WaitGroup{} + jobChan := make(chan *Job, capacity) + + pool := &WorkerPool{l, wg, db, jobChan, capacity, workerCount} + + pool.startWorkers() + + return pool } -func (p *WorkerPool) AddWorker(capacity uint) { - if len(p.workers) > 0 { - panic("multiple workers not supported yet") +func (p *WorkerPool) startWorkers() { + for i := uint(0); i < p.workerCount; i++ { + p.wg.Add(1) + go func() { + defer p.wg.Done() + for job := range p.jobChan { + if job == nil { + break + } + p.process(job) + } + }() } - p.wg.Add(1) - jobChan := make(chan *Job, capacity) - worker := &Worker{p, &jobChan} - p.workers = append(p.workers, worker) - go worker.start() } func (p *WorkerPool) AddJob(do func() (string, error)) (*Job, error) { @@ -73,16 +81,7 @@ func (p *WorkerPool) AddJob(do func() (string, error)) (*Job, error) { return job, err } - worker, err := p.AvailableWorker() - if err != nil { - job.Status = NoAvailableWorkers - if err := p.store.UpdateJob(job); err != nil { - p.log.Println("WARNING: Could not update DB entry for Job", job.ID) - } - return job, &errors.JobQueueFull{Err: fmt.Errorf(job.Status.String())} - } - - if !worker.tryEnqueue(job) { + if !p.tryEnqueue(job) { job.Status = QueueFull if err := p.store.UpdateJob(job); err != nil { p.log.Println("WARNING: Could not update DB entry for Job", job.ID) @@ -98,56 +97,36 @@ func (p *WorkerPool) AddJob(do func() (string, error)) (*Job, error) { return job, nil } -func (p *WorkerPool) AvailableWorker() (*Worker, error) { - // TODO: support multiple workers, use load balancing - if len(p.workers) < 1 { - return nil, fmt.Errorf("no available workers") - } - return p.workers[0], nil -} - func (p *WorkerPool) Stop() { - for _, w := range p.workers { - close(*w.jobChan) - } + close(p.jobChan) p.wg.Wait() } -func (w *Worker) start() { - defer w.pool.wg.Done() - for job := range *w.jobChan { - if job == nil { - return - } - w.process(job) - } -} - -func (w *Worker) tryEnqueue(job *Job) bool { +func (p *WorkerPool) tryEnqueue(job *Job) bool { select { - case *w.jobChan <- job: + case p.jobChan <- job: return true default: return false } } -func (w *Worker) process(job *Job) { +func (p *WorkerPool) process(job *Job) { result, err := job.Do() job.Result = result if err != nil { - if w.pool.log != nil { - w.pool.log.Printf("[Job %s] Error while processing job: %s\n", job.ID, err) + if p.log != nil { + p.log.Printf("[Job %s] Error while processing job: %s\n", job.ID, err) } job.Status = Error job.Error = err.Error() - if err := w.pool.store.UpdateJob(job); err != nil { - w.pool.log.Println("WARNING: Could not update DB entry for Job", job.ID) + if err := p.store.UpdateJob(job); err != nil { + p.log.Println("WARNING: Could not update DB entry for Job", job.ID) } return } job.Status = Complete - if err := w.pool.store.UpdateJob(job); err != nil { - w.pool.log.Println("WARNING: Could not update DB entry for Job", job.ID) + if err := p.store.UpdateJob(job); err != nil { + p.log.Println("WARNING: Could not update DB entry for Job", job.ID) } } diff --git a/main.go b/main.go index cdb8ad8b..f8e881bd 100644 --- a/main.go +++ b/main.go @@ -41,6 +41,9 @@ type Config struct { Port int `env:"PORT" envDefault:"3000"` AccessAPIHost string `env:"ACCESS_API_HOST,notEmpty"` ChainID flow.ChainID `env:"CHAIN_ID" envDefault:"flow-emulator"` + + WorkerQueueCapacity uint `env:"WORKER_QUEUE_CAPACITY" envDefault:"1000"` + WorkerCount uint `env:"WORKER_COUNT" envDefault:"100"` } func main() { @@ -119,15 +122,12 @@ func runServer(disableRawTx, disableFt, disableNft, disableChainEvents bool) { tokenStore := tokens.NewGormStore(db) // Create a worker pool - wp := jobs.NewWorkerPool(lj, jobStore) + wp := jobs.NewWorkerPool(lj, jobStore, cfg.WorkerQueueCapacity, cfg.WorkerCount) defer func() { ls.Println("Stopping worker pool..") wp.Stop() }() - // TODO: make this configurable - wp.AddWorker(100) // Add a worker with capacity of 100 - // Key manager km := basic.NewKeyManager(keyStore, fc) diff --git a/main_test.go b/main_test.go index 382a36fb..59168381 100644 --- a/main_test.go +++ b/main_test.go @@ -167,9 +167,8 @@ func TestAccountServices(t *testing.T) { km := basic.NewKeyManager(keyStore, fc) - wp := jobs.NewWorkerPool(nil, jobStore) + wp := jobs.NewWorkerPool(nil, jobStore, 100, 1) defer wp.Stop() - wp.AddWorker(1) service := accounts.NewService(accountStore, km, fc, wp, nil, templateService) txService := transactions.NewService(txStore, km, fc, wp) @@ -283,9 +282,8 @@ func TestAccountHandlers(t *testing.T) { km := basic.NewKeyManager(keyStore, fc) - wp := jobs.NewWorkerPool(nil, jobStore) + wp := jobs.NewWorkerPool(nil, jobStore, 100, 1) defer wp.Stop() - wp.AddWorker(1) store := accounts.NewGormStore(db) service := accounts.NewService(store, km, fc, wp, nil, templateService) @@ -434,9 +432,8 @@ func TestTransactionHandlers(t *testing.T) { km := basic.NewKeyManager(keyStore, fc) - wp := jobs.NewWorkerPool(nil, jobStore) + wp := jobs.NewWorkerPool(nil, jobStore, 100, 1) defer wp.Stop() - wp.AddWorker(1) store := transactions.NewGormStore(db) service := transactions.NewService(store, km, fc, wp) @@ -792,9 +789,8 @@ func TestTokenServices(t *testing.T) { km := basic.NewKeyManager(keyStore, fc) - wp := jobs.NewWorkerPool(nil, jobStore) + wp := jobs.NewWorkerPool(nil, jobStore, 100, 1) defer wp.Stop() - wp.AddWorker(1) transactionService := transactions.NewService(transactionStore, km, fc, wp) accountService := accounts.NewService(accountStore, km, fc, wp, transactionService, templateService) @@ -984,9 +980,8 @@ func TestTokenHandlers(t *testing.T) { km := basic.NewKeyManager(keyStore, fc) - wp := jobs.NewWorkerPool(nil, jobStore) + wp := jobs.NewWorkerPool(nil, jobStore, 100, 1) defer wp.Stop() - wp.AddWorker(1) transactionService := transactions.NewService(transactionStore, km, fc, wp) accountService := accounts.NewService(accountStore, km, fc, wp, transactionService, templateService) @@ -1526,9 +1521,8 @@ func TestNFTDeployment(t *testing.T) { km := basic.NewKeyManager(keyStore, fc) - wp := jobs.NewWorkerPool(nil, jobStore) + wp := jobs.NewWorkerPool(nil, jobStore, 100, 1) defer wp.Stop() - wp.AddWorker(1) transactionService := transactions.NewService(transactionStore, km, fc, wp) accountService := accounts.NewService(accountStore, km, fc, wp, transactionService, templateService)