Skip to content

Commit 176e244

Browse files
committed
Feat: manual commit kafka ConsumeOperator implemented
1 parent 7e89e17 commit 176e244

File tree

5 files changed

+222
-0
lines changed

5 files changed

+222
-0
lines changed

internal/queue/kafka/consumer.go

+47
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,50 @@ func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
4848
func (c *Consumer) Callback() func(*sarama.ConsumerMessage) {
4949
return c.callback
5050
}
51+
52+
type MarkableConsumer struct {
53+
callback func(*sarama.ConsumerMessage, func())
54+
}
55+
56+
func NewMarkableConsumer(callback func(message *sarama.ConsumerMessage, ack func())) *MarkableConsumer {
57+
return &MarkableConsumer{
58+
callback: callback,
59+
}
60+
}
61+
62+
// ConsumeClaim must start a Consumer loop of ConsumerGroupClaim's Messages().
63+
// NOTE: This must not be called within a goroutine, already handled as goroutines by sarama.
64+
func (c *MarkableConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
65+
for {
66+
select {
67+
case msg, ok := <-claim.Messages():
68+
if !ok {
69+
// 메세지 채널 닫혔을 때
70+
return nil
71+
}
72+
73+
c.callback(msg, func() {
74+
session.MarkMessage(msg, "")
75+
})
76+
77+
case <-session.Context().Done():
78+
return nil
79+
}
80+
}
81+
}
82+
83+
// Setup is run at the beginning of a new session, before ConsumeClaim.
84+
func (c *MarkableConsumer) Setup(session sarama.ConsumerGroupSession) error {
85+
log.Print("Consumer up and running")
86+
return nil
87+
}
88+
89+
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
90+
func (c *MarkableConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
91+
log.Print("Consumer stopped")
92+
return nil
93+
}
94+
95+
func (c *MarkableConsumer) Callback() func(*sarama.ConsumerMessage, func()) {
96+
return c.callback
97+
}

internal/queue/kafka/operator.go

+143
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"github.com/IBM/sarama"
77
"github.com/violetpay-org/event-queue/queue"
8+
"log"
89
)
910

