Skip to content

Commit

Permalink
Merge pull request #244 from flow-hydraulics/feature/227-record-retry…
Browse files Browse the repository at this point in the history
…-errors

Record job execution errors on retries
  • Loading branch information
nanuuki authored Dec 21, 2021
2 parents 030810b + c4481b8 commit 9b20b3c
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 19 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/joho/godotenv v1.4.0
github.com/jpillora/backoff v1.0.0
github.com/lib/pq v1.10.2
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
github.com/onflow/cadence v0.20.0
github.com/onflow/flow-go-sdk v0.23.0
Expand Down
4 changes: 4 additions & 0 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/lib/pq"
log "github.com/sirupsen/logrus"
"gorm.io/datatypes"
"gorm.io/gorm"
Expand Down Expand Up @@ -32,6 +33,7 @@ type Job struct {
Type string `gorm:"column:type"`
State State `gorm:"column:state;default:INIT"`
Error string `gorm:"column:error"`
Errors pq.StringArray `gorm:"column:errors;type:text[]"`
Result string `gorm:"column:result"`
TransactionID string `gorm:"column:transaction_id"`
ExecCount int `gorm:"column:exec_count;default:0"`
Expand Down Expand Up @@ -61,6 +63,7 @@ type JSONResponse struct {
Type string `json:"type"`
State State `json:"state"`
Error string `json:"error"`
Errors []string `json:"errors"`
Result string `json:"result"`
TransactionID string `json:"transactionId"`
CreatedAt time.Time `json:"createdAt"`
Expand All @@ -73,6 +76,7 @@ func (j Job) ToJSONResponse() JSONResponse {
Type: j.Type,
State: j.State,
Error: j.Error,
Errors: []string(j.Errors),
Result: j.Result,
TransactionID: j.TransactionID,
CreatedAt: j.CreatedAt,
Expand Down
109 changes: 98 additions & 11 deletions jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"

"net/http"
"net/http/httptest"
Expand All @@ -22,7 +23,10 @@ func (*dummyStore) Jobs(datastore.ListOptions) ([]Job, error) { return nil, nil
func (*dummyStore) Job(id uuid.UUID) (Job, error) { return Job{}, nil }
func (*dummyStore) InsertJob(*Job) error { return nil }
func (*dummyStore) UpdateJob(*Job) error { return nil }
func (*dummyStore) IncreaseExecCount(j *Job) error { return nil }
func (*dummyStore) IncreaseExecCount(j *Job) error {
j.ExecCount = j.ExecCount + 1
return nil
}
func (*dummyStore) SchedulableJobs(acceptedGracePeriod, reSchedulableGracePeriod time.Duration, o datastore.ListOptions) ([]Job, error) {
return nil, nil
}
Expand Down Expand Up @@ -192,11 +196,12 @@ func TestExecuteSendNotification(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
wp := WorkerPool{
context: ctx,
cancelContext: cancel,
executors: make(map[string]ExecutorFunc),
jobChan: make(chan *Job, 1),
store: &dummyStore{},
context: ctx,
cancelContext: cancel,
executors: make(map[string]ExecutorFunc),
jobChan: make(chan *Job, 1),
store: &dummyStore{},
maxJobErrorCount: 1,
}

WithJobStatusWebhook("http://localhost", time.Minute)(&wp)
Expand Down Expand Up @@ -231,11 +236,12 @@ func TestExecuteSendNotification(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
wp := WorkerPool{
context: ctx,
cancelContext: cancel,
executors: make(map[string]ExecutorFunc),
jobChan: make(chan *Job, 1),
store: &dummyStore{},
context: ctx,
cancelContext: cancel,
executors: make(map[string]ExecutorFunc),
jobChan: make(chan *Job, 1),
store: &dummyStore{},
maxJobErrorCount: 1,
}

WithJobStatusWebhook(svr.URL, time.Minute)(&wp)
Expand Down Expand Up @@ -266,3 +272,84 @@ func TestExecuteSendNotification(t *testing.T) {
}
})
}

func TestJobErrorMessages(t *testing.T) {
t.Run("all error messages are stored & published when retries occur", func(t *testing.T) {
retryCount := 3
logger, hook := test.NewNullLogger()
expectedErrorMessages := []string{}

svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var resp JSONResponse
err := json.NewDecoder(r.Body).Decode(&resp)
if err != nil {
panic(err)
}

if !reflect.DeepEqual(resp.Errors, expectedErrorMessages) {
t.Errorf("error messages don't match the expected error messages, expected: %v, got: %v", expectedErrorMessages, resp.Errors)
}
}))
defer svr.Close()

ctx, cancel := context.WithCancel(context.Background())
wp := WorkerPool{
context: ctx,
cancelContext: cancel,
executors: make(map[string]ExecutorFunc),
jobChan: make(chan *Job, 1),
store: &dummyStore{},
maxJobErrorCount: retryCount,
}

WithJobStatusWebhook(svr.URL, time.Minute)(&wp)
WithLogger(logger)(&wp)

wp.RegisterExecutor(SendJobStatusJobType, wp.executeSendJobStatus)

wp.RegisterExecutor("TestJobType", func(ctx context.Context, j *Job) error {
j.ShouldSendNotification = true

// Fail the first n times, n = retryCount
if j.ExecCount <= retryCount {
errorMessage := fmt.Sprintf("error message %d", j.ExecCount)
expectedErrorMessages = append(expectedErrorMessages, errorMessage)
return fmt.Errorf(errorMessage)
}

j.Result = "done"

return nil
})

job, err := wp.CreateJob("TestJobType", "")
if err != nil {
t.Fatal(err)
}

// Explicitly retry to trigger n errors and a final successful execution, n = retryCount
for n := 0; n < retryCount+1; n++ {
wp.process(job)
}

// Send the notification
sendNotificationJob := <-wp.jobChan
wp.process(sendNotificationJob)

// Check log entries
if len(hook.Entries) != retryCount {
t.Errorf("expected there to be %d warning(s), got %d", retryCount, len(hook.Entries))
}

// Final value for "Error" should be blank
if job.Error != "" {
t.Errorf("expected job.Error to be blank, got: %#v", job.Error)
}

// Check stored error messages
errorMessages := []string(job.Errors)
if !reflect.DeepEqual([]string(job.Errors), expectedErrorMessages) {
t.Errorf("error messages don't match the expected error messages, expected: %v, got: %v", expectedErrorMessages, errorMessages)
}
})
}
3 changes: 2 additions & 1 deletion jobs/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,13 @@ func (wp *WorkerPool) process(job *Job) {
job.State = Error
}
job.Error = err.Error()
job.Errors = append(job.Errors, err.Error())
entry.
WithFields(log.Fields{"error": err}).
Warn("Job execution resulted with error")
} else {
job.State = Complete
job.Error = ""
job.Error = "" // Clear the error message for the final & successful execution
}

