@@ -14,9 +14,11 @@ import (
14
14
"github.com/ThreeDotsLabs/watermill/message"
15
15
"github.com/google/uuid"
16
16
"github.com/rs/zerolog"
17
+ "go.opentelemetry.io/otel"
17
18
18
19
"github.com/mindersec/minder/internal/db"
19
20
remindermessages "github.com/mindersec/minder/internal/reminder/messages"
21
+ "github.com/mindersec/minder/internal/reminder/metrics"
20
22
reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
21
23
"github.com/mindersec/minder/pkg/eventer/constants"
22
24
)
@@ -42,6 +44,8 @@ type reminder struct {
42
44
ticker * time.Ticker
43
45
44
46
eventPublisher message.Publisher
47
+
48
+ metrics * metrics.Metrics
45
49
}
46
50
47
51
// NewReminder creates a new reminder instance
@@ -74,21 +78,42 @@ func (r *reminder) Start(ctx context.Context) error {
74
78
return errors .New ("reminder stopped, cannot start again" )
75
79
default :
76
80
}
81
+ // The reminder only stops on error or context cancellation
82
+ // An errored-out reminder cannot be started again, so it is stopped here
83
+ // This also prevents resource leaks if the user doesn't explicitly stop the reminder
84
+ defer r .Stop ()
77
85
78
86
interval := r .cfg .RecurrenceConfig .Interval
79
87
if interval <= 0 {
80
88
return fmt .Errorf ("invalid interval: %s" , r .cfg .RecurrenceConfig .Interval )
81
89
}
82
90
91
+ metricsServerDone := make (chan struct {})
92
+
93
+ if r .cfg .MetricsConfig .Enabled {
94
+ if err := r .startMetricServer (ctx , metricsServerDone ); err != nil {
95
+ logger .Err (err ).Msg ("failed to start metrics server" )
96
+ }
97
+
98
+ var err error
99
+ r .metrics , err = metrics .NewMetrics (otel .Meter ("reminder" ))
100
+ if err != nil {
101
+ return err
102
+ }
103
+ } else {
104
+ close (metricsServerDone )
105
+ }
106
+
83
107
r .ticker = time .NewTicker (interval )
84
- defer r .Stop ()
85
108
86
109
for {
87
110
select {
88
111
case <- ctx .Done ():
112
+ <- metricsServerDone
89
113
logger .Info ().Msg ("reminder stopped" )
90
114
return nil
91
115
case <- r .stop :
116
+ <- metricsServerDone
92
117
logger .Info ().Msg ("reminder stopped" )
93
118
return nil
94
119
case <- r .ticker .C :
@@ -126,7 +151,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
126
151
logger := zerolog .Ctx (ctx )
127
152
128
153
// Fetch a batch of repositories
129
- repos , err := r .getRepositoryBatch (ctx )
154
+ repos , repoToLastUpdated , err := r .getRepositoryBatch (ctx )
130
155
if err != nil {
131
156
return fmt .Errorf ("error fetching repository batch: %w" , err )
132
157
}
@@ -143,6 +168,10 @@ func (r *reminder) sendReminders(ctx context.Context) error {
143
168
return fmt .Errorf ("error creating reminder messages: %w" , err )
144
169
}
145
170
171
+ if r .metrics != nil {
172
+ r .metrics .BatchSize .Record (ctx , int64 (len (repos )))
173
+ }
174
+
146
175
err = r .eventPublisher .Publish (constants .TopicQueueRepoReminder , messages ... )
147
176
if err != nil {
148
177
return fmt .Errorf ("error publishing messages: %w" , err )
@@ -151,13 +180,16 @@ func (r *reminder) sendReminders(ctx context.Context) error {
151
180
repoIds := make ([]uuid.UUID , len (repos ))
152
181
for _ , repo := range repos {
153
182
repoIds = append (repoIds , repo .ID )
154
- }
183
+ if r .metrics != nil {
184
+ sendDelay := time .Since (repoToLastUpdated [repo .ID ]) - r .cfg .RecurrenceConfig .MinElapsed
155
185
156
- // TODO: Collect Metrics
157
- // Potential metrics:
158
- // - Gauge: Number of reminders in the current batch
159
- // - UpDownCounter: Average reminders sent per batch
160
- // - Histogram: reminder_last_sent time distribution
186
+ recorder := r .metrics .SendDelay
187
+ if ! repo .ReminderLastSent .Valid {
188
+ recorder = r .metrics .NewSendDelay
189
+ }
190
+ recorder .Record (ctx , sendDelay .Seconds ())
191
+ }
192
+ }
161
193
162
194
err = r .store .UpdateReminderLastSentForRepositories (ctx , repoIds )
163
195
if err != nil {
@@ -167,7 +199,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
167
199
return nil
168
200
}
169
201
170
- func (r * reminder ) getRepositoryBatch (ctx context.Context ) ([]db.Repository , error ) {
202
+ func (r * reminder ) getRepositoryBatch (ctx context.Context ) ([]db.Repository , map [uuid. UUID ]time. Time , error ) {
171
203
logger := zerolog .Ctx (ctx )
172
204
173
205
logger .Debug ().Msgf ("fetching repositories after cursor: %s" , r .repositoryCursor )
@@ -176,21 +208,23 @@ func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, err
176
208
Limit : int64 (r .cfg .RecurrenceConfig .BatchSize ),
177
209
})
178
210
if err != nil {
179
- return nil , err
211
+ return nil , nil , err
180
212
}
181
213
182
- eligibleRepos , err := r .getEligibleRepositories (ctx , repos )
214
+ eligibleRepos , eligibleReposLastUpdated , err := r .getEligibleRepositories (ctx , repos )
183
215
if err != nil {
184
- return nil , err
216
+ return nil , nil , err
185
217
}
186
218
logger .Debug ().Msgf ("%d/%d repositories are eligible for reminders" , len (eligibleRepos ), len (repos ))
187
219
188
220
r .updateRepositoryCursor (ctx , repos )
189
221
190
- return eligibleRepos , nil
222
+ return eligibleRepos , eligibleReposLastUpdated , nil
191
223
}
192
224
193
- func (r * reminder ) getEligibleRepositories (ctx context.Context , repos []db.Repository ) ([]db.Repository , error ) {
225
+ func (r * reminder ) getEligibleRepositories (ctx context.Context , repos []db.Repository ) (
226
+ []db.Repository , map [uuid.UUID ]time.Time , error ,
227
+ ) {
194
228
eligibleRepos := make ([]db.Repository , 0 , len (repos ))
195
229
196
230
// We have a slice of repositories, but the sqlc-generated code wants a slice of UUIDs,
@@ -202,11 +236,11 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
202
236
}
203
237
oldestRuleEvals , err := r .store .ListOldestRuleEvaluationsByRepositoryId (ctx , repoIds )
204
238
if err != nil {
205
- return nil , err
239
+ return nil , nil , err
206
240
}
207
241
idToLastUpdate := make (map [uuid.UUID ]time.Time , len (oldestRuleEvals ))
208
- for _ , times := range oldestRuleEvals {
209
- idToLastUpdate [times .RepositoryID ] = times .OldestLastUpdated
242
+ for _ , ruleEval := range oldestRuleEvals {
243
+ idToLastUpdate [ruleEval .RepositoryID ] = ruleEval .OldestLastUpdated
210
244
}
211
245
212
246
cutoff := time .Now ().Add (- 1 * r .cfg .RecurrenceConfig .MinElapsed )
@@ -216,7 +250,7 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
216
250
}
217
251
}
218
252
219
- return eligibleRepos , nil
253
+ return eligibleRepos , idToLastUpdate , nil
220
254
}
221
255
222
256
func (r * reminder ) updateRepositoryCursor (ctx context.Context , repos []db.Repository ) {
0 commit comments