Skip to content

Commit f2cf212

Browse files
committed
Notifications: Address Code Review
- Bump IGL to latest changes in Icinga/icinga-go-library#145. - Allow specifying which pipeline keys are relevant, ignore others. - Allow specifying which pipeline key should be parsed in which type. - Create history.DowntimeHistoryMeta as a chimera combining history.DowntimeHistory and history.HistoryDowntime to allow access event_type, distinguishing between downtime_start and downtime_end. - Trace times for submission steps in the worker. Turns out, the single threaded worker blocks roughly two seconds for each Client.ProcessEvent method call. This might sum up to minutes if lots of events are processed at once. My current theory is that the delay results in the expensive bcrypt hash comparison on Notifications.
1 parent fd4c517 commit f2cf212

File tree

8 files changed

+255
-169
lines changed

8 files changed

+255
-169
lines changed

cmd/icingadb/main.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -170,23 +170,26 @@ func run() int {
170170
sig := make(chan os.Signal, 1)
171171
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
172172

173-
var notificationsSourceCallback func(database.Entity)
174-
if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
175-
logger.Info("Starting Icinga Notifications source")
176-
177-
notificationsSource := notifications.NewNotificationsSource(
178-
ctx,
179-
db,
180-
rc,
181-
logs.GetChildLogger("notifications-source"),
182-
cfg)
183-
notificationsSourceCallback = notificationsSource.Submit
184-
}
185-
186173
go func() {
174+
var callback func(database.Entity)
175+
var callbackKeyStructPtr map[string]any
176+
177+
if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
178+
logger.Info("Starting Icinga Notifications source")
179+
180+
notificationsSource := notifications.NewNotificationsClient(
181+
ctx,
182+
db,
183+
rc,
184+
logs.GetChildLogger("notifications-source"),
185+
cfg)
186+
callback = notificationsSource.Submit
187+
callbackKeyStructPtr = notifications.SyncKeyStructPtrs
188+
}
189+
187190
logger.Info("Starting history sync")
188191

