Skip to content

Commit af432f2

Browse files
committed
Add hook HookQueueStateCount + read middleware as hooks and vice versa
Here, add a new hook called `HookQueueStateCount` which gets invoked to produce job queue count statistics. We do this by adding a new maintenance service which like other maintenance services, runs only on the leader, so we only have one client performing counts at any given time. Furthermore, in order to not introduce a potential operational problem without opt-in from River users, the counts only run if a `HookQueueStateCount` hook/middleware has been added to the client. The reason we do all this to to implement a feature requested by one of users: for `otelriver` in contrib to be able to emit queue count metrics, which seems like a pretty reasonable ask for the package to be able to do, and something that every River user would likely want access to in their ops charts. A slight oddity, but which I think is _probably_ okay, is that the new hook ideally stays a hook, but the existing `otelriver` middleware is a middleware. It'd be nice not to have to put `otelriver.Middleware` into both a client's `Hooks` and `Middleware` configuration, so we modify client to allow for hooks that middleware and middleware which are hooks. This lets `otelriver.Middleware` continue doing what it was already doing, but also to start producing new counts as a hook.
1 parent 0143b85 commit af432f2

File tree

17 files changed

+556
-251
lines changed

17 files changed

+556
-251
lines changed

client.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,7 @@ type clientTestSignals struct {
657657
periodicJobEnqueuer *maintenance.PeriodicJobEnqueuerTestSignals
658658
queueCleaner *maintenance.QueueCleanerTestSignals
659659
queueMaintainerLeader *maintenance.QueueMaintainerLeaderTestSignals
660+
queueStateCounter *maintenance.QueueStateCounterTestSignals
660661
reindexer *maintenance.ReindexerTestSignals
661662
}
662663

@@ -679,6 +680,9 @@ func (ts *clientTestSignals) Init(tb testutil.TestingTB) {
679680
if ts.queueMaintainerLeader != nil {
680681
ts.queueMaintainerLeader.Init(tb)
681682
}
683+
if ts.queueStateCounter != nil {
684+
ts.queueStateCounter.Init(tb)
685+
}
682686
if ts.reindexer != nil {
683687
ts.reindexer.Init(tb)
684688
}
@@ -759,7 +763,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
759763
config: config,
760764
driver: driver,
761765
hookLookupByJob: hooklookup.NewJobHookLookup(),
762-
hookLookupGlobal: hooklookup.NewHookLookup(config.Hooks),
766+
hookLookupGlobal: nil, // initialized below after cross-referencing with middleware
763767
producersByQueueName: make(map[string]*producer),
764768
testSignals: clientTestSignals{},
765769
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
@@ -807,6 +811,26 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
807811
}
808812
}
809813

814+
// Cross-reference hooks and middleware: any middleware that also
815+
// implements Hook is added to hooks, and any hook that also implements
816+
// Middleware is added to middleware. It may have been better to have
817+
// Client only take one combined set, but we only realized that we may
818+
// need a combined hook/middleware implementation after that ship sailed.
819+
hooks := config.Hooks
820+
821+
for _, mw := range middleware {
822+
if hook, ok := mw.(rivertype.Hook); ok {
823+
hooks = append(hooks, hook)
824+
}
825+
}
826+
827+
for _, hook := range config.Hooks {
828+
if mw, ok := hook.(rivertype.Middleware); ok {
829+
middleware = append(middleware, mw)
830+
}
831+
}
832+
833+
client.hookLookupGlobal = hooklookup.NewHookLookup(hooks)
810834
client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middleware)
811835
}
812836

@@ -961,6 +985,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
961985
client.testSignals.queueCleaner = &queueCleaner.TestSignals
962986
}
963987

