Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record Metrics for Reminder #4831

Merged
merged 2 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions internal/reminder/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// SPDX-FileCopyrightText: Copyright 2025 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

// Package metrics provides metrics for the reminder service
package metrics

import (
	"errors"
	"fmt"

	"go.opentelemetry.io/otel/metric"
)

// delayBuckets lists the explicit bucket boundaries, in seconds, shared by the
// delay histograms. Each boundary is spelled as count * seconds-per-unit so
// the intended duration can be read directly from the expression.
var delayBuckets = []float64{
	1 * 60,    // 1 minute
	5 * 60,    // 5 minutes
	10 * 60,   // 10 minutes
	30 * 60,   // 30 minutes
	1 * 3600,  // 1 hour
	2 * 3600,  // 2 hours
	3 * 3600,  // 3 hours
	5 * 3600,  // 5 hours
	7 * 3600,  // 7 hours
	10 * 3600, // 10 hours
}

// Metrics groups the instruments recorded by the reminder service.
// Instances are built by NewMetrics, which creates every field on a single
// meter.
type Metrics struct {
// SendDelay records, in seconds, the time between a reminder becoming
// eligible and the reminder actually being sent.
SendDelay metric.Float64Histogram

// NewSendDelay records the same delay as SendDelay, in seconds, for
// reminders that are being sent for the first time.
NewSendDelay metric.Float64Histogram

// BatchSize records the number of reminders in a batch.
BatchSize metric.Int64Histogram
}

// NewMetrics creates the reminder service instruments on the given meter.
//
// It registers two delay histograms, "send_delay" and "new_send_delay", both
// measured in seconds and bucketed by delayBuckets, plus a "batch_size"
// histogram for which no explicit bucket boundaries are set.
//
// It returns an error if meter is nil or if any instrument cannot be created.
// Creation errors are wrapped with the instrument name, so errors.Is and
// errors.As still reach the underlying cause. On error the returned *Metrics
// is nil.
func NewMetrics(meter metric.Meter) (*Metrics, error) {
	// Fail with an error rather than a nil-interface panic on the first call.
	if meter == nil {
		return nil, errors.New("creating reminder metrics: meter is nil")
	}

	sendDelay, err := newDelayHistogram(
		meter,
		"send_delay",
		"Time between reminder becoming eligible and actual send (seconds)",
	)
	if err != nil {
		return nil, err
	}

	newSendDelay, err := newDelayHistogram(
		meter,
		"new_send_delay",
		"Time between reminder becoming eligible and actual send (seconds) for first time reminders",
	)
	if err != nil {
		return nil, err
	}

	batchSize, err := meter.Int64Histogram(
		"batch_size",
		metric.WithDescription("Current number of reminders in the batch"),
	)
	if err != nil {
		return nil, fmt.Errorf("creating batch_size histogram: %w", err)
	}

	return &Metrics{
		SendDelay:    sendDelay,
		NewSendDelay: newSendDelay,
		BatchSize:    batchSize,
	}, nil
}

// newDelayHistogram creates a float64 histogram with the given name and
// description, using seconds as the unit and delayBuckets as its boundaries.
func newDelayHistogram(meter metric.Meter, name, description string) (metric.Float64Histogram, error) {
	hist, err := meter.Float64Histogram(
		name,
		metric.WithDescription(description),
		metric.WithUnit("s"),
		metric.WithExplicitBucketBoundaries(delayBuckets...),
	)
	if err != nil {
		return nil, fmt.Errorf("creating %s histogram: %w", name, err)
	}
	return hist, nil
}
94 changes: 94 additions & 0 deletions internal/reminder/metrics_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// SPDX-FileCopyrightText: Copyright 2025 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

package reminder

import (
"context"
"errors"
"fmt"
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

const (
metricsPath = "/metrics"
readHeaderTimeout = 2 * time.Second
)

func (r *reminder) startMetricServer(ctx context.Context) error {
logger := zerolog.Ctx(ctx)

prometheusExporter, err := prometheus.New(
prometheus.WithNamespace("reminder"),
)
if err != nil {
return fmt.Errorf("failed to create Prometheus exporter: %w", err)
}

res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("reminder"),
// TODO: Make this auto-generated
semconv.ServiceVersion("v0.1.0"),
)

mp := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(prometheusExporter),
sdkmetric.WithResource(res),
)

otel.SetMeterProvider(mp)

