Skip to content

Commit 914260d

Browse files
committed
Test: domain produce operator
1 parent c934bcf commit 914260d

File tree

3 files changed

+65
-16
lines changed

3 files changed

+65
-16
lines changed

internal/queue/kafka/operator_test.go

+22-4
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,38 @@ import (
88

99
var brokers = []string{"kafka.vp-datacenter-1.violetpay.net:9092", "kafka.vp-datacenter-1.violetpay.net:9093", "kafka.vp-datacenter-1.violetpay.net:9094"}
1010

11-
func mapOperatorProvider(queueName string, serializer queue.MessageSerializer[*sarama.ConsumerMessage, map[string]string], callback queue.Callback[map[string]string]) queue.ConsumeOperator[*sarama.ConsumerMessage, map[string]string] {
11+
func mapConsumeOperatorProvider(queueName string, serializer queue.MessageSerializer[*sarama.ConsumerMessage, map[string]string], callback queue.Callback[map[string]string]) queue.ConsumeOperator[*sarama.ConsumerMessage, map[string]string] {
1212
return NewConsumeOperator(serializer, callback, brokers, queueName, "test-group-id", sarama.NewConfig())
1313
}
1414

15-
func rawMsgProvider() *sarama.ConsumerMessage {
15+
func consumeMessageProvider() *sarama.ConsumerMessage {
1616
return &sarama.ConsumerMessage{}
1717
}
1818

1919
func TestConsumeOperator(t *testing.T) {
2020
t.Run("map[string]string", func(t *testing.T) {
2121
queue.TestSuiteConsumeOperator[*sarama.ConsumerMessage, map[string]string](
2222
t,
23-
mapOperatorProvider,
24-
rawMsgProvider,
23+
mapConsumeOperatorProvider,
24+
consumeMessageProvider,
2525
)
2626
})
2727
}
28+
29+
func bytesProduceOperatorProvider(queueName string) queue.ProduceOperator[[]byte] {
30+
return NewBytesProduceOperator(brokers, queueName, func() *sarama.Config {
31+
return sarama.NewConfig()
32+
})
33+
}
34+
35+
func produceBytesMessageProvider() []byte {
36+
return []byte("test")
37+
}
38+
39+
func TestBytesProduceOperator(t *testing.T) {
40+
queue.TestSuiteProduceOperator(
41+
t,
42+
bytesProduceOperatorProvider,
43+
produceBytesMessageProvider,
44+
)
45+
}

queue/operator.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ type ConsumeOperator[InMsg any, Msg any] interface {
44
QueueName() string
55
Serializer() MessageSerializer[InMsg, Msg]
66
Callback() Callback[Msg]
7-
Consume(msg InMsg)
7+
Consume(message InMsg)
88
StartConsume() error
99
StopConsume() error
1010
}

queue/operator_tests.go

+42-11
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func newMockSerializer[Input any, Output any]() *mockSerializer[Input, Output] {
2828
func TestSuiteConsumeOperator[InMsg any, Msg any](
2929
t *testing.T,
3030
operatorProvider func(queueName string, serializer MessageSerializer[InMsg, Msg], callback Callback[Msg]) ConsumeOperator[InMsg, Msg],
31-
rawMessageProvider func() InMsg,
31+
testMessageProvider func() InMsg,
3232
) {
3333
t.Run("ConsumeOperator Test Suite", func(t *testing.T) {
3434
var queueName string
@@ -72,7 +72,7 @@ func TestSuiteConsumeOperator[InMsg any, Msg any](
7272
})
7373

7474
assert.NotNil(t, operator.Serializer(), serializer)
75-
_, err := serializer.Serialize(rawMessageProvider())
75+
_, err := serializer.Serialize(testMessageProvider())
7676
if err != nil {
7777
return
7878
}
@@ -107,7 +107,7 @@ func TestSuiteConsumeOperator[InMsg any, Msg any](
107107
operator = operatorProvider(queueName, serializer, callback)
108108
})
109109

110-
msg := rawMessageProvider()
110+
msg := testMessageProvider()
111111
assert.NotNil(t, msg)
112112

113113
expectedSerializedValue, err := serializer.Serialize(msg)
@@ -132,10 +132,8 @@ func TestSuiteConsumeOperator[InMsg any, Msg any](
132132
operator = operatorProvider(queueName, serializer, callback)
133133
})
134134

135-
assert.NotPanics(t, func() {
136-
operator.StartConsume()
137-
})
138-
135+
err := operator.StartConsume()
136+
assert.Nil(t, err)
139137
})
140138

141139
t.Run("StopConsume", func(t *testing.T) {
@@ -150,12 +148,45 @@ func TestSuiteConsumeOperator[InMsg any, Msg any](
150148
operator = operatorProvider(queueName, serializer, callback)
151149
})
152150

153-
assert.NotPanics(t, func() {
154-
operator.StartConsume()
151+
err := operator.StartConsume()
152+
assert.Nil(t, err)
153+
154+
err = operator.StopConsume()
155+
assert.Nil(t, err)
156+
})
157+
})
158+
}
159+
160+
func TestSuiteProduceOperator[Msg any](
161+
t *testing.T,
162+
operatorProvider func(queueName string) ProduceOperator[Msg],
163+
testMessageProvider func() Msg,
164+
) {
165+
t.Run("ProduceOperator Test Suite", func(t *testing.T) {
166+
queueName := "testQueueName"
167+
var operator ProduceOperator[Msg]
168+
169+
t.Run("QueueName", func(t *testing.T) {
170+
t.Cleanup(func() {
171+
queueName = "testQueueName"
172+
operator = nil
155173
})
156174

157-
assert.NotPanics(t, func() {
158-
operator.StopConsume()
175+
operator = operatorProvider(queueName)
176+
assert.Equal(t, operator.QueueName(), queueName)
177+
})
178+
179+
t.Run("Produce", func(t *testing.T) {
180+
t.Run("Produce with valid message", func(t *testing.T) {
181+
t.Cleanup(func() {
182+
queueName = "testQueueName"
183+
operator = nil
184+
})
185+
186+
operator = operatorProvider(queueName)
187+
msg := testMessageProvider()
188+
err := operator.Produce(msg)
189+
assert.Nil(t, err)
159190
})
160191
})
161192
})

0 commit comments

Comments
 (0)