988+
{
989+
queueStateCounter := maintenance.NewQueueStateCounter(archetype, &maintenance.QueueStateCounterConfig{
990+
HookLookupGlobal: client.hookLookupGlobal,
991+
QueueNames: maputil.Keys(config.Queues),
992+
Schema: config.Schema,
993+
}, driver.GetExecutor())
994+
maintenanceServices = append(maintenanceServices, queueStateCounter)
995+
client.testSignals.queueStateCounter = &queueStateCounter.TestSignals
996+
}
997+
964998
{
965999
var scheduleFunc func(time.Time) time.Time
9661000
if config.ReindexerSchedule != nil {

client_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/tidwall/sjson"
2525

2626
"github.com/riverqueue/river/internal/dbunique"
27+
"github.com/riverqueue/river/internal/hooklookup"
2728
"github.com/riverqueue/river/internal/jobexecutor"
2829
"github.com/riverqueue/river/internal/maintenance"
2930
"github.com/riverqueue/river/internal/middlewarelookup"
@@ -7626,6 +7627,24 @@ func Test_NewClient_Validations(t *testing.T) {
76267627
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
76277628
},
76287629
},
7630+
{
7631+
name: "Middleware implementing Hook is available in hook lookup",
7632+
configFunc: func(config *Config) {
7633+
config.Middleware = []rivertype.Middleware{&middlewareWithHook{}}
7634+
},
7635+
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
7636+
require.Len(t, client.hookLookupGlobal.ByHookKind(hooklookup.HookKindWorkBegin), 1)
7637+
},
7638+
},
7639+
{
7640+
name: "Hook implementing Middleware is available in middleware lookup",
7641+
configFunc: func(config *Config) {
7642+
config.Hooks = []rivertype.Hook{&hookWithMiddleware{}}
7643+
},
7644+
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
7645+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
7646+
},
7647+
},
76297648
{
76307649
name: "Middleware not allowed with JobInsertMiddleware",
76317650
configFunc: func(config *Config) {
@@ -8625,3 +8644,31 @@ func (f JobArgsWithHooksFunc) Hooks() []rivertype.Hook {
86258644
func (JobArgsWithHooksFunc) MarshalJSON() ([]byte, error) { return []byte("{}"), nil }
86268645

86278646
func (JobArgsWithHooksFunc) UnmarshalJSON([]byte) error { return nil }
8647+
8648+
// middlewareWithHook is a middleware that also implements HookWorkBegin,
8649+
// used to test cross-pollination from middleware to hooks.
8650+
type middlewareWithHook struct {
8651+
MiddlewareDefaults
8652+
}
8653+
8654+
func (m *middlewareWithHook) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
8655+
return doInner(ctx)
8656+
}
8657+
8658+
func (m *middlewareWithHook) IsHook() bool { return true }
8659+
8660+
func (m *middlewareWithHook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error {
8661+
return nil
8662+
}
8663+
8664+
// hookWithMiddleware is a hook that also implements WorkerMiddleware,
8665+
// used to test cross-pollination from hooks to middleware.
8666+
type hookWithMiddleware struct {
8667+
HookDefaults
8668+
}
8669+
8670+
func (h *hookWithMiddleware) IsMiddleware() bool { return true }
8671+
8672+
func (h *hookWithMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
8673+
return doInner(ctx)
8674+
}

hook_defaults_funcs.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ func (f HookPeriodicJobsStartFunc) Start(ctx context.Context, params *rivertype.
3333
return f(ctx, params)
3434
}
3535

36+
// HookQueueStateCountFunc is a convenience helper for implementing
37+
// rivertype.HookQueueStateCount using a simple function instead of a struct.
38+
type HookQueueStateCountFunc func(ctx context.Context, params *rivertype.HookQueueStateCountParams)
39+
40+
func (f HookQueueStateCountFunc) IsHook() bool { return true }
41+
42+
func (f HookQueueStateCountFunc) QueueStateCount(ctx context.Context, params *rivertype.HookQueueStateCountParams) {
43+
f(ctx, params)
44+
}
45+
3646
// HookWorkBeginFunc is a convenience helper for implementing
3747
// rivertype.HookWorkBegin using a simple function instead of a struct.
3848
type HookWorkBeginFunc func(ctx context.Context, job *rivertype.JobRow) error

internal/hooklookup/hook_lookup.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type HookKind string
1515
const (
1616
HookKindInsertBegin HookKind = "insert_begin"
1717
HookKindPeriodicJobsStart HookKind = "periodic_job_start"
18+
HookKindQueueStateCount HookKind = "queue_state_count"
1819
HookKindWorkBegin HookKind = "work_begin"
1920
HookKindWorkEnd HookKind = "work_end"
2021
)
@@ -90,6 +91,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook {
9091
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
9192
}
9293
}
94+
case HookKindQueueStateCount:
95+
for _, hook := range c.hooks {
96+
if typedHook, ok := hook.(rivertype.HookQueueStateCount); ok {
97+
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
98+
}
99+
}
93100
case HookKindWorkBegin:
94101
for _, hook := range c.hooks {
95102
if typedHook, ok := hook.(rivertype.HookWorkBegin); ok {
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package maintenance
2+
3+
import (
4+
"cmp"
5+
"context"
6+
"errors"
7+
"log/slog"
8+
"time"
9+
10+
"github.com/riverqueue/river/internal/hooklookup"
11+
"github.com/riverqueue/river/riverdriver"
12+
"github.com/riverqueue/river/rivershared/baseservice"
13+
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
14+
"github.com/riverqueue/river/rivershared/startstop"
15+
"github.com/riverqueue/river/rivershared/testsignal"
16+
"github.com/riverqueue/river/rivershared/util/testutil"
17+
"github.com/riverqueue/river/rivershared/util/timeutil"
18+
"github.com/riverqueue/river/rivertype"
19+
)
20+
21+
const QueueStateCounterIntervalDefault = 10 * time.Second
22+
23+
var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals
24+
25+
// QueueStateCounterTestSignals are internal signals used exclusively in tests.
26+
type QueueStateCounterTestSignals struct {
27+
CountedOnce testsignal.TestSignal[struct{}] // notifies when a count pass finishes
28+
}
29+
30+
func (ts *QueueStateCounterTestSignals) Init(tb testutil.TestingTB) {
31+
ts.CountedOnce.Init(tb)
32+
}
33+
34+
type QueueStateCounterConfig struct {
35+
// HookLookupGlobal provides access to globally registered hooks.
36+
HookLookupGlobal hooklookup.HookLookupInterface
37+
38+
// Interval is the amount of time between count runs.
39+
Interval time.Duration
40+
41+
// QueueNames is the list of configured queue names. Counts are emitted for
42+
// all of these queues even if they have no jobs, with zero counts for all
43+
// states.
44+
QueueNames []string
45+
46+
// Schema where River tables are located. Empty string omits schema, causing
47+
// Postgres to default to `search_path`.
48+
Schema string
49+
}
50+
51+
func (c *QueueStateCounterConfig) mustValidate() *QueueStateCounterConfig {
52+
if c.Interval <= 0 {
53+
panic("QueueStateCounterConfig.Interval must be above zero")
54+
}
55+
56+
return c
57+
}
58+
59+
// QueueStateCounter periodically counts jobs by queue and state, logging the
60+
// results. This provides visibility into queue health without requiring
61+
// external monitoring queries. The maintenance service only runs if there is a
62+
// HookQueueStateCount hook registered that consumes the counts.
63+
type QueueStateCounter struct {
64+
riversharedmaintenance.QueueMaintainerServiceBase
65+
startstop.BaseStartStop
66+
67+
// exported for test purposes
68+
Config *QueueStateCounterConfig
69+
TestSignals QueueStateCounterTestSignals
70+
71+
exec riverdriver.Executor
72+
}
73+
74+
func NewQueueStateCounter(archetype *baseservice.Archetype, config *QueueStateCounterConfig, exec riverdriver.Executor) *QueueStateCounter {
75+
return baseservice.Init(archetype, &QueueStateCounter{
76+
Config: (&QueueStateCounterConfig{
77+
HookLookupGlobal: config.HookLookupGlobal,
78+
Interval: cmp.Or(config.Interval, QueueStateCounterIntervalDefault),
79+
QueueNames: config.QueueNames,
80+
Schema: config.Schema,
81+
}).mustValidate(),
82+
exec: exec,
83+
})
84+
}
85+
86+
func (s *QueueStateCounter) Start(ctx context.Context) error {
87+
ctx, shouldStart, started, stopped := s.StartInit(ctx)
88+
if !shouldStart {
89+
return nil
90+
}
91+
92+
s.StaggerStart(ctx)
93+
94+
go func() {
95+
started()
96+
defer stopped() // this defer should come first so it's last out
97+
98+
s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted)
99+
defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped)
100+
101+
// If no hooks are registered, there's no one to send counts to, so
102+
// start, but don't do anything.
103+
if len(s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindQueueStateCount)) < 1 {
104+
<-ctx.Done()
105+
return
106+
}
107+
108+
ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval)
109+
for {
110+
select {
111+
case <-ctx.Done():
112+
return
113+
case <-ticker.C:
114+
}
115+
116+
if err := s.runOnce(ctx); err != nil {
117+
if !errors.Is(err, context.Canceled) {
118+
s.Logger.ErrorContext(ctx, s.Name+": Error counting queue states", slog.String("error", err.Error()))
119+
}
120+
}
121+
}
122+
}()
123+
124+
return nil
125+
}
126+
127+
func (s *QueueStateCounter) runOnce(ctx context.Context) error {
128+
ctx, cancelFunc := context.WithTimeout(ctx, riversharedmaintenance.TimeoutDefault)
129+
defer cancelFunc()
130+
131+
rawCounts, err := s.exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{
132+
QueueNames: s.Config.QueueNames,
133+
Schema: s.Config.Schema,
134+
})
135+
if err != nil {
136+
return err
137+
}
138+
139+
byQueue := s.buildResults(ctx, rawCounts)
140+
141+
for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindQueueStateCount) {
142+
hook.(rivertype.HookQueueStateCount).QueueStateCount(ctx, &rivertype.HookQueueStateCountParams{ //nolint:forcetypeassert
143+
ByQueue: byQueue,
144+
})
145+
}
146+
147+
s.TestSignals.CountedOnce.Signal(struct{}{})
148+
149+
return nil
150+
}
151+
152+
// buildResults converts raw driver counts into results with all configured
153+
// queues and all job states filled in (zeroed where needed), logging one line
154+
// per queue.
155+
func (s *QueueStateCounter) buildResults(ctx context.Context, rawCounts map[string]map[rivertype.JobState]int) map[string]rivertype.HookQueueStateCountResult {
156+
// Ensure all configured queues are present, even if they have no jobs.
157+
for _, queue := range s.Config.QueueNames {
158+
if rawCounts[queue] == nil {
159+
rawCounts[queue] = make(map[rivertype.JobState]int)
160+
}
161+
}
162+
163+
countsByQueue := make(map[string]rivertype.HookQueueStateCountResult, len(rawCounts))
164+
165+
for queue, stateCounts := range rawCounts {
166+
attrs := make([]slog.Attr, 0, 2+len(jobStateAll))
167+
attrs = append(attrs, slog.String("queue", queue))
168+
total := 0
169+
170+
for _, state := range jobStateAll {
171+
if _, ok := stateCounts[state]; !ok {
172+
stateCounts[state] = 0
173+
}
174+
total += stateCounts[state]
175+
attrs = append(attrs, slog.Int(string(state), stateCounts[state]))
176+
}
177+
178+
attrs = append(attrs, slog.Int("total", total))
179+
s.Logger.LogAttrs(ctx, slog.LevelInfo, s.Name+": Queue state counts", attrs...)
180+
181+
countsByQueue[queue] = rivertype.HookQueueStateCountResult{
182+
ByState: stateCounts,
183+
Total: total,
184+
}
185+
}
186+
187+
return countsByQueue
188+
}

0 commit comments

Comments
 (0)