Skip to content

Commit 345d8af

Browse files
committed
feat: support push with key
1 parent 319b750 commit 345d8af

File tree

3 files changed

+148
-1
lines changed

3 files changed

+148
-1
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ require (
4242
github.com/prometheus/procfs v0.12.0 // indirect
4343
github.com/redis/go-redis/v9 v9.5.3 // indirect
4444
github.com/spaolacci/murmur3 v1.1.0 // indirect
45+
github.com/stretchr/objx v0.5.2 // indirect
4546
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
4647
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
4748
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect

kq/pusher.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,16 @@ type (
1616
PushOption func(options *pushOptions)
1717

1818
Pusher struct {
19-
producer *kafka.Writer
2019
topic string
20+
producer kafkaWriter
2121
executor *executors.ChunkExecutor
2222
}
2323

24+
kafkaWriter interface {
25+
Close() error
26+
WriteMessages(ctx context.Context, msgs ...kafka.Message) error
27+
}
28+
2429
pushOptions struct {
2530
// kafka.Writer options
2631
allowAutoTopicCreation bool

kq/pusher_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package kq
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/mock"
12+
)
13+
14+
// MockKafkaWriter is a mock for kafka.Writer
15+
type MockKafkaWriter struct {
16+
mock.Mock
17+
}
18+
19+
func (m *MockKafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error {
20+
args := m.Called(ctx, msgs)
21+
return args.Error(0)
22+
}
23+
24+
func (m *MockKafkaWriter) Close() error {
25+
args := m.Called()
26+
return args.Error(0)
27+
}
28+
29+
func TestNewPusher(t *testing.T) {
30+
addrs := []string{"localhost:9092"}
31+
topic := "test-topic"
32+
33+
t.Run("DefaultOptions", func(t *testing.T) {
34+
pusher := NewPusher(addrs, topic)
35+
assert.NotNil(t, pusher)
36+
assert.NotNil(t, pusher.producer)
37+
assert.Equal(t, topic, pusher.topic)
38+
assert.NotNil(t, pusher.executor)
39+
})
40+
41+
t.Run("WithSyncPush", func(t *testing.T) {
42+
pusher := NewPusher(addrs, topic, WithSyncPush())
43+
assert.NotNil(t, pusher)
44+
assert.NotNil(t, pusher.producer)
45+
assert.Equal(t, topic, pusher.topic)
46+
assert.Nil(t, pusher.executor)
47+
})
48+
49+
t.Run("WithChunkSize", func(t *testing.T) {
50+
pusher := NewPusher(addrs, topic, WithChunkSize(100))
51+
assert.NotNil(t, pusher)
52+
assert.NotNil(t, pusher.executor)
53+
})
54+
55+
t.Run("WithFlushInterval", func(t *testing.T) {
56+
pusher := NewPusher(addrs, topic, WithFlushInterval(time.Second))
57+
assert.NotNil(t, pusher)
58+
assert.NotNil(t, pusher.executor)
59+
})
60+
61+
t.Run("WithAllowAutoTopicCreation", func(t *testing.T) {
62+
pusher := NewPusher(addrs, topic, WithAllowAutoTopicCreation())
63+
assert.NotNil(t, pusher)
64+
assert.True(t, pusher.producer.(*kafka.Writer).AllowAutoTopicCreation)
65+
})
66+
}
67+
68+
func TestPusher_Close(t *testing.T) {
69+
mockWriter := new(MockKafkaWriter)
70+
pusher := &Pusher{
71+
producer: mockWriter,
72+
}
73+
74+
mockWriter.On("Close").Return(nil)
75+
76+
err := pusher.Close()
77+
assert.NoError(t, err)
78+
mockWriter.AssertExpectations(t)
79+
}
80+
81+
func TestPusher_Name(t *testing.T) {
82+
topic := "test-topic"
83+
pusher := &Pusher{topic: topic}
84+
85+
assert.Equal(t, topic, pusher.Name())
86+
}
87+
88+
func TestPusher_Push(t *testing.T) {
89+
mockWriter := new(MockKafkaWriter)
90+
pusher := &Pusher{
91+
producer: mockWriter,
92+
topic: "test-topic",
93+
}
94+
95+
ctx := context.Background()
96+
value := "test-value"
97+
98+
mockWriter.On("WriteMessages", mock.Anything, mock.AnythingOfType("[]kafka.Message")).Return(nil)
99+
100+
err := pusher.Push(ctx, value)
101+
assert.NoError(t, err)
102+
mockWriter.AssertExpectations(t)
103+
}
104+
105+
func TestPusher_PushWithKey(t *testing.T) {
106+
mockWriter := new(MockKafkaWriter)
107+
pusher := &Pusher{
108+
producer: mockWriter,
109+
topic: "test-topic",
110+
}
111+
112+
ctx := context.Background()
113+
key := "test-key"
114+
value := "test-value"
115+
116+
mockWriter.On("WriteMessages", mock.Anything, mock.AnythingOfType("[]kafka.Message")).Return(nil)
117+
118+
err := pusher.PushWithKey(ctx, key, value)
119+
assert.NoError(t, err)
120+
mockWriter.AssertExpectations(t)
121+
}
122+
123+
func TestPusher_PushWithKey_Error(t *testing.T) {
124+
mockWriter := new(MockKafkaWriter)
125+
pusher := &Pusher{
126+
producer: mockWriter,
127+
topic: "test-topic",
128+
}
129+
130+
ctx := context.Background()
131+
key := "test-key"
132+
value := "test-value"
133+
134+
expectedError := errors.New("write error")
135+
mockWriter.On("WriteMessages", mock.Anything, mock.AnythingOfType("[]kafka.Message")).Return(expectedError)
136+
137+
err := pusher.PushWithKey(ctx, key, value)
138+
assert.Error(t, err)
139+
assert.Equal(t, expectedError, err)
140+
mockWriter.AssertExpectations(t)
141+
}

0 commit comments

Comments
 (0)