File tree 1 file changed +21
-2
lines changed
1 file changed +21
-2
lines changed Original file line number Diff line number Diff line change @@ -2,12 +2,16 @@ package kafkaqueue
2
2
3
3
import (
4
4
"context"
5
+ "encoding/json"
5
6
"errors"
6
7
"github.com/IBM/sarama"
7
8
"github.com/violetpay-org/event-queue/queue"
8
9
"log"
10
+ "sync/atomic"
9
11
)
10
12
13
+ var count = & atomic.Int64 {}
14
+
11
15
type ConsumeOperator [Msg any ] struct {
12
16
serializer queue.MessageSerializer [* sarama.ConsumerMessage , Msg ]
13
17
callback queue.Callback [Msg ]
@@ -304,16 +308,31 @@ func (k *BytesProduceOperator) Produce(message []byte) error {
304
308
return errors .New ("message is nil" )
305
309
}
306
310
307
- log .Print ("Before Produce Message:" + string (k .topic ))
311
+ var test testStruct
312
+ err := json .Unmarshal (message , & test )
313
+ if err != nil {
314
+ return err
315
+ }
308
316
309
- _ , _ , err := producer .SendMessage (& sarama.ProducerMessage {
317
+ if test .status == "success" {
318
+ count .Add (1 )
319
+ }
320
+
321
+ _ , _ , err = producer .SendMessage (& sarama.ProducerMessage {
310
322
Topic : k .topic ,
311
323
Value : sarama .ByteEncoder (message ),
312
324
})
313
325
326
+ log .Print ("Produced. BeforeProduce Count (썩세스만): " , count .Load ())
327
+
314
328
if err != nil {
315
329
return err
316
330
}
317
331
318
332
return nil
319
333
}
334
+
335
+ type testStruct struct {
336
+ requestId string
337
+ status string
338
+ }
You can’t perform that action at this time.
0 commit comments