1011
type ConsumeOperator[Msg any] struct {
@@ -100,6 +101,148 @@ func (k *ConsumeOperator[Msg]) StopConsume() error {
100101
return nil
101102
}
102103

104+
// AckConsumeOperator 는 Acknowledge (메세지 브로커에게 보내는 메세지 정상 수신 응답) 을 다루는 컨슈머입니다. 실행되는 흐름은 다음과 같습니다.
105+
//
106+
// 1. 내부 컨슈머가 메세지를 받습니다.
107+
//
108+
// 2. 애크 콜백 함수 (AckCallback) 를 호출합니다.
109+
// 만약 AckCallback 함수가 true 를 반환하면, 메세지를 정상적으로 수신했음을 브로커에 알립니다. 이후 다음 과정으로 진행합니다.
110+
// 만약 AckCallback 함수가 false 를 반환하면, 메세지를 수신했음을 브로커에 알리지 않습니다. 이후 다음 과정으로 진행하지 않고 즉시 처리 과정을 종료합니다.
111+
//
112+
// 3. 콜백 함수 (Callback) 를 호출합니다.
113+
//
114+
// 4. 종료합니다.
115+
type AckConsumeOperator[Msg any] struct {
116+
serializer queue.MessageSerializer[*sarama.ConsumerMessage, Msg]
117+
118+
ackCallback queue.AckCallback[Msg]
119+
callback queue.Callback[Msg]
120+
121+
initialized bool
122+
cancel *context.CancelFunc
123+
124+
consumer *MarkableConsumer
125+
brokers []string
126+
topic string
127+
groupId string
128+
config *sarama.Config
129+
}
130+
131+
func NewAckConsumeOperator[Msg any](serializer queue.MessageSerializer[*sarama.ConsumerMessage, Msg], ackCallback queue.AckCallback[Msg], callback queue.Callback[Msg], brokers []string, topic string, groupId string, config *sarama.Config) *AckConsumeOperator[Msg] {
132+
return &AckConsumeOperator[Msg]{
133+
serializer: serializer,
134+
ackCallback: ackCallback,
135+
callback: callback,
136+
brokers: brokers,
137+
topic: topic,
138+
groupId: groupId,
139+
config: config,
140+
}
141+
}
142+
143+
func (k *AckConsumeOperator[Msg]) QueueName() string {
144+
return k.topic
145+
}
146+
147+
func (k *AckConsumeOperator[Msg]) Serializer() queue.MessageSerializer[*sarama.ConsumerMessage, Msg] {
148+
return k.serializer
149+
}
150+
151+
func (k *AckConsumeOperator[Msg]) AckCallback() queue.AckCallback[Msg] {
152+
return k.ackCallback
153+
}
154+
155+
func (k *AckConsumeOperator[Msg]) Callback() queue.Callback[Msg] {
156+
return func(msg Msg) {
157+
k.callback(msg)
158+
}
159+
}
160+
161+
// BeforeConsume 은 Serializer 를 호출한 후 AckCallback 함수를 실행하는 함수입니다.
162+
// 실제 컨슘할 때에는 AckCallback 결과에 따라 Callback 함수를 실행할지 결정합니다.
163+
func (k *AckConsumeOperator[Msg]) BeforeConsume(msg *sarama.ConsumerMessage, ack func()) (sMsg Msg, ok bool) {
164+
var err error
165+
sMsg, err = k.serializer.Serialize(msg)
166+
if err != nil {
167+
log.Fatal("Serializing error: ", err)
168+
return sMsg, false
169+
}
170+
171+
if !k.ackCallback(sMsg) {
172+
return sMsg, false
173+
}
174+
175+
ack()
176+
return sMsg, true
177+
}
178+
179+
// Consume 은 Serializer 를 호출한 후 Callback 함수를 실행하는 함수입니다.
180+
// 주의: 실제 컨슘할 때의 로직과는 다릅니다. 실제로는 BeforeConsume 함수를 통해 AckCallback 함수를 실행한 뒤 Ack 결과에 따라 Callback 을 실행합니다.
181+
// ConsumeOperator 를 구현함과 동시에 테스트 가능성을 높이기 위해 이 함수를 따로 분리했습니다.
182+
func (k *AckConsumeOperator[Msg]) Consume(msg *sarama.ConsumerMessage) {
183+
sMsg, err := k.serializer.Serialize(msg)
184+
if err != nil {
185+
return
186+
}
187+
188+
k.callback(sMsg)
189+
}
190+
191+
func (k *AckConsumeOperator[Msg]) Init() {
192+
consumer := NewMarkableConsumer(func(msg *sarama.ConsumerMessage, ack func()) {
193+
sMsg, ok := k.BeforeConsume(msg, ack)
194+
if !ok {
195+
return
196+
}
197+
198+
// goroutine 으로 실행! BeforeConsume 이 완료되면 Consume된 상태이므로 goroutine으로 실행합니다..
199+
go k.callback(sMsg)
200+
})
201+
202+
k.consumer = consumer
203+
k.initialized = true
204+
}
205+
206+
func (k *AckConsumeOperator[Msg]) StartConsume() error {
207+
if !k.initialized {
208+
k.Init()
209+
}
210+
211+
client, err := sarama.NewConsumerGroup(k.brokers, k.groupId, k.config)
212+
if err != nil {
213+
return err
214+
}
215+
216+
ctx, cancel := context.WithCancel(context.Background())
217+
k.cancel = &cancel
218+
219+
go func() {
220+
for {
221+
if err := client.Consume(ctx, []string{k.topic}, k.consumer); err != nil {
222+
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
223+
return
224+
}
225+
}
226+
227+
if ctx.Err() != nil {
228+
return
229+
}
230+
}
231+
}()
232+
233+
return nil
234+
}
235+
236+
func (k *AckConsumeOperator[Msg]) StopConsume() error {
237+
if k.cancel == nil {
238+
return errors.New("tried to stop consume before starting (or operator no started, but StopConsume called)")
239+
}
240+
241+
(*k.cancel)()
242+
243+
return nil
244+
}
245+
103246
type BytesProduceOperatorCtor struct {
104247
pool *ProducerPool
105248

internal/queue/kafka/operator_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@ func mapConsumeOperatorProvider(queueName string, serializer queue.MessageSerial
1212
return NewConsumeOperator(serializer, callback, brokers, queueName, "test-group-id", sarama.NewConfig())
1313
}
1414

15+
func mapAckConsumeOperatorProvider(queueName string, serializer queue.MessageSerializer[*sarama.ConsumerMessage, map[string]string], callback queue.Callback[map[string]string]) queue.ConsumeOperator[*sarama.ConsumerMessage, map[string]string] {
16+
f := func(_ map[string]string) bool {
17+
return true
18+
}
19+
return NewAckConsumeOperator(serializer, f, callback, brokers, queueName, "test-group-id", sarama.NewConfig())
20+
}
21+
1522
func consumeMessageProvider() *sarama.ConsumerMessage {
1623
return &sarama.ConsumerMessage{}
1724
}
@@ -23,7 +30,14 @@ func TestConsumeOperator(t *testing.T) {
2330
mapConsumeOperatorProvider,
2431
consumeMessageProvider,
2532
)
33+
34+
queue.TestSuiteConsumeOperator[*sarama.ConsumerMessage, map[string]string](
35+
t,
36+
mapAckConsumeOperatorProvider,
37+
consumeMessageProvider,
38+
)
2639
})
40+
2741
}
2842

