Skip to content

Commit becfc46

Browse files
committed
Feat: function that returns consumer running status
1 parent 07912e2 commit becfc46

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

internal/queue/kafka/consumer.go

+3
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import (
77

88
type Consumer struct {
99
callback func(*sarama.ConsumerMessage)
10+
ready chan bool
1011
}
1112

1213
func NewConsumer(callback func(message *sarama.ConsumerMessage)) *Consumer {
1314
return &Consumer{
1415
callback: callback,
16+
ready: make(chan bool),
1517
}
1618
}
1719

@@ -36,6 +38,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
3638

3739
// Setup is run at the beginning of a new session, before ConsumeClaim.
3840
func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error {
41+
close(c.ready)
3942
log.Print("Consumer up and running")
4043
return nil
4144
}

internal/queue/kafka/operator.go

+13
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,19 @@ func (k *ConsumeOperator[Msg]) Init() {
6464
k.initialized = true
6565
}
6666

67+
func (k *ConsumeOperator[Msg]) Running() bool {
68+
select {
69+
case _, ok := <-k.consumer.ready:
70+
if !ok { // Channel 닫힘, Ready 됐다는 거
71+
return true
72+
}
73+
default: // Channel 안 닫힘, Ready 인 됐다는 거
74+
return false
75+
}
76+
77+
return false
78+
}
79+
6780
func (k *ConsumeOperator[Msg]) StartConsume() error {
6881
if !k.initialized {
6982
k.Init()

0 commit comments

Comments
 (0)