Refactor failed visit handling
mariuswilms committed Feb 21, 2025
1 parent a836c1c commit 172b1e2
Showing 2 changed files with 61 additions and 50 deletions.
2 changes: 1 addition & 1 deletion api.go
@@ -11,7 +11,7 @@ import (
"fmt"
"log/slog"
"net/url"
"os"
"slices"

"github.com/google/uuid"
)
109 changes: 60 additions & 49 deletions visitor.go
@@ -8,6 +8,7 @@ package main
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"sync/atomic"
@@ -26,6 +27,8 @@ const (
MaxJobRetries = 3
)

// Visitor is the main "work horse" of the crawler. It consumes jobs from the
// work queue and processes them with the help of the Collector.
type Visitor struct {
id int
runs *RunManager
@@ -35,6 +38,17 @@ type Visitor struct {
logger *slog.Logger
}

// Code indicates how, and whether, a failed visit was handled.
type Code uint32

const (
CodeUnknown Code = iota
CodeUnhandled
CodePermanent
CodeTemporary
CodeIgnore
)

// Start launches the worker in a new goroutine.
func (w *Visitor) Start(ctx context.Context, wg *sync.WaitGroup) {
go func() {
@@ -135,79 +149,76 @@ func (w *Visitor) process(ctx context.Context, job *ctrlq.VisitJob) error {
return nil
}

if !w.handleError(jctx, span, job, res, err, p, jlogger) {
span.AddEvent("Visitor: URL visit failed.", t)
jlogger.Debug("Visitor: URL visit failed, handling ...", "error", err)

code, herr := w.failed(jctx, job, res, err)
switch code {
case CodeIgnore:
p.Update(jctx, ProgressStateCancelled)
span.End()
return nil
case CodeTemporary:
// Leave job in "Crawling" state, and process it later.
span.End()
return nil // continue processing next job.
default: // covers CodeUnknown, CodeUnhandled, CodePermanent
jlogger.Error(
"Visitor: Handling the failed visit error'ed.",
"code", code,
"fail", err,
"error", herr,
)
p.Update(jctx, ProgressStateErrored)
span.RecordError(herr)
span.End()
return herr // stop retrying processing this job.
}

p.Update(jctx, ProgressStateErrored)
jlogger.Error("Visitor: Error visiting URL.", "error", err)
span.RecordError(err)

return nil
}

// handleError handles errors that occur during URL visits
func (w *Visitor) handleError(jctx context.Context, span trace.Span, job *ctrlq.VisitJob, res *collector.Response, err error, p *Progress, jlogger *slog.Logger) bool {
t := trace.WithAttributes(attribute.String("Url", job.URL))
// failed handles failed URL visits. It returns a Code indicating how, and
// whether, the failure was handled, and an error if one occurred during
// handling. A non-nil error means the failure was not handled properly and
// the handling itself failed.
func (w *Visitor) failed(ctx context.Context, job *ctrlq.VisitJob, res *collector.Response, err error) (Code, error) {
attemptRepublish := func() (Code, error) {
if job.Retries >= MaxJobRetries {
return CodeUnhandled, fmt.Errorf("maximum retries reached")
}
if err := w.queue.Republish(ctx, job); err != nil {
return CodeUnhandled, fmt.Errorf("failed to republish job: %w", err)
}
return CodeTemporary, nil
}

if res != nil {
// We have response information, so use it to determine the appropriate error handling.

switch res.StatusCode {
case 302: // Redirect
// When a redirect is encountered, the visit errors out. This is in fact
// not an actual error, but just a skip.
p.Update(jctx, ProgressStateCancelled)

jlogger.Info("Visitor: Skipped URL, got redirected.")
span.AddEvent("Cancelled visiting URL", t)

span.End()
return true
return CodeIgnore, nil
case 404:
// FIXME: Probably want to lower the log level to Info for 404s here.
return CodeIgnore, nil
case 429: // Too Many Requests
fallthrough
return attemptRepublish()
case 503: // Service Unavailable
// Additionally we want to retrieve the Retry-After header here and wait for that amount of time. If
// we just have to wait, then don't error out but reschedule the job and wait. In order to not do
// that infinitely, we should have a maximum number of retries.

// Handling of Retry-After header is optional, so errors here
// that infinitely, we should have a maximum number of retries. Handling of Retry-After header is optional, so errors here
// are not critical.
if v := res.Headers.Get("Retry-After"); v != "" {
d, _ := time.ParseDuration(v + "s")
w.queue.Pause(jctx, res.Request.URL.String(), d)
}

if job.Retries < MaxJobRetries {
if err := w.queue.Republish(jctx, job); err != nil {
jlogger.Warn("Visitor: Republish failed, stopping retrying.", "error", err)
} else {
// Leave job in "Crawling" state.
span.End()
return true
}
} else {
jlogger.Warn("Visitor: Maximum number of retries reached.")
w.queue.Pause(ctx, res.Request.URL.String(), d)
}
return attemptRepublish()
default:
// Noop, fallthrough to generic error handling.
return CodeUnhandled, fmt.Errorf("unhandled failed visit, unknown status code: %d", res.StatusCode)
}
} else if errors.Is(err, context.DeadlineExceeded) {
// We react to timeouts as a temporary issue and retry the job, similarly
// to 429 and 503 errors.
if job.Retries < MaxJobRetries {
if err := w.queue.Republish(jctx, job); err != nil {
jlogger.Warn("Visitor: Republish failed, stopping retrying.", "error", err)
} else {
// Leave job in "Crawling" state.
span.End()
return true
}
} else {
jlogger.Warn("Visitor: Maximum number of retries reached.")
}
return attemptRepublish()
}
return true
return CodeUnhandled, fmt.Errorf("unhandled failed visit")
}
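
Note on the 503 branch above: the committed code parses Retry-After as plain seconds via time.ParseDuration(v + "s") and silently ignores parse errors, which the comment marks as acceptable since handling the header is optional. Retry-After may also carry an HTTP-date instead of delta-seconds. Below is a minimal sketch of a more complete parser; the helper name parseRetryAfter is hypothetical and not part of this commit.

package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// parseRetryAfter interprets a Retry-After header value, which may be either
// delta-seconds ("120") or an HTTP-date ("Wed, 21 Oct 2015 07:28:00 GMT").
// The second return value reports whether a usable wait duration was found.
func parseRetryAfter(v string, now time.Time) (time.Duration, bool) {
	if v == "" {
		return 0, false
	}
	// Delta-seconds form.
	if secs, err := strconv.Atoi(v); err == nil && secs >= 0 {
		return time.Duration(secs) * time.Second, true
	}
	// HTTP-date form; only wait if the date lies in the future.
	if t, err := http.ParseTime(v); err == nil {
		if d := t.Sub(now); d > 0 {
			return d, true
		}
	}
	return 0, false
}

func main() {
	if d, ok := parseRetryAfter("120", time.Now()); ok {
		fmt.Println("waiting", d) // prints: waiting 2m0s
	}
}

The 503 branch could then call w.queue.Pause only when the second return value is true, preserving the current behaviour of treating a missing or malformed header as non-critical.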