2943
func bytesProduceOperatorProvider(queueName string) queue.ProduceOperator[[]byte] {

queue/callback.go

+14
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
11
package queue
22

3+
// Callback 는 메시지를 처리하는 콜백 함수입니다.
34
type Callback[M any] func(req M)
5+
6+
// AckCallback 는 Acknowledge 를 처리하는 콜백 함수 타입입니다. 실행되는 흐름은 아래와 같습니다.
7+
//
8+
// 1. 내부 컨슈머가 메세지를 받습니다.
9+
//
10+
// 2. 애크 콜백 함수 (AckCallback) 를 호출합니다.
11+
// 만약 AckCallback 함수가 true 를 반환하면, 메세지를 정상적으로 수신했음을 브로커에 알립니다. 이후 다음 과정으로 진행합니다.
12+
// 만약 AckCallback 함수가 false 를 반환하면, 메세지를 수신했음을 브로커에 알리지 않습니다. 이후 다음 과정으로 진행하지 않고 즉시 처리 과정을 종료합니다.
13+
//
14+
// 3. 콜백 함수 (Callback) 를 호출합니다.
15+
//
16+
// 4. 종료합니다.
17+
type AckCallback[M any] func(req M) bool

queue/kafka/operator.go

+4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ func NewConsumeOperator[Msg any](serializer queue.MessageSerializer[*sarama.Cons
1010
return kafkaqueue.NewConsumeOperator(serializer, callback, brokers, topic, groupId, config)
1111
}
1212

13+
func NewAckConsumeOperator[Msg any](serializer queue.MessageSerializer[*sarama.ConsumerMessage, Msg], ackCallback queue.AckCallback[Msg], callback queue.Callback[Msg], brokers []string, topic string, groupId string, config *sarama.Config) *kafkaqueue.AckConsumeOperator[Msg] {
14+
return kafkaqueue.NewAckConsumeOperator(serializer, ackCallback, callback, brokers, topic, groupId, config)
15+
}
16+
1317
func NewProduceOperatorCtor(brokers []string, configProvider func() *sarama.Config) *kafkaqueue.BytesProduceOperatorCtor {
1418
return kafkaqueue.NewBytesProduceOperatorCtor(brokers, configProvider)
1519
}

0 commit comments

Comments
 (0)