We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent bdc2415 commit de9f72eCopy full SHA for de9f72e
internal/queue/kafka/operator.go
@@ -12,7 +12,6 @@ type ConsumeOperator[Msg any] struct {
12
callback queue.Callback[Msg]
13
14
initialized bool
15
- running bool
16
cancel *context.CancelFunc
17
18
consumer *Consumer
@@ -106,6 +105,7 @@ type BeforeBytesProduceOperator struct {
106
105
107
func NewBytesProduceOperatorCtor(brokers []string, configProvider func() *sarama.Config) *BeforeBytesProduceOperator {
108
return &BeforeBytesProduceOperator{
+ pool: NewProducerPool(brokers, configProvider),
109
brokers: brokers,
110
configProvider: configProvider,
111
}
0 commit comments