Skip to content

Commit d5c8047

Browse files
committed
Make null value to fail to produce
To avoid producing null value messages
1 parent e24b378 commit d5c8047

File tree

2 files changed

+24
-6
lines changed

2 files changed

+24
-6
lines changed

produce.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,17 @@ func (cmd *produceCmd) makeSaramaMessage(msg producerMessage) (*sarama.ProducerM
200200
}
201201
sm.Key = sarama.ByteEncoder(key)
202202
}
203-
if msg.Value != nil {
204-
value, err := cmd.decodeValue(msg.Value)
205-
if err != nil {
206-
return nil, fmt.Errorf("cannot decode value: %v", err)
207-
}
208-
sm.Value = sarama.ByteEncoder(value)
203+
204+
if msg.Value == nil {
205+
return nil, fmt.Errorf(`empty "value" JSON key to produce: %+v`, msg)
209206
}
207+
208+
value, err := cmd.decodeValue(msg.Value)
209+
if err != nil {
210+
return nil, fmt.Errorf("cannot decode value: %v", err)
211+
}
212+
sm.Value = sarama.ByteEncoder(value)
213+
210214
if msg.Timestamp != nil {
211215
sm.Timestamp = *msg.Timestamp
212216
}

produce_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/json"
55
"fmt"
66
"testing"
7+
"time"
78

89
"github.com/Shopify/sarama"
910
qt "github.com/frankban/quicktest"
@@ -128,6 +129,19 @@ func TestMakeSaramaMessage(t *testing.T) {
128129
c.Assert(err, qt.IsNil)
129130
c.Assert(gotValue, qt.ContentEquals, expectedBlob)
130131
})
132+
133+
c.Run("Null value should fail", func(c *qt.C) {
134+
cmd := new(produceCmd)
135+
cmd.decodeKey, err = cmd.decoderForType("key", "string")
136+
c.Assert(err, qt.IsNil)
137+
pn := int32(1)
138+
for _, pmsg := range []producerMessage{{}, {Key: json.RawMessage(`"k"`)}, {Partition: &pn}, {Timestamp: &time.Time{}}} {
139+
msg, err := cmd.makeSaramaMessage(pmsg)
140+
c.Assert(err, qt.IsNotNil)
141+
c.Assert(err, qt.ErrorMatches, `empty "value" JSON key to produce:.*`)
142+
c.Assert(msg, qt.IsNil)
143+
}
144+
})
131145
}
132146

133147
func TestDeserializeLines(t *testing.T) {

0 commit comments

Comments
 (0)