mux := http.NewServeMux()
mux.Handle(metricsPath, promhttp.Handler())
Comment on lines +51 to +52
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need an http.ServeMux here, rather than simply passing promhttp.Handler to the http.Server below?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to serve the metrics on the root path? I don't see any problem with that, but it's just that I've seen /metrics being commonly used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have a separate port for the metrics, I don't see a reason not to serve it on all the paths off the root. Fetching /metrics should still work.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(On the other hand, I don't have a strong complaint about putting this at /metrics)


server := &http.Server{
Addr: r.cfg.MetricServer.GetAddress(),
Handler: mux,
ReadHeaderTimeout: readHeaderTimeout,
}
logger.Info().Msgf("starting metrics server on %s", server.Addr)

// Start the metrics server
go func() {
err = server.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Err(err).Msg("error starting metrics server")
}
}()

// Watch for context cancellation or stop signal to shutdown the metrics server
go func() {
select {
case <-ctx.Done():
case <-r.stop:
}

// shutdown the metrics server when either the context is done or when reminder is stopped
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownRelease()

logger.Info().Msg("shutting down metrics server")

if err = server.Shutdown(shutdownCtx); err != nil {
logger.Err(err).Msg("error shutting down metrics server")
}

if err = mp.Shutdown(shutdownCtx); err != nil {
logger.Err(err).Msg("error shutting down metrics provider")
}

close(r.metricsServerDone)
}()

return nil
}
78 changes: 57 additions & 21 deletions internal/reminder/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/google/uuid"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"

"github.com/mindersec/minder/internal/db"
remindermessages "github.com/mindersec/minder/internal/reminder/messages"
"github.com/mindersec/minder/internal/reminder/metrics"
reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
"github.com/mindersec/minder/pkg/eventer/constants"
)
Expand All @@ -42,14 +44,18 @@ type reminder struct {
ticker *time.Ticker

eventPublisher message.Publisher

metrics *metrics.Metrics
metricsServerDone chan struct{}
}

// NewReminder creates a new reminder instance
func NewReminder(ctx context.Context, store db.Store, config *reminderconfig.Config) (Interface, error) {
r := &reminder{
store: store,
cfg: config,
stop: make(chan struct{}),
store: store,
cfg: config,
stop: make(chan struct{}),
metricsServerDone: make(chan struct{}),
}

// Set to a random UUID to start
Expand All @@ -74,21 +80,40 @@ func (r *reminder) Start(ctx context.Context) error {
return errors.New("reminder stopped, cannot start again")
default:
}
// Reminder only stops in case of error or context cancellation (or when Stop() is called)
// An errored out reminder cannot be started again, so it is stopped here
// This also prevents resource leaks if user doesn't explicitly stop the reminder
defer r.Stop()

interval := r.cfg.RecurrenceConfig.Interval
if interval <= 0 {
return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval)
}

if r.cfg.MetricsConfig.Enabled {
if err := r.startMetricServer(ctx); err != nil {
logger.Err(err).Msg("failed to start metrics server")
}

var err error
r.metrics, err = metrics.NewMetrics(otel.Meter("reminder"))
if err != nil {
return err
}
} else {
close(r.metricsServerDone)
}

r.ticker = time.NewTicker(interval)
defer r.Stop()

for {
select {
case <-ctx.Done():
<-r.metricsServerDone
logger.Info().Msg("reminder stopped")
return nil
case <-r.stop:
<-r.metricsServerDone
logger.Info().Msg("reminder stopped")
return nil
case <-r.ticker.C:
Expand Down Expand Up @@ -120,13 +145,15 @@ func (r *reminder) Stop() {
zerolog.Ctx(context.Background()).Error().Err(err).Msg("error closing event publisher")
}
})
// Wait for the metrics server to stop
<-r.metricsServerDone
}

