Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ jobs:
name: Go lint
runs-on: ubuntu-latest
env:
GOLANGCI_LINT_VERSION: v2.4.0
GOLANGCI_LINT_VERSION: v2.5.0
GOPROXY: https://proxy.golang.org,https://u:${{ secrets.RIVERPRO_GO_MOD_CREDENTIAL }}@riverqueue.com/goproxy,direct
permissions:
contents: read
Expand Down
22 changes: 21 additions & 1 deletion docs/health_checks.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,24 @@ When setting this command in ECS tasks for healtechecks it would something like
}
]
}
```
```

### Silencing request logs for health checks

If you run the bundled `riverui` server and want to reduce log noise from frequent health probes, use the `-silent-healthchecks` flag. This will configure the HTTP logging middleware to skip logs for health endpoints under the configured prefix.

```text
/bin/riverui -prefix=/my-prefix -silent-healthchecks
```

If you embed the UI in your own server, you can apply a similar filter to your logging middleware. For example with `slog-http`:

```go
// assuming prefix has been normalized (e.g., "/my-prefix")
apiHealthPrefix := strings.TrimSuffix(prefix, "/") + "/api/health-checks"
logHandler := sloghttp.NewWithConfig(logger, sloghttp.Config{
Filters: []sloghttp.Filter{sloghttp.IgnorePathPrefix(apiHealthPrefix)},
WithSpanID: otelEnabled,
WithTraceID: otelEnabled,
})
```
6 changes: 5 additions & 1 deletion internal/riveruicmd/auth_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ func TestAuthMiddleware(t *testing.T) { //nolint:tparallel

setup := func(t *testing.T, prefix string) http.Handler {
t.Helper()
initRes, err := initServer(ctx, riversharedtest.Logger(t), prefix,
initRes, err := initServer(ctx,
&initServerOpts{
logger: riversharedtest.Logger(t),
pathPrefix: prefix,
},
func(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
},
Expand Down
40 changes: 31 additions & 9 deletions internal/riveruicmd/riveruicmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func Run[TClient any](createClient func(*pgxpool.Pool) (TClient, error), createB
var healthCheckName string
flag.StringVar(&healthCheckName, "healthcheck", "", "the name of the health checks: minimal or complete")

var silentHealthChecks bool
flag.BoolVar(&silentHealthChecks, "silent-healthchecks", false, "silence request logs for health check routes")

flag.Parse()

if healthCheckName != "" {
Expand All @@ -51,7 +54,11 @@ func Run[TClient any](createClient func(*pgxpool.Pool) (TClient, error), createB
os.Exit(0)
}

initRes, err := initServer(ctx, logger, pathPrefix, createClient, createBundle)
initRes, err := initServer(ctx, &initServerOpts{
logger: logger,
pathPrefix: pathPrefix,
silentHealthChecks: silentHealthChecks,
}, createClient, createBundle)
if err != nil {
logger.ErrorContext(ctx, "Error initializing server", slog.String("error", err.Error()))
os.Exit(1)
Expand Down Expand Up @@ -129,12 +136,21 @@ type initServerResult struct {
uiHandler *riverui.Handler // River UI handler
}

func initServer[TClient any](ctx context.Context, logger *slog.Logger, pathPrefix string, createClient func(*pgxpool.Pool) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) (*initServerResult, error) {
if !strings.HasPrefix(pathPrefix, "/") || pathPrefix == "" {
return nil, fmt.Errorf("invalid path prefix: %s", pathPrefix)
type initServerOpts struct {
logger *slog.Logger
pathPrefix string
silentHealthChecks bool
}

func initServer[TClient any](ctx context.Context, opts *initServerOpts, createClient func(*pgxpool.Pool) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) (*initServerResult, error) {
if opts == nil {
return nil, errors.New("opts is required")
}
if !strings.HasPrefix(opts.pathPrefix, "/") || opts.pathPrefix == "" {
return nil, fmt.Errorf("invalid path prefix: %s", opts.pathPrefix)
}

pathPrefix = riverui.NormalizePathPrefix(pathPrefix)
opts.pathPrefix = riverui.NormalizePathPrefix(opts.pathPrefix)

var (
basicAuthUsername = os.Getenv("RIVER_BASIC_AUTH_USER")
Expand Down Expand Up @@ -173,8 +189,8 @@ func initServer[TClient any](ctx context.Context, logger *slog.Logger, pathPrefi
Endpoints: createBundle(client),
JobListHideArgsByDefault: jobListHideArgsByDefault,
LiveFS: liveFS,
Logger: logger,
Prefix: pathPrefix,
Logger: opts.logger,
Prefix: opts.pathPrefix,
})
if err != nil {
return nil, err
Expand All @@ -184,7 +200,13 @@ func initServer[TClient any](ctx context.Context, logger *slog.Logger, pathPrefi
AllowedMethods: []string{"GET", "HEAD", "POST", "PUT"},
AllowedOrigins: corsOrigins,
})
logHandler := sloghttp.NewWithConfig(logger, sloghttp.Config{
filters := []sloghttp.Filter{}
if opts.silentHealthChecks {
apiHealthPrefix := strings.TrimSuffix(opts.pathPrefix, "/") + "/api/health-checks"
filters = append(filters, sloghttp.IgnorePathPrefix(apiHealthPrefix))
}
logHandler := sloghttp.NewWithConfig(opts.logger, sloghttp.Config{
Filters: filters,
WithSpanID: otelEnabled,
WithTraceID: otelEnabled,
})
Expand All @@ -205,7 +227,7 @@ func initServer[TClient any](ctx context.Context, logger *slog.Logger, pathPrefi
Handler: middlewareStack.Mount(uiHandler),
ReadHeaderTimeout: 5 * time.Second,
},
logger: logger,
logger: opts.logger,
uiHandler: uiHandler,
}, nil
}
Expand Down
100 changes: 96 additions & 4 deletions internal/riveruicmd/riveruicmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"cmp"
"context"
"encoding/json"
"log/slog"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -37,7 +38,10 @@ func TestInitServer(t *testing.T) { //nolint:tparallel
setup := func(t *testing.T) (*initServerResult, *testBundle) {
t.Helper()

initRes, err := initServer(ctx, riversharedtest.Logger(t), "/",
initRes, err := initServer(ctx, &initServerOpts{
logger: riversharedtest.Logger(t),
pathPrefix: "/",
},
func(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
},
Expand All @@ -59,7 +63,7 @@ func TestInitServer(t *testing.T) { //nolint:tparallel
require.NoError(t, err)
})

t.Run("WithPGEnvVars", func(t *testing.T) { //nolint:paralleltest
t.Run("WithPGEnvVars", func(t *testing.T) {
// Cannot be parallelized because of Setenv calls.
t.Setenv("DATABASE_URL", "")

Expand Down Expand Up @@ -98,7 +102,7 @@ func TestInitServer(t *testing.T) { //nolint:tparallel
require.False(t, resp.JobListHideArgsByDefault)
})

t.Run("SetToTrueWithTrue", func(t *testing.T) { //nolint:paralleltest
t.Run("SetToTrueWithTrue", func(t *testing.T) {
// Cannot be parallelized because of Setenv calls.
t.Setenv("RIVER_JOB_LIST_HIDE_ARGS_BY_DEFAULT", "true")
initRes, _ := setup(t)
Expand All @@ -115,7 +119,7 @@ func TestInitServer(t *testing.T) { //nolint:tparallel
require.True(t, resp.JobListHideArgsByDefault)
})

t.Run("SetToTrueWith1", func(t *testing.T) { //nolint:paralleltest
t.Run("SetToTrueWith1", func(t *testing.T) {
// Cannot be parallelized because of Setenv calls.
t.Setenv("RIVER_JOB_LIST_HIDE_ARGS_BY_DEFAULT", "1")
initRes, _ := setup(t)
Expand All @@ -133,3 +137,91 @@ func TestInitServer(t *testing.T) { //nolint:tparallel
})
})
}

// inMemoryHandler is a simple slog.Handler that records all emitted records.
type inMemoryHandler struct {
records []slog.Record
}

func (h *inMemoryHandler) Enabled(context.Context, slog.Level) bool { return true }

func (h *inMemoryHandler) Handle(_ context.Context, r slog.Record) error {
// clone record to avoid later mutation issues
cloned := slog.Record{}
cloned.Level = r.Level
cloned.Time = r.Time
cloned.Message = r.Message
r.Attrs(func(a slog.Attr) bool {
cloned.AddAttrs(a)
return true
})
h.records = append(h.records, cloned)
return nil
}

func (h *inMemoryHandler) WithAttrs(attrs []slog.Attr) slog.Handler { return h }
func (h *inMemoryHandler) WithGroup(name string) slog.Handler { return h }

func TestSilentHealthchecks_SuppressesLogs(t *testing.T) {
// Cannot be parallelized because of Setenv calls.
var (
ctx = context.Background()
databaseURL = cmp.Or(os.Getenv("TEST_DATABASE_URL"), "postgres://localhost/river_test")
)

t.Setenv("DEV", "true")
t.Setenv("DATABASE_URL", databaseURL)

memoryHandler := &inMemoryHandler{}
logger := slog.New(memoryHandler)

makeServer := func(t *testing.T, prefix string, silent bool) *initServerResult {
t.Helper()
initRes, err := initServer(ctx, &initServerOpts{
logger: logger,
pathPrefix: prefix,
silentHealthChecks: silent,
},
func(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
},
func(client *river.Client[pgx.Tx]) uiendpoints.Bundle {
return riverui.NewEndpoints(client, nil)
},
)
require.NoError(t, err)
t.Cleanup(initRes.dbPool.Close)
return initRes
}

// silent=true should suppress health logs but not others
initRes := makeServer(t, "/", true)

recorder := httptest.NewRecorder()
initRes.httpServer.Handler.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/api/health-checks/minimal", nil))
require.Equal(t, http.StatusOK, recorder.Code)
require.Empty(t, memoryHandler.records)

recorder = httptest.NewRecorder()
initRes.httpServer.Handler.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/api/features", nil))
require.Equal(t, http.StatusOK, recorder.Code)
require.NotEmpty(t, memoryHandler.records)

// reset and test with non-root prefix
memoryHandler.records = nil
initRes = makeServer(t, "/pfx", true)

recorder = httptest.NewRecorder()
initRes.httpServer.Handler.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/pfx/api/health-checks/minimal", nil))
require.Equal(t, http.StatusOK, recorder.Code)
require.Empty(t, memoryHandler.records)

// now silent=false should log health
memoryHandler.records = nil
initRes = makeServer(t, "/", false)

recorder = httptest.NewRecorder()
initRes.httpServer.Handler.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/api/health-checks/minimal", nil))
require.Equal(t, http.StatusOK, recorder.Code)
require.NotEmpty(t, memoryHandler.records)
}
2 changes: 1 addition & 1 deletion internal/uicommontest/uicommontest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func MustMarshalJSON(t *testing.T, v any) []byte {
return data
}

// Requires that err is an equivalent API error to expectedErr.
// RequireAPIError requires that err is an equivalent API error to expectedErr.
//
// TError is a pointer to an API error type like *apierror.NotFound.
func RequireAPIError[TError error](t *testing.T, expectedErr TError, err error) {
Expand Down
Loading