Skip to content

Commit ccde7be

Browse files
committed
Record Metrics for Reminder
Signed-off-by: Vyom Yadav <[email protected]>
1 parent a6d9b22 commit ccde7be

File tree

5 files changed

+242
-20
lines changed

5 files changed

+242
-20
lines changed

internal/reminder/metrics/metrics.go

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// Package metrics provides metrics for the reminder service
5+
package metrics
6+
7+
import (
8+
"context"
9+
10+
"go.opentelemetry.io/otel/metric"
11+
)
12+
13+
// Default bucket boundaries in seconds for the delay histograms
14+
var delayBuckets = []float64{
15+
60, // 1 minute
16+
300, // 5 minutes
17+
600, // 10 minutes
18+
1800, // 30 minutes
19+
3600, // 1 hour
20+
7200, // 2 hours
21+
10800, // 3 hours
22+
18000, // 5 hours
23+
25200, // 7 hours
24+
36000, // 10 hours
25+
}
26+
27+
// Metrics contains all the metrics for the reminder service
28+
type Metrics struct {
29+
// Time between when a reminder became eligible and when it was sent
30+
SendDelay metric.Float64Histogram
31+
32+
// Time between when a reminder became eligible and when it was sent
33+
NewSendDelay metric.Float64Histogram
34+
35+
// Current number of reminders in the batch
36+
BatchSize metric.Int64Histogram
37+
38+
// Total number of batches processed
39+
TotalBatches metric.Int64Counter
40+
41+
// Total number of reminders processed (total entities reconciled)
42+
TotalReminders metric.Int64Counter
43+
}
44+
45+
// NewMetrics creates a new metrics instance
46+
func NewMetrics(meter metric.Meter) (*Metrics, error) {
47+
sendDelay, err := meter.Float64Histogram(
48+
"send_delay",
49+
metric.WithDescription("Time between reminder becoming eligible and actual send (seconds)"),
50+
metric.WithUnit("s"),
51+
metric.WithExplicitBucketBoundaries(delayBuckets...),
52+
)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
newSendDelay, err := meter.Float64Histogram(
58+
"new_send_delay",
59+
metric.WithDescription("Time between reminder becoming eligible and actual send (seconds) for first time reminders"),
60+
metric.WithUnit("s"),
61+
metric.WithExplicitBucketBoundaries(delayBuckets...),
62+
)
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
batchSize, err := meter.Int64Histogram(
68+
"batch_size",
69+
metric.WithDescription("Current number of reminders in the batch"),
70+
)
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
totalBatches, err := meter.Int64Counter(
76+
"total_batches",
77+
metric.WithDescription("Total number of batches processed"),
78+
)
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
totalReminders, err := meter.Int64Counter(
84+
"total_reminders",
85+
metric.WithDescription("Total number of reminders processed"),
86+
)
87+
if err != nil {
88+
return nil, err
89+
}
90+
91+
return &Metrics{
92+
SendDelay: sendDelay,
93+
NewSendDelay: newSendDelay,
94+
BatchSize: batchSize,
95+
TotalBatches: totalBatches,
96+
TotalReminders: totalReminders,
97+
}, nil
98+
}
99+
100+
// RecordBatch records the metrics for a batch of reminders
101+
func (m *Metrics) RecordBatch(ctx context.Context, size int64) {
102+
m.BatchSize.Record(ctx, size)
103+
m.TotalBatches.Add(ctx, 1)
104+
m.TotalReminders.Add(ctx, size)
105+
}