if err := wp.store.UpdateJob(job); err != nil {
Expand Down
53 changes: 53 additions & 0 deletions migrations/internal/m20211221_1/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// m20211221_1 handles adding the `Errors` field to Job
package m20211221_1

import (
"time"

"github.com/google/uuid"
"github.com/lib/pq"
"gorm.io/datatypes"
"gorm.io/gorm"
)

const ID = "20211221_1"

// State is a type for Job state.
type State string

// Job database model
type Job struct {
ID uuid.UUID `gorm:"column:id;primary_key;type:uuid;"`
Type string `gorm:"column:type"`
State State `gorm:"column:state;default:INIT"`
Error string `gorm:"column:error"`
Errors pq.StringArray `gorm:"column:errors;type:text[]"`
Result string `gorm:"column:result"`
TransactionID string `gorm:"column:transaction_id"`
ExecCount int `gorm:"column:exec_count;default:0"`
CreatedAt time.Time `gorm:"column:created_at"`
UpdatedAt time.Time `gorm:"column:updated_at"`
DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;index"`
ShouldSendNotification bool `gorm:"-"` // Whether or not to notify admin (via webhook for example)
Attributes datatypes.JSON `gorm:"attributes"`
}

func (Job) TableName() string {
return "jobs"
}

func Migrate(tx *gorm.DB) error {
if err := tx.AutoMigrate(&Job{}); err != nil {
return err
}

return nil
}

func Rollback(tx *gorm.DB) error {
if err := tx.Migrator().DropColumn(&Job{}, "errors"); err != nil {
return err
}

return nil
}
6 changes: 6 additions & 0 deletions migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211130"
"github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211202"
"github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211220"
"github.com/flow-hydraulics/flow-wallet-api/migrations/internal/m20211221_1"
"github.com/go-gormigrate/gormigrate/v2"
)

Expand Down Expand Up @@ -48,6 +49,11 @@ func List() []*gormigrate.Migration {
Migrate: m20211220.Migrate,
Rollback: m20211220.Rollback,
},
{
ID: m20211221_1.ID,
Migrate: m20211221_1.Migrate,
Rollback: m20211221_1.Rollback,
},
}
return ms
}
21 changes: 14 additions & 7 deletions openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -879,12 +879,12 @@ components:
type: string
example: ACCEPTED
enum:
- INIT
- ACCEPTED
- NO_AVAILABLE_WORKERS
- ERROR
- COMPLETE
- FAILED
- INIT
- ACCEPTED
- NO_AVAILABLE_WORKERS
- ERROR
- COMPLETE
- FAILED
debugInfo:
type: string
example: |
Expand Down Expand Up @@ -924,7 +924,7 @@ components:
$ref: '#/components/schemas/key'
type:
type: string
example: 'custodial'
example: custodial
createdAt:
type: string
minLength: 1
Expand Down Expand Up @@ -1063,6 +1063,13 @@ components:
error:
type: string
example: ''
description: 'Final error message, blank if job completed successfully'
errors:
type: array
description: Error messages recorded during all job executions including retries
items:
type: string
example: an error occurred
result:
type: string
example: ''
Expand Down

0 comments on commit 9b20b3c

Please sign in to comment.