189-
if err := hs.Sync(ctx, notificationsSourceCallback); err != nil && !utils.IsContextCanceled(err) {
192+
if err := hs.Sync(ctx, callbackKeyStructPtr, callback); err != nil && !utils.IsContextCanceled(err) {
190193
logger.Fatalf("%+v", err)
191194
}
192195
}()

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ require (
77
github.com/goccy/go-yaml v1.13.0
88
github.com/google/go-cmp v0.7.0
99
github.com/google/uuid v1.6.0
10-
github.com/icinga/icinga-go-library v0.7.3-0.20250807134650-55c038b220d8
10+
github.com/icinga/icinga-go-library v0.7.3-0.20250904130608-5032573a325e
1111
github.com/jessevdk/go-flags v1.6.1
1212
github.com/jmoiron/sqlx v1.4.0
1313
github.com/mattn/go-sqlite3 v1.14.32
1414
github.com/okzk/sdnotify v0.0.0-20180710141335-d9becc38acbd
1515
github.com/pkg/errors v0.9.1
16+
github.com/redis/go-redis/v9 v9.13.0
1617
github.com/stretchr/testify v1.11.1
17-
github.com/redis/go-redis/v9 v9.11.0
1818
github.com/vbauerster/mpb/v6 v6.0.4
1919
go.uber.org/zap v1.27.0
2020
golang.org/x/sync v0.16.0

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
3939
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
4040
github.com/icinga/icinga-go-library v0.7.3-0.20250807134650-55c038b220d8 h1:YStwl7OlLUN87ROcZ8LQRTDKBhEx0V7EX5+5OMRwEnM=
4141
github.com/icinga/icinga-go-library v0.7.3-0.20250807134650-55c038b220d8/go.mod h1:6xgV9o7JcGVg2I6CzPfefQZF4Ev8QeZnysiu82YaRY4=
42+
github.com/icinga/icinga-go-library v0.7.3-0.20250904130608-5032573a325e h1:yZPWPPCHKozWRm9VedDHd8igJh9uI4U2CJdgl1On+/4=
43+
github.com/icinga/icinga-go-library v0.7.3-0.20250904130608-5032573a325e/go.mod h1:exEJdfik2GPYrvZM6Gn4BXIBLIGg6OrCCMnILT+mTUs=
4244
github.com/jessevdk/go-flags v1.6.1 h1:Cvu5U8UGrLay1rZfv/zP7iLpSHGUZ/Ou68T0iX1bBK4=
4345
github.com/jessevdk/go-flags v1.6.1/go.mod h1:Mk8T1hIAWpOiJiHa9rJASDK2UGWji0EuPGBnNLMooyc=
4446
github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
@@ -65,6 +67,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
6567
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
6668
github.com/redis/go-redis/v9 v9.11.0 h1:E3S08Gl/nJNn5vkxd2i78wZxWAPNZgUNTp8WIJUAiIs=
6769
github.com/redis/go-redis/v9 v9.11.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
70+
github.com/redis/go-redis/v9 v9.13.0 h1:PpmlVykE0ODh8P43U0HqC+2NXHXwG+GUtQyz+MPKGRg=
71+
github.com/redis/go-redis/v9 v9.13.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
6872
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
6973
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
7074
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=

internal/config/config.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"github.com/creasty/defaults"
55
"github.com/icinga/icinga-go-library/database"
66
"github.com/icinga/icinga-go-library/logging"
7-
"github.com/icinga/icinga-go-library/notifications"
7+
"github.com/icinga/icinga-go-library/notifications/source"
88
"github.com/icinga/icinga-go-library/redis"
99
"github.com/icinga/icingadb/pkg/icingadb/history"
1010
"github.com/pkg/errors"
@@ -16,11 +16,11 @@ const DefaultConfigPath = "/etc/icingadb/config.yml"
1616

1717
// Config defines Icinga DB config.
1818
type Config struct {
19-
Database database.Config `yaml:"database" envPrefix:"DATABASE_"`
20-
Redis redis.Config `yaml:"redis" envPrefix:"REDIS_"`
21-
Logging logging.Config `yaml:"logging" envPrefix:"LOGGING_"`
22-
Retention RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"`
23-
NotificationsSource notifications.Config `yaml:"notifications-source" envPrefix:"NOTIFICATIONS_SOURCE_"`
19+
Database database.Config `yaml:"database" envPrefix:"DATABASE_"`
20+
Redis redis.Config `yaml:"redis" envPrefix:"REDIS_"`
21+
Logging logging.Config `yaml:"logging" envPrefix:"LOGGING_"`
22+
Retention RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"`
23+
NotificationsSource source.Config `yaml:"notifications-source" envPrefix:"NOTIFICATIONS_SOURCE_"`
2424
}
2525

2626
func (c *Config) SetDefaults() {

pkg/icingadb/history/sync.go

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,15 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync
3939

4040
// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
4141
//
42-
// If not nil, the callback function is appended to each synchronization pipeline and called before the entry is deleted
43-
// from Redis.
44-
func (s Sync) Sync(ctx context.Context, callback func(database.Entity)) error {
42+
// An optional callback and callbackKeyStructPtr might be given. Both most either be nil or not nil.
43+
//
44+
// The callbackKeyStructPtr says which pipeline keys should be mapped to which type, identified by a struct pointer. If
45+
// a key is missing from the map, it will not be used for the callback. The callback function itself shall not block.
46+
func (s Sync) Sync(ctx context.Context, callbackKeyStructPtr map[string]any, callback func(database.Entity)) error {
47+
if (callbackKeyStructPtr == nil) != (callback == nil) {
48+
return fmt.Errorf("either both callbackKeyStructPtr and callback must be nil or none")
49+
}
50+
4551
g, ctx := errgroup.WithContext(ctx)
4652

4753
for key, pipeline := range syncPipelines {
@@ -67,8 +73,13 @@ func (s Sync) Sync(ctx context.Context, callback func(database.Entity)) error {
6773
// forward the entry after it has completed its own sync so that later stages can rely on previous stages being
6874
// executed successfully.
6975

70-
if callback != nil {
71-
pipeline = append(pipeline, makeCallbackStageFunc(callback))
76+
// Shadowed variable to allow appending custom callbacks.
77+
pipeline := pipeline
78+
if callbackKeyStructPtr != nil {
79+
_, ok := callbackKeyStructPtr[key]
80+
if ok {
81+
pipeline = append(pipeline, makeCallbackStageFunc(callbackKeyStructPtr, callback))
82+
}
7283
}
7384

7485
ch := make([]chan redis.XMessage, len(pipeline)+1)
@@ -364,28 +375,17 @@ func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan re
364375

365376
// makeCallbackStageFunc creates a new stageFunc calling the given callback function for each message.
366377
//
378+
// The keyStructPtrs map decides what kind of database.Entity type will be used for the input data based on the key.
379+
//
367380
// The callback call is blocking and the message will be forwarded to the out channel after the function has returned.
368381
// Thus, please ensure this function does not block too long.
369-
func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
382+
func makeCallbackStageFunc(keyStructPtrs map[string]any, callback func(database.Entity)) stageFunc {
370383
return func(ctx context.Context, _ Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
371384
defer close(out)
372385

373-
var structPtr database.Entity
374-
switch key { // keep in sync with syncPipelines below
375-
case "notification":
376-
structPtr = (*v1.NotificationHistory)(nil)
377-
case "state":
378-
structPtr = (*v1.StateHistory)(nil)
379-
case "downtime":
380-
structPtr = (*v1.DowntimeHistory)(nil)
381-
case "comment":
382-
structPtr = (*v1.CommentHistory)(nil)
383-
case "flapping":
384-
structPtr = (*v1.FlappingHistory)(nil)
385-
case "acknowledgement":
386-
structPtr = (*v1.AcknowledgementHistory)(nil)
387-
default:
388-
return fmt.Errorf("unsupported key %q", key)
386+
structPtr, ok := keyStructPtrs[key]
387+
if !ok {
388+
return fmt.Errorf("can't lookup struct pointer for key %q", key)
389389
}
390390

391391
structifier := structify.MakeMapStructifier(
@@ -402,7 +402,7 @@ func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
402402

403403
val, err := structifier(msg.Values)
404404
if err != nil {
405-
return errors.Wrapf(err, "can't structify values %#v for %s", msg.Values, key)
405+
return errors.Wrapf(err, "can't structify values %#v for %q", msg.Values, key)
406406
}
407407

408408
entity, ok := val.(database.Entity)
@@ -420,32 +420,41 @@ func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
420420
}
421421
}
422422

423+
const (
424+
SyncPipelineAcknowledgement = "acknowledgement"
425+
SyncPipelineComment = "comment"
426+
SyncPipelineDowntime = "downtime"
427+
SyncPipelineFlapping = "flapping"
428+
SyncPipelineNotification = "notification"
429+
SyncPipelineState = "state"
430+
)
431+
423432
var syncPipelines = map[string][]stageFunc{
424-
"notification": {
425-
writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
426-
userNotificationStage, // user_notification_history (depends on notification_history)
427-
writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
433+
SyncPipelineAcknowledgement: {
434+
writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
435+
writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history)
428436
},
429-
"state": {
430-
writeOneEntityStage((*v1.StateHistory)(nil)), // state_history
431-
writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history)
432-
writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state
437+
SyncPipelineComment: {
438+
writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history
439+
writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history)
433440
},
434-
"downtime": {
441+
SyncPipelineDowntime: {
435442
writeOneEntityStage((*v1.DowntimeHistory)(nil)), // downtime_history
436443
writeOneEntityStage((*v1.HistoryDowntime)(nil)), // history (depends on downtime_history)
437444
writeOneEntityStage((*v1.SlaHistoryDowntime)(nil)), // sla_history_downtime
438445
},
439-
"comment": {
440-
writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history
441-
writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history)
442-
},
443-
"flapping": {
446+
SyncPipelineFlapping: {
444447
writeOneEntityStage((*v1.FlappingHistory)(nil)), // flapping_history
445448
writeOneEntityStage((*v1.HistoryFlapping)(nil)), // history (depends on flapping_history)
446449
},
447-
"acknowledgement": {
448-
writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
449-
writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history)
450+
SyncPipelineNotification: {
451+
writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
452+
userNotificationStage, // user_notification_history (depends on notification_history)
453+
writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
454+
},
455+
SyncPipelineState: {
456+
writeOneEntityStage((*v1.StateHistory)(nil)), // state_history
457+
writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history)
458+
writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state
450459
},
451460
}

pkg/icingadb/v1/history/downtime.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ func (*HistoryDowntime) TableName() string {
8181
return "history"
8282
}
8383

84+
type DowntimeHistoryMeta struct {
85+
DowntimeHistoryEntity `json:",inline"`
86+
DowntimeHistory `json:",inline"`
87+
HistoryMeta `json:",inline"`
88+
}
89+
8490
type SlaHistoryDowntime struct {
8591
DowntimeHistoryEntity `json:",inline"`
8692
HistoryTableMeta `json:",inline"`

0 commit comments

Comments
 (0)