internal/reminder/metrics_server.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package reminder
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"net/http"
10+
"time"
11+
12+
"github.com/prometheus/client_golang/prometheus/promhttp"
13+
"github.com/rs/zerolog"
14+
"go.opentelemetry.io/otel"
15+
"go.opentelemetry.io/otel/exporters/prometheus"
16+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
17+
"go.opentelemetry.io/otel/sdk/resource"
18+
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
19+
)
20+
21+
const (
22+
metricsPath = "/metrics"
23+
readHeaderTimeout = 2 * time.Second
24+
)
25+
26+
func (r *reminder) startMetricServer(ctx context.Context) error {
27+
logger := zerolog.Ctx(ctx)
28+
29+
prometheusExporter, err := prometheus.New(
30+
prometheus.WithNamespace("reminder"),
31+
)
32+
if err != nil {
33+
return fmt.Errorf("failed to create Prometheus exporter: %w", err)
34+
}
35+
36+
res := resource.NewWithAttributes(
37+
semconv.SchemaURL,
38+
semconv.ServiceName("reminder"),
39+
// TODO: Make this auto-generated
40+
semconv.ServiceVersion("v0.1.0"),
41+
)
42+
43+
mp := sdkmetric.NewMeterProvider(
44+
sdkmetric.WithReader(prometheusExporter),
45+
sdkmetric.WithResource(res),
46+
)
47+
48+
otel.SetMeterProvider(mp)
49+
50+
mux := http.NewServeMux()
51+
mux.Handle(metricsPath, promhttp.Handler())
52+
53+
server := &http.Server{
54+
Addr: r.cfg.MetricServer.GetAddress(),
55+
Handler: mux,
56+
ReadHeaderTimeout: readHeaderTimeout,
57+
}
58+
59+
logger.Info().Msgf("starting metrics server on %s", server.Addr)
60+
61+
errCh := make(chan error)
62+
go func() {
63+
errCh <- server.ListenAndServe()
64+
}()
65+
66+
select {
67+
case err := <-errCh:
68+
return err
69+
case <-ctx.Done():
70+
case <-r.stop:
71+
}
72+
73+
// shutdown the metrics server when either the context is done or when reminder is stopped
74+
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 5*time.Second)
75+
defer shutdownRelease()
76+
77+
logger.Info().Msg("shutting down metrics server")
78+
79+
if err := mp.Shutdown(shutdownCtx); err != nil {
80+
logger.Err(err).Msg("error shutting down metrics provider")
81+
}
82+
83+
return server.Shutdown(shutdownCtx)
84+
}

internal/reminder/reminder.go

+46-15
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ import (
1414
"github.com/ThreeDotsLabs/watermill/message"
1515
"github.com/google/uuid"
1616
"github.com/rs/zerolog"
17+
"go.opentelemetry.io/otel"
1718

1819
"github.com/mindersec/minder/internal/db"
1920
remindermessages "github.com/mindersec/minder/internal/reminder/messages"
21+
"github.com/mindersec/minder/internal/reminder/metrics"
2022
reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
2123
"github.com/mindersec/minder/pkg/eventer/constants"
2224
)
@@ -42,6 +44,8 @@ type reminder struct {
4244
ticker *time.Ticker
4345

4446
eventPublisher message.Publisher
47+
48+
metrics *metrics.Metrics
4549
}
4650

4751
// NewReminder creates a new reminder instance
@@ -80,6 +84,20 @@ func (r *reminder) Start(ctx context.Context) error {
8084
return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval)
8185
}
8286

87+
if r.cfg.MetricsConfig.Enabled {
88+
go func() {
89+
if err := r.startMetricServer(ctx); err != nil {
90+
logger.Error().Err(err).Msg("error starting metrics server")
91+
}
92+
}()
93+
94+
var err error
95+
r.metrics, err = metrics.NewMetrics(otel.Meter("reminder"))
96+
if err != nil {
97+
return err
98+
}
99+
}
100+
83101
r.ticker = time.NewTicker(interval)
84102
defer r.Stop()
85103

@@ -126,7 +144,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
126144
logger := zerolog.Ctx(ctx)
127145

128146
// Fetch a batch of repositories
129-
repos, err := r.getRepositoryBatch(ctx)
147+
repos, repoToLastUpdated, err := r.getRepositoryBatch(ctx)
130148
if err != nil {
131149
return fmt.Errorf("error fetching repository batch: %w", err)
132150
}
@@ -143,6 +161,10 @@ func (r *reminder) sendReminders(ctx context.Context) error {
143161
return fmt.Errorf("error creating reminder messages: %w", err)
144162
}
145163

