@@ -14,9 +14,11 @@ import (
14
14
"github.com/ThreeDotsLabs/watermill/message"
15
15
"github.com/google/uuid"
16
16
"github.com/rs/zerolog"
17
+ "go.opentelemetry.io/otel"
17
18
18
19
"github.com/mindersec/minder/internal/db"
19
20
remindermessages "github.com/mindersec/minder/internal/reminder/messages"
21
+ "github.com/mindersec/minder/internal/reminder/metrics"
20
22
reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
21
23
"github.com/mindersec/minder/pkg/eventer/constants"
22
24
)
@@ -42,14 +44,18 @@ type reminder struct {
42
44
ticker * time.Ticker
43
45
44
46
eventPublisher message.Publisher
47
+
48
+ metrics * metrics.Metrics
49
+ metricsServerDone chan struct {}
45
50
}
46
51
47
52
// NewReminder creates a new reminder instance
48
53
func NewReminder (ctx context.Context , store db.Store , config * reminderconfig.Config ) (Interface , error ) {
49
54
r := & reminder {
50
- store : store ,
51
- cfg : config ,
52
- stop : make (chan struct {}),
55
+ store : store ,
56
+ cfg : config ,
57
+ stop : make (chan struct {}),
58
+ metricsServerDone : make (chan struct {}),
53
59
}
54
60
55
61
// Set to a random UUID to start
@@ -74,21 +80,40 @@ func (r *reminder) Start(ctx context.Context) error {
74
80
return errors .New ("reminder stopped, cannot start again" )
75
81
default :
76
82
}
83
+ // Reminder only stops in case of error or context cancellation
84
+ // An errored out reminder cannot be started again, so it is stopped here
85
+ // This also prevents resource leaks if the user doesn't explicitly stop the reminder
86
+ defer r .Stop ()
77
87
78
88
interval := r .cfg .RecurrenceConfig .Interval
79
89
if interval <= 0 {
80
90
return fmt .Errorf ("invalid interval: %s" , r .cfg .RecurrenceConfig .Interval )
81
91
}
82
92
93
+ if r .cfg .MetricsConfig .Enabled {
94
+ if err := r .startMetricServer (ctx ); err != nil {
95
+ logger .Err (err ).Msg ("failed to start metrics server" )
96
+ }
97
+
98
+ var err error
99
+ r .metrics , err = metrics .NewMetrics (otel .Meter ("reminder" ))
100
+ if err != nil {
101
+ return err
102
+ }
103
+ } else {
104
+ close (r .metricsServerDone )
105
+ }
106
+
83
107
r .ticker = time .NewTicker (interval )
84
- defer r .Stop ()
85
108
86
109
for {
87
110
select {
88
111
case <- ctx .Done ():
112
+ <- r .metricsServerDone
89
113
logger .Info ().Msg ("reminder stopped" )
90
114
return nil
91
115
case <- r .stop :
116
+ <- r .metricsServerDone
92
117
logger .Info ().Msg ("reminder stopped" )
93
118
return nil
94
119
case <- r .ticker .C :
@@ -120,13 +145,15 @@ func (r *reminder) Stop() {
120
145
zerolog .Ctx (context .Background ()).Error ().Err (err ).Msg ("error closing event publisher" )
121
146
}
122
147
})
148
+ // Wait for the metrics server to stop
149
+ <- r .metricsServerDone
123
150
}
124
151
125
152
func (r * reminder ) sendReminders (ctx context.Context ) error {
126
153
logger := zerolog .Ctx (ctx )
127
154
128
155
// Fetch a batch of repositories
129
- repos , err := r .getRepositoryBatch (ctx )
156
+ repos , repoToLastUpdated , err := r .getRepositoryBatch (ctx )
130
157
if err != nil {
131
158
return fmt .Errorf ("error fetching repository batch: %w" , err )
132
159
}
@@ -143,6 +170,10 @@ func (r *reminder) sendReminders(ctx context.Context) error {
143
170
return fmt .Errorf ("error creating reminder messages: %w" , err )
144
171
}
145
172
173
+ if r .metrics != nil {
174
+ r .metrics .BatchSize .Record (ctx , int64 (len (repos )))
175
+ }
176
+
146
177
err = r .eventPublisher .Publish (constants .TopicQueueRepoReminder , messages ... )
147
178
if err != nil {
148
179
return fmt .Errorf ("error publishing messages: %w" , err )
@@ -151,13 +182,16 @@ func (r *reminder) sendReminders(ctx context.Context) error {
151
182
repoIds := make ([]uuid.UUID , len (repos ))
152
183
for _ , repo := range repos {
153
184
repoIds = append (repoIds , repo .ID )
154
- }
185
+ if r .metrics != nil {
186
+ sendDelay := time .Since (repoToLastUpdated [repo .ID ]) - r .cfg .RecurrenceConfig .MinElapsed
155
187
156
- // TODO: Collect Metrics
157
- // Potential metrics:
158
- // - Gauge: Number of reminders in the current batch
159
- // - UpDownCounter: Average reminders sent per batch
160
- // - Histogram: reminder_last_sent time distribution
188
+ recorder := r .metrics .SendDelay
189
+ if ! repo .ReminderLastSent .Valid {
190
+ recorder = r .metrics .NewSendDelay
191
+ }
192
+ recorder .Record (ctx , sendDelay .Seconds ())
193
+ }
194
+ }
161
195
162
196
err = r .store .UpdateReminderLastSentForRepositories (ctx , repoIds )
163
197
if err != nil {
@@ -167,7 +201,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
167
201
return nil
168
202
}
169
203
170
- func (r * reminder ) getRepositoryBatch (ctx context.Context ) ([]db.Repository , error ) {
204
+ func (r * reminder ) getRepositoryBatch (ctx context.Context ) ([]db.Repository , map [uuid. UUID ]time. Time , error ) {
171
205
logger := zerolog .Ctx (ctx )
172
206
173
207
logger .Debug ().Msgf ("fetching repositories after cursor: %s" , r .repositoryCursor )
@@ -176,21 +210,23 @@ func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, err
176
210
Limit : int64 (r .cfg .RecurrenceConfig .BatchSize ),
177
211
})
178
212
if err != nil {
179
- return nil , err
213
+ return nil , nil , err
180
214
}
181
215
182
- eligibleRepos , err := r .getEligibleRepositories (ctx , repos )
216
+ eligibleRepos , eligibleReposLastUpdated , err := r .getEligibleRepositories (ctx , repos )
183
217
if err != nil {
184
- return nil , err
218
+ return nil , nil , err
185
219
}
186
220
logger .Debug ().Msgf ("%d/%d repositories are eligible for reminders" , len (eligibleRepos ), len (repos ))
187
221
188
222
r .updateRepositoryCursor (ctx , repos )
189
223
190
- return eligibleRepos , nil
224
+ return eligibleRepos , eligibleReposLastUpdated , nil
191
225
}
192
226
193
- func (r * reminder ) getEligibleRepositories (ctx context.Context , repos []db.Repository ) ([]db.Repository , error ) {
227
+ func (r * reminder ) getEligibleRepositories (ctx context.Context , repos []db.Repository ) (
228
+ []db.Repository , map [uuid.UUID ]time.Time , error ,
229
+ ) {
194
230
eligibleRepos := make ([]db.Repository , 0 , len (repos ))
195
231
196
232
// We have a slice of repositories, but the sqlc-generated code wants a slice of UUIDs,
@@ -202,11 +238,11 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
202
238
}
203
239
oldestRuleEvals , err := r .store .ListOldestRuleEvaluationsByRepositoryId (ctx , repoIds )
204
240
if err != nil {
205
- return nil , err
241
+ return nil , nil , err
206
242
}
207
243
idToLastUpdate := make (map [uuid.UUID ]time.Time , len (oldestRuleEvals ))
208
- for _ , times := range oldestRuleEvals {
209
- idToLastUpdate [times .RepositoryID ] = times .OldestLastUpdated
244
+ for _ , ruleEval := range oldestRuleEvals {
245
+ idToLastUpdate [ruleEval .RepositoryID ] = ruleEval .OldestLastUpdated
210
246
}
211
247
212
248
cutoff := time .Now ().Add (- 1 * r .cfg .RecurrenceConfig .MinElapsed )
@@ -216,7 +252,7 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
216
252
}
217
253
}
218
254
219
- return eligibleRepos , nil
255
+ return eligibleRepos , idToLastUpdate , nil
220
256
}
221
257
222
258
func (r * reminder ) updateRepositoryCursor (ctx context.Context , repos []db.Repository ) {
0 commit comments