Skip to content

Commit e7db849

Browse files
committed
Refactor: returns consumer running status
1 parent becfc46 commit e7db849

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

internal/queue/kafka/consumer.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
type Consumer struct {
99
callback func(*sarama.ConsumerMessage)
10-
ready chan bool
10+
ready chan bool // 채널이 존재한다면 준비 상태, 채널이 닫혔다면 구동되고 있는 상태
1111
}
1212

1313
func NewConsumer(callback func(message *sarama.ConsumerMessage)) *Consumer {
@@ -45,6 +45,7 @@ func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error {
4545

4646
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
4747
func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
48+
c.ready = make(chan bool)
4849
log.Print("Consumer stopped")
4950
return nil
5051
}

internal/queue/kafka/operator.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ func (k *ConsumeOperator[Msg]) Init() {
6767
func (k *ConsumeOperator[Msg]) Running() bool {
6868
select {
6969
case _, ok := <-k.consumer.ready:
70-
if !ok { // Channel 닫힘, Ready 됐다는 거
70+
if !ok { // Channel 닫힘, 가동 중 (Ready X)
7171
return true
7272
}
73-
default: // Channel 안 닫힘, Ready 인 됐다는 거
73+
default: // Channel 열림, Consume 가능 상태 (Ready)
7474
return false
7575
}
7676

@@ -90,7 +90,17 @@ func (k *ConsumeOperator[Msg]) StartConsume() error {
9090
ctx, cancel := context.WithCancel(context.Background())
9191
k.cancel = &cancel
9292

93+
// TODO: Race Condition 발생 가능성 있음 (Consume 직후랑 Running 체크 사이에 상태 변경되어 다중으로 실행될 수 있음)
94+
if k.Running() {
95+
return errors.New("already running")
96+
}
97+
9398
go func() {
99+
// NOTE: Ctx Err 시에도 Consumer Cleanup 은 무조건 발생, 아래 라인을 굳이 또 실행할 필요가 없습니다.
100+
//defer func() {
101+
// k.consumer.ready = make(chan bool)
102+
//}()
103+
94104
for {
95105
if err := client.Consume(ctx, []string{k.topic}, k.consumer); err != nil {
96106
if errors.Is(err, sarama.ErrClosedConsumerGroup) {

queue/operator.go

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ type ConsumeOperator[InMsg any, Msg any] interface {
55
Serializer() MessageSerializer[InMsg, Msg]
66
Callback() Callback[Msg]
77
Consume(message InMsg)
8+
Running() bool
89
StartConsume() error
910
StopConsume() error
1011
}

0 commit comments

Comments
 (0)