func (r *reminder) sendReminders(ctx context.Context) error {
logger := zerolog.Ctx(ctx)

// Fetch a batch of repositories
repos, err := r.getRepositoryBatch(ctx)
repos, repoToLastUpdated, err := r.getRepositoryBatch(ctx)
if err != nil {
return fmt.Errorf("error fetching repository batch: %w", err)
}
Expand All @@ -143,6 +170,10 @@ func (r *reminder) sendReminders(ctx context.Context) error {
return fmt.Errorf("error creating reminder messages: %w", err)
}

if r.metrics != nil {
r.metrics.BatchSize.Record(ctx, int64(len(repos)))
}

err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...)
if err != nil {
return fmt.Errorf("error publishing messages: %w", err)
Expand All @@ -151,13 +182,16 @@ func (r *reminder) sendReminders(ctx context.Context) error {
repoIds := make([]uuid.UUID, len(repos))
for _, repo := range repos {
repoIds = append(repoIds, repo.ID)
}
if r.metrics != nil {
sendDelay := time.Since(repoToLastUpdated[repo.ID]) - r.cfg.RecurrenceConfig.MinElapsed

// TODO: Collect Metrics
// Potential metrics:
// - Gauge: Number of reminders in the current batch
// - UpDownCounter: Average reminders sent per batch
// - Histogram: reminder_last_sent time distribution
recorder := r.metrics.SendDelay
if !repo.ReminderLastSent.Valid {
recorder = r.metrics.NewSendDelay
}
recorder.Record(ctx, sendDelay.Seconds())
}
}
Comment on lines +185 to +194
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This brings back the question of how useful the ReminderLastSent value that we store in the main DB really is. Is there any other application for it? Right now it's only being used as a boolean to check whether a repo has been reconciled (through reminder) or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you mean the column in the database? Given where we're at now, maybe we don't need it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it around for some time, until we set up reminder with metrics and can see whether this is potentially valuable. We can clean it up later if not. (Adding code requires more testing.)


err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds)
if err != nil {
Expand All @@ -167,7 +201,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
return nil
}

func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, error) {
func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, map[uuid.UUID]time.Time, error) {
logger := zerolog.Ctx(ctx)

logger.Debug().Msgf("fetching repositories after cursor: %s", r.repositoryCursor)
Expand All @@ -176,21 +210,23 @@ func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, err
Limit: int64(r.cfg.RecurrenceConfig.BatchSize),
})
if err != nil {
return nil, err
return nil, nil, err
}

eligibleRepos, err := r.getEligibleRepositories(ctx, repos)
eligibleRepos, eligibleReposLastUpdated, err := r.getEligibleRepositories(ctx, repos)
if err != nil {
return nil, err
return nil, nil, err
}
logger.Debug().Msgf("%d/%d repositories are eligible for reminders", len(eligibleRepos), len(repos))

r.updateRepositoryCursor(ctx, repos)

return eligibleRepos, nil
return eligibleRepos, eligibleReposLastUpdated, nil
}

func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) ([]db.Repository, error) {
func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) (
[]db.Repository, map[uuid.UUID]time.Time, error,
) {
eligibleRepos := make([]db.Repository, 0, len(repos))

// We have a slice of repositories, but the sqlc-generated code wants a slice of UUIDs,
Expand All @@ -202,11 +238,11 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
}
oldestRuleEvals, err := r.store.ListOldestRuleEvaluationsByRepositoryId(ctx, repoIds)
if err != nil {
return nil, err
return nil, nil, err
}
idToLastUpdate := make(map[uuid.UUID]time.Time, len(oldestRuleEvals))
for _, times := range oldestRuleEvals {
idToLastUpdate[times.RepositoryID] = times.OldestLastUpdated
for _, ruleEval := range oldestRuleEvals {
idToLastUpdate[ruleEval.RepositoryID] = ruleEval.OldestLastUpdated
}

cutoff := time.Now().Add(-1 * r.cfg.RecurrenceConfig.MinElapsed)
Expand All @@ -216,7 +252,7 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
}
}

return eligibleRepos, nil
return eligibleRepos, idToLastUpdate, nil
}

func (r *reminder) updateRepositoryCursor(ctx context.Context, repos []db.Repository) {
Expand Down
2 changes: 1 addition & 1 deletion internal/reminder/reminder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func Test_getRepositoryBatch(t *testing.T) {

r := createTestReminder(t, store, cfg)

got, err := r.getRepositoryBatch(context.Background())
got, _, err := r.getRepositoryBatch(context.Background())
if test.err != "" {
require.ErrorContains(t, err, test.err)
return
Expand Down
10 changes: 6 additions & 4 deletions pkg/config/reminder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (

// Config contains the configuration for the reminder service
type Config struct {
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig serverconfig.EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig serverconfig.EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
MetricsConfig serverconfig.MetricsConfig `mapstructure:"metrics"`
MetricServer serverconfig.MetricServerConfig `mapstructure:"metric_server" default:"{\"port\":\"9091\"}"`
}

// Validate validates the configuration
Expand Down