Skip to content

Commit 0233856

Browse files
committed
Fix: bug that cannot catch produce error, now checking by monitoring errors of each producer
1 parent b6fe589 commit 0233856

File tree

1 file changed

+37
-1
lines changed

1 file changed

+37
-1
lines changed

internal/producer.go

+37-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99

1010
// ProducerPool is a pool of producers that can be used to produce messages to Kafka for one set of brokers.
1111
// It is not related to Transaction, Transactional Producer implements by configProvider.
12+
//
13+
// Will be overridden Returns.Errors to true, because of monitoring errors.
1214
type ProducerPool struct {
1315
locker sync.Mutex
1416
producers map[common.Topic][]sarama.AsyncProducer
@@ -84,11 +86,45 @@ func NewProducerPool(brokers []string, configProvider func() *sarama.Config) *Pr
8486
}
8587

8688
func (p *ProducerPool) generateProducer() sarama.AsyncProducer {
87-
producer, err := sarama.NewAsyncProducer(p.brokers, p.configProvider())
89+
cfg := p.configProvider()
90+
cfg.Producer.Return.Errors = true
91+
92+
producer, err := sarama.NewAsyncProducer(p.brokers, cfg)
8893
if err != nil {
8994
fmt.Println("Error creating producer", err)
9095
return nil
9196
}
9297

98+
go p.monitorErrors(producer)
99+
93100
return producer
94101
}
102+
103+
func (p *ProducerPool) monitorErrors(producer sarama.AsyncProducer) {
104+
for err := range producer.Errors() {
105+
fmt.Println("ERROR! Failed to produce message:", err.Err)
106+
if err.Err == sarama.ErrShuttingDown {
107+
if err.Msg == nil {
108+
// 프로듀서가 꺼졌는데 메시지가 남아있지 않은 경우에만 break 합니다.
109+
// 메세지가 남아있는 경우 재발행해야 되기 때문에 break 하면 안됩니다.
110+
break
111+
}
112+
}
113+
114+
// 재발행.
115+
msg := err.Msg
116+
p.republishMessage(msg)
117+
}
118+
}
119+
120+
// republishMessage republishes a message that failed to be produced.
121+
func (p *ProducerPool) republishMessage(msg *sarama.ProducerMessage) {
122+
// Republish message
123+
producer := p.Take(common.Topic{Name: msg.Topic, Partition: msg.Partition})
124+
if producer != nil {
125+
producer.Input() <- msg
126+
p.Return(producer, common.Topic{Name: msg.Topic, Partition: msg.Partition})
127+
} else {
128+
p.republishMessage(msg)
129+
}
130+
}

0 commit comments

Comments
 (0)