164+
if r.metrics != nil {
165+
r.metrics.RecordBatch(ctx, int64(len(repos)))
166+
}
167+
146168
err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...)
147169
if err != nil {
148170
return fmt.Errorf("error publishing messages: %w", err)
@@ -151,14 +173,19 @@ func (r *reminder) sendReminders(ctx context.Context) error {
151173
repoIds := make([]uuid.UUID, len(repos))
152174
for _, repo := range repos {
153175
repoIds = append(repoIds, repo.ID)
176+
if r.metrics != nil {
177+
// sendDelay = Now() - ReminderLastSent - MinElapsed
178+
reminderLastSent := repo.ReminderLastSent
179+
if reminderLastSent.Valid {
180+
r.metrics.SendDelay.Record(ctx, (time.Since(reminderLastSent.Time) - r.cfg.RecurrenceConfig.MinElapsed).Seconds())
181+
} else {
182+
// Recording metric for first time reminders (after how long was the initial reminder sent)
183+
newSendDelay := time.Since(repoToLastUpdated[repo.ID]) - r.cfg.RecurrenceConfig.MinElapsed
184+
r.metrics.NewSendDelay.Record(ctx, newSendDelay.Seconds())
185+
}
186+
}
154187
}
155188

156-
// TODO: Collect Metrics
157-
// Potential metrics:
158-
// - Gauge: Number of reminders in the current batch
159-
// - UpDownCounter: Average reminders sent per batch
160-
// - Histogram: reminder_last_sent time distribution
161-
162189
err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds)
163190
if err != nil {
164191
return fmt.Errorf("reminders published but error updating last sent time: %w", err)
@@ -167,7 +194,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
167194
return nil
168195
}
169196

170-
func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, error) {
197+
func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, map[uuid.UUID]time.Time, error) {
171198
logger := zerolog.Ctx(ctx)
172199

173200
logger.Debug().Msgf("fetching repositories after cursor: %s", r.repositoryCursor)
@@ -176,22 +203,25 @@ func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, err
176203
Limit: int64(r.cfg.RecurrenceConfig.BatchSize),
177204
})
178205
if err != nil {
179-
return nil, err
206+
return nil, nil, err
180207
}
181208

182-
eligibleRepos, err := r.getEligibleRepositories(ctx, repos)
209+
eligibleRepos, eligibleReposLastUpdated, err := r.getEligibleRepositories(ctx, repos)
183210
if err != nil {
184-
return nil, err
211+
return nil, nil, err
185212
}
186213
logger.Debug().Msgf("%d/%d repositories are eligible for reminders", len(eligibleRepos), len(repos))
187214

188215
r.updateRepositoryCursor(ctx, repos)
189216

190-
return eligibleRepos, nil
217+
return eligibleRepos, eligibleReposLastUpdated, nil
191218
}
192219

193-
func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) ([]db.Repository, error) {
220+
func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) (
221+
[]db.Repository, map[uuid.UUID]time.Time, error,
222+
) {
194223
eligibleRepos := make([]db.Repository, 0, len(repos))
224+
eligibleReposLastUpdated := make(map[uuid.UUID]time.Time, len(repos))
195225

196226
// We have a slice of repositories, but the sqlc-generated code wants a slice of UUIDs,
197227
// and similarly returns slices of ID -> date (in possibly different order), so we need
@@ -202,7 +232,7 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
202232
}
203233
oldestRuleEvals, err := r.store.ListOldestRuleEvaluationsByRepositoryId(ctx, repoIds)
204234
if err != nil {
205-
return nil, err
235+
return nil, nil, err
206236
}
207237
idToLastUpdate := make(map[uuid.UUID]time.Time, len(oldestRuleEvals))
208238
for _, times := range oldestRuleEvals {
@@ -213,10 +243,11 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
213243
for _, repo := range repos {
214244
if t, ok := idToLastUpdate[repo.ID]; ok && t.Before(cutoff) {
215245
eligibleRepos = append(eligibleRepos, repo)
246+
eligibleReposLastUpdated[repo.ID] = t
216247
}
217248
}
218249

219-
return eligibleRepos, nil
250+
return eligibleRepos, eligibleReposLastUpdated, nil
220251
}
221252

222253
func (r *reminder) updateRepositoryCursor(ctx context.Context, repos []db.Repository) {

internal/reminder/reminder_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func Test_getRepositoryBatch(t *testing.T) {
159159

160160
r := createTestReminder(t, store, cfg)
161161

162-
got, err := r.getRepositoryBatch(context.Background())
162+
got, _, err := r.getRepositoryBatch(context.Background())
163163
if test.err != "" {
164164
require.ErrorContains(t, err, test.err)
165165
return

pkg/config/reminder/config.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ import (
1818

1919
// Config contains the configuration for the reminder service
2020
type Config struct {
21-
Database config.DatabaseConfig `mapstructure:"database"`
22-
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
23-
EventConfig serverconfig.EventConfig `mapstructure:"events"`
24-
LoggingConfig LoggingConfig `mapstructure:"logging"`
21+
Database config.DatabaseConfig `mapstructure:"database"`
22+
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
23+
EventConfig serverconfig.EventConfig `mapstructure:"events"`
24+
LoggingConfig LoggingConfig `mapstructure:"logging"`
25+
MetricsConfig serverconfig.MetricsConfig `mapstructure:"metrics"`
26+
MetricServer serverconfig.MetricServerConfig `mapstructure:"metric_server" default:"{\"port\":\"9091\"}"`
2527
}
2628

2729
// Validate validates the configuration

0 commit comments

Comments
 (0)