@@ -22,26 +22,60 @@ const (
22
22
)
23
23
24
24
type Trigger struct {
25
+ initialised bool
25
26
lock sync.RWMutex
26
27
runningCommands map [string ]context.CancelFunc
27
28
triggerConfigs []config.Trigger
29
+ subscriptions map [string ]subscription
30
+ subscribeQOS byte
28
31
publishQOS byte
29
32
30
33
Executor * cmd.CommandExecutor
31
34
MqttClient MQTT.Client
32
35
}
33
36
37
+ type subscription struct {
38
+ trigger config.Trigger
39
+ handler MQTT.MessageHandler
40
+ }
41
+
34
42
func (t * Trigger ) Initialise (subscribeQOS , publishQOS byte , triggerConfigs []config.Trigger ) {
43
+ t .subscribeQOS = subscribeQOS
35
44
t .publishQOS = publishQOS
36
45
t .runningCommands = map [string ]context.CancelFunc {}
46
+ t .subscriptions = map [string ]subscription {}
37
47
t .triggerConfigs = triggerConfigs //safe the configs so that we can unsubscribe later (see Close func)
38
48
39
49
for _ , triggerConf := range triggerConfigs {
40
- t .MqttClient .Subscribe (triggerConf .Topic , subscribeQOS , t .createTriggerHandler (triggerConf ))
50
+ t .subscriptions [triggerConf .Topic ] = subscription {
51
+ trigger : triggerConf ,
52
+ handler : t .createTriggerHandler (triggerConf ),
53
+ }
54
+
55
+ t .MqttClient .Subscribe (triggerConf .Topic , subscribeQOS , t .subscriptions [triggerConf .Name ].handler )
41
56
42
57
//publish the stopped state on startup
43
58
t .publishStatus (triggerConf .Topic , PayloadStatusStopped )
44
59
}
60
+
61
+ t .initialised = true
62
+ }
63
+
64
+ func (t * Trigger ) IsInitialised () bool {
65
+ return t .initialised
66
+ }
67
+
68
+ func (t * Trigger ) ReInitialise () {
69
+ for triggerName , subscription := range t .subscriptions {
70
+ t .MqttClient .Subscribe (subscription .trigger .Topic , t .subscribeQOS , subscription .handler )
71
+
72
+ //publish the current state on reinitialisation
73
+ if t .isCommandRunning (triggerName ) {
74
+ t .publishStatus (subscription .trigger .Topic , PayloadStatusRunning )
75
+ } else {
76
+ t .publishStatus (subscription .trigger .Topic , PayloadStatusStopped )
77
+ }
78
+ }
45
79
}
46
80
47
81
func (t * Trigger ) createTriggerHandler (triggerConfig config.Trigger ) MQTT.MessageHandler {
@@ -56,14 +90,14 @@ func (t *Trigger) createTriggerHandler(triggerConfig config.Trigger) MQTT.Messag
56
90
switch action {
57
91
case PayloadStart :
58
92
//ensure that only one trigger runs at the same time
59
- if t .isCommandRunning (triggerConfig ) {
93
+ if t .isCommandRunning (triggerConfig . Name ) {
60
94
zap .L ().Warn ("Command is already running. Skip execution!" , zap .String ("trigger" , triggerConfig .Name ))
61
95
return
62
96
}
63
97
64
98
go t .executeCommand (message .Topic (), triggerConfig )
65
99
case PayloadStop :
66
- if ! t .isCommandRunning (triggerConfig ) {
100
+ if ! t .isCommandRunning (triggerConfig . Name ) {
67
101
//no command running -> no action
68
102
return
69
103
}
@@ -75,12 +109,12 @@ func (t *Trigger) createTriggerHandler(triggerConfig config.Trigger) MQTT.Messag
75
109
}
76
110
}
77
111
78
- func (t * Trigger ) isCommandRunning (trigger config. Trigger ) bool {
112
+ func (t * Trigger ) isCommandRunning (triggerName string ) bool {
79
113
//we only need read access
80
114
t .lock .RLock ()
81
115
defer t .lock .RUnlock ()
82
116
83
- _ , exist := t .runningCommands [trigger . Name ]
117
+ _ , exist := t .runningCommands [triggerName ]
84
118
return exist
85
119
}
86
120
0 commit comments