Skip to content

Commit

Permalink
fix reliable acks only processed for one tx type
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickdemers6 committed Feb 19, 2025
1 parent 933d124 commit fc3e6eb
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 1 deletion.
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,10 @@ func (c *Config) configureReliableAckSources() (map[telemetry.Dispatcher]map[str
for _, dispatcher := range validDispatchers {
if dispatcher == dispatchRule {
dispatchRuleFound = true
reliableAckSources[dispatchRule] = map[string]interface{}{txType: true}
if _, ok := reliableAckSources[dispatchRule]; !ok {
reliableAckSources[dispatchRule] = make(map[string]interface{}, 1)
}
reliableAckSources[dispatchRule][txType] = true
break
}
}
Expand Down
12 changes: 12 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ var _ = Describe("Test full application config", func() {
})

Context("configure reliable acks", func() {
It("configures each datasource", func() {
config, err := loadTestApplicationConfig(TestMultipleTxTypeReliableAckConfig)
Expect(err).NotTo(HaveOccurred())

reliableAcks, err := config.configureReliableAckSources()
Expect(err).ToNot(HaveOccurred())
Expect(reliableAcks["kafka"]).To(HaveLen(2))
Expect(reliableAcks["kafka"]["V"]).To(BeTrue())
Expect(reliableAcks["kafka"]["errors"]).To(BeTrue())
Expect(reliableAcks["mqtt"]).To(HaveLen(1))
Expect(reliableAcks["mqtt"]["alerts"]).To(BeTrue())
})

DescribeTable("fails",
func(configInput string, errMessage string) {
Expand Down
30 changes: 30 additions & 0 deletions config/test_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,33 @@ const TestBadTxTypeReliableAckConfig = `
}
}
`

const TestMultipleTxTypeReliableAckConfig = `
{
"host": "127.0.0.1",
"port": 443,
"status_port": 8080,
"namespace": "tesla_telemetry",
"reliable_ack_sources": {
"V": "kafka",
"errors": "kafka",
"alerts": "mqtt"
},
"kafka": {
"bootstrap.servers": "some.broker1:9093,some.broker1:9093",
"ssl.ca.location": "kafka.ca",
"ssl.certificate.location": "kafka.crt",
"ssl.key.location": "kafka.key",
"queue.buffering.max.messages": 1000000
},
"records": {
"V": ["kafka"],
"errors": ["kafka", "mqtt"],
"alerts": ["mqtt"]
},
"tls": {
"server_cert": "your_own_cert.crt",
"server_key": "your_own_key.key"
}
}
`

0 comments on commit fc3e6eb

Please sign in to comment.