@@ -39,9 +39,15 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync
3939
4040// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success. 
4141// 
42- // If not nil, the callback function is appended to each synchronization pipeline and called before the entry is deleted 
43- // from Redis. 
44- func  (s  Sync ) Sync (ctx  context.Context , callback  func (database.Entity )) error  {
42+ // An optional callback and callbackKeyStructPtr might be given. Both most either be nil or not nil. 
43+ // 
44+ // The callbackKeyStructPtr says which pipeline keys should be mapped to which type, identified by a struct pointer. If 
45+ // a key is missing from the map, it will not be used for the callback. The callback function itself shall not block. 
46+ func  (s  Sync ) Sync (ctx  context.Context , callbackKeyStructPtr  map [string ]any , callback  func (database.Entity )) error  {
47+ 	if  (callbackKeyStructPtr  ==  nil ) !=  (callback  ==  nil ) {
48+ 		return  fmt .Errorf ("either both callbackKeyStructPtr and callback must be nil or none" )
49+ 	}
50+ 
4551	g , ctx  :=  errgroup .WithContext (ctx )
4652
4753	for  key , pipeline  :=  range  syncPipelines  {
@@ -67,8 +73,13 @@ func (s Sync) Sync(ctx context.Context, callback func(database.Entity)) error {
6773		// forward the entry after it has completed its own sync so that later stages can rely on previous stages being 
6874		// executed successfully. 
6975
70- 		if  callback  !=  nil  {
71- 			pipeline  =  append (pipeline , makeCallbackStageFunc (callback ))
76+ 		// Shadowed variable to allow appending custom callbacks. 
77+ 		pipeline  :=  pipeline 
78+ 		if  callbackKeyStructPtr  !=  nil  {
79+ 			_ , ok  :=  callbackKeyStructPtr [key ]
80+ 			if  ok  {
81+ 				pipeline  =  append (pipeline , makeCallbackStageFunc (callbackKeyStructPtr , callback ))
82+ 			}
7283		}
7384
7485		ch  :=  make ([]chan  redis.XMessage , len (pipeline )+ 1 )
@@ -364,28 +375,17 @@ func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan re
364375
365376// makeCallbackStageFunc creates a new stageFunc calling the given callback function for each message. 
366377// 
378+ // The keyStructPtrs map decides what kind of database.Entity type will be used for the input data based on the key. 
379+ // 
367380// The callback call is blocking and the message will be forwarded to the out channel after the function has returned. 
368381// Thus, please ensure this function does not block too long. 
369- func  makeCallbackStageFunc (callback  func (database.Entity )) stageFunc  {
382+ func  makeCallbackStageFunc (keyStructPtrs   map [ string ] any ,  callback  func (database.Entity )) stageFunc  {
370383	return  func (ctx  context.Context , _  Sync , key  string , in  <- chan  redis.XMessage , out  chan <-  redis.XMessage ) error  {
371384		defer  close (out )
372385
373- 		var  structPtr  database.Entity 
374- 		switch  key  { // keep in sync with syncPipelines below 
375- 		case  "notification" :
376- 			structPtr  =  (* v1 .NotificationHistory )(nil )
377- 		case  "state" :
378- 			structPtr  =  (* v1 .StateHistory )(nil )
379- 		case  "downtime" :
380- 			structPtr  =  (* v1 .DowntimeHistory )(nil )
381- 		case  "comment" :
382- 			structPtr  =  (* v1 .CommentHistory )(nil )
383- 		case  "flapping" :
384- 			structPtr  =  (* v1 .FlappingHistory )(nil )
385- 		case  "acknowledgement" :
386- 			structPtr  =  (* v1 .AcknowledgementHistory )(nil )
387- 		default :
388- 			return  fmt .Errorf ("unsupported key %q" , key )
386+ 		structPtr , ok  :=  keyStructPtrs [key ]
387+ 		if  ! ok  {
388+ 			return  fmt .Errorf ("can't lookup struct pointer for key %q" , key )
389389		}
390390
391391		structifier  :=  structify .MakeMapStructifier (
@@ -402,7 +402,7 @@ func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
402402
403403				val , err  :=  structifier (msg .Values )
404404				if  err  !=  nil  {
405- 					return  errors .Wrapf (err , "can't structify values %#v for %s " , msg .Values , key )
405+ 					return  errors .Wrapf (err , "can't structify values %#v for %q " , msg .Values , key )
406406				}
407407
408408				entity , ok  :=  val .(database.Entity )
@@ -420,32 +420,41 @@ func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
420420	}
421421}
422422
423+ const  (
424+ 	SyncPipelineAcknowledgement  =  "acknowledgement" 
425+ 	SyncPipelineComment          =  "comment" 
426+ 	SyncPipelineDowntime         =  "downtime" 
427+ 	SyncPipelineFlapping         =  "flapping" 
428+ 	SyncPipelineNotification     =  "notification" 
429+ 	SyncPipelineState            =  "state" 
430+ )
431+ 
423432var  syncPipelines  =  map [string ][]stageFunc {
424- 	"notification" : {
425- 		writeOneEntityStage ((* v1 .NotificationHistory )(nil )), // notification_history 
426- 		userNotificationStage ,                               // user_notification_history (depends on notification_history) 
427- 		writeOneEntityStage ((* v1 .HistoryNotification )(nil )), // history (depends on notification_history) 
433+ 	SyncPipelineAcknowledgement : {
434+ 		writeOneEntityStage ((* v1 .AcknowledgementHistory )(nil )), // acknowledgement_history 
435+ 		writeOneEntityStage ((* v1 .HistoryAck )(nil )),             // history (depends on acknowledgement_history) 
428436	},
429- 	"state" : {
430- 		writeOneEntityStage ((* v1 .StateHistory )(nil )),   // state_history 
431- 		writeOneEntityStage ((* v1 .HistoryState )(nil )),   // history (depends on state_history) 
432- 		writeMultiEntityStage (stateHistoryToSlaEntity ), // sla_history_state 
437+ 	SyncPipelineComment : {
438+ 		writeOneEntityStage ((* v1 .CommentHistory )(nil )), // comment_history 
439+ 		writeOneEntityStage ((* v1 .HistoryComment )(nil )), // history (depends on comment_history) 
433440	},
434- 	"downtime" : {
441+ 	SyncPipelineDowntime : {
435442		writeOneEntityStage ((* v1 .DowntimeHistory )(nil )),    // downtime_history 
436443		writeOneEntityStage ((* v1 .HistoryDowntime )(nil )),    // history (depends on downtime_history) 
437444		writeOneEntityStage ((* v1 .SlaHistoryDowntime )(nil )), // sla_history_downtime 
438445	},
439- 	"comment" : {
440- 		writeOneEntityStage ((* v1 .CommentHistory )(nil )), // comment_history 
441- 		writeOneEntityStage ((* v1 .HistoryComment )(nil )), // history (depends on comment_history) 
442- 	},
443- 	"flapping" : {
446+ 	SyncPipelineFlapping : {
444447		writeOneEntityStage ((* v1 .FlappingHistory )(nil )), // flapping_history 
445448		writeOneEntityStage ((* v1 .HistoryFlapping )(nil )), // history (depends on flapping_history) 
446449	},
447- 	"acknowledgement" : {
448- 		writeOneEntityStage ((* v1 .AcknowledgementHistory )(nil )), // acknowledgement_history 
449- 		writeOneEntityStage ((* v1 .HistoryAck )(nil )),             // history (depends on acknowledgement_history) 
450+ 	SyncPipelineNotification : {
451+ 		writeOneEntityStage ((* v1 .NotificationHistory )(nil )), // notification_history 
452+ 		userNotificationStage ,                               // user_notification_history (depends on notification_history) 
453+ 		writeOneEntityStage ((* v1 .HistoryNotification )(nil )), // history (depends on notification_history) 
454+ 	},
455+ 	SyncPipelineState : {
456+ 		writeOneEntityStage ((* v1 .StateHistory )(nil )),   // state_history 
457+ 		writeOneEntityStage ((* v1 .HistoryState )(nil )),   // history (depends on state_history) 
458+ 		writeMultiEntityStage (stateHistoryToSlaEntity ), // sla_history_state 
450459	},
451460}
0 commit comments