Skip to content

Commit 8e247c2

Browse files
committed
refactored a bit
1 parent 62eb672 commit 8e247c2

File tree

7 files changed

+60
-26
lines changed

7 files changed

+60
-26
lines changed

client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type Client struct {
4646
// Limit for message size, applied after compression stage.
4747
//
4848
// If zero, no limit is applied.
49-
MaxMessageSize int64
49+
MaxMessageBytes int64
5050
}
5151

5252
// A ConsumerGroup and Topic as these are both strings we define a type for

produce.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,13 +159,15 @@ func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResp
159159
},
160160
}},
161161
}},
162-
MaxMessageSize_: c.MaxMessageSize,
162+
MaxMessageBytes: c.MaxMessageBytes,
163163
})
164164

165165
switch {
166166
case err == nil:
167167
case errors.Is(err, protocol.ErrNoRecord):
168168
return new(ProduceResponse), nil
169+
case protocol.IsMaxMessageBytesExceeded(err):
170+
return nil, MessageTooLargeError{}
169171
default:
170172
return nil, fmt.Errorf("kafka.(*Client).Produce: %w", err)
171173
}

protocol/error.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package protocol
22

33
import (
4+
"errors"
45
"fmt"
56
)
67

@@ -89,3 +90,28 @@ func (e *TopicPartitionError) Error() string {
8990
func (e *TopicPartitionError) Unwrap() error {
9091
return e.Err
9192
}
93+
94+
type MaxMessageBytesExceededError interface {
95+
error
96+
IsMaxMessageBytesExceeded()
97+
}
98+
99+
func IsMaxMessageBytesExceeded(err error) bool {
100+
var target MaxMessageBytesExceededError
101+
return errors.As(err, &target)
102+
}
103+
104+
type baseMaxMessageBytesExceededError struct{ error }
105+
106+
func (b baseMaxMessageBytesExceededError) IsFatal() {}
107+
108+
func (b baseMaxMessageBytesExceededError) Unwrap() error {
109+
return b.error
110+
}
111+
112+
func NewBaseMaxMessageBytesExceededError(err error) error {
113+
if err == nil {
114+
return nil
115+
}
116+
return &baseMaxMessageBytesExceededError{error: err}
117+
}

protocol/produce/produce.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ type Request struct {
1616
Timeout int32 `kafka:"min=v0,max=v8"`
1717
Topics []RequestTopic `kafka:"min=v0,max=v8"`
1818

19-
// Use this to store max.message.size
20-
MaxMessageSize_ int64 `kafka:"-"`
19+
// Use this to store max.message.bytes
20+
MaxMessageBytes int64 `kafka:"-"`
2121
}
2222

2323
func (r *Request) ApiKey() protocol.ApiKey { return protocol.Produce }
2424

25-
func (r *Request) MaxMessageSize() int64 { return r.MaxMessageSize_ }
25+
func (r *Request) MaxMessageBytesSize() int64 { return r.MaxMessageBytes }
2626

2727
func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
2828
broker := protocol.Broker{ID: -1}

protocol/protocol.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -527,11 +527,11 @@ type Merger interface {
527527
Merge(messages []Message, results []interface{}) (Message, error)
528528
}
529529

530-
// MaxMessageSizeKeeper is an extension of the Message interface, which aimed
531-
// to store max.message.size parameter
532-
type MaxMessageSizeKeeper interface {
533-
// Returns locally stored max.message.size value
534-
MaxMessageSize() int64
530+
// MaxMessageBytesKeeper is an extension of the Message interface, which aimed
531+
// to store max.message.bytes parameter
532+
type MaxMessageBytesKeeper interface {
533+
// Returns locally stored max.message.bytes value
534+
MaxMessageBytesSize() int64
535535
}
536536

537537
// Result converts r to a Message or an error, or panics if r could not be

protocol/request.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID s
127127
if err == nil {
128128
messageSize := uint32(b.Size()) - 4
129129

130-
if p, _ := msg.(MaxMessageSizeKeeper); p != nil && p.MaxMessageSize() != 0 {
131-
if messageSize > uint32(p.MaxMessageSize()) {
132-
return fmt.Errorf("message size: %d exceeded max.message.size: %d", messageSize, p.MaxMessageSize())
130+
if p, ok := msg.(MaxMessageBytesKeeper); ok && p.MaxMessageBytesSize() != 0 {
131+
if messageSize > uint32(p.MaxMessageBytesSize()) {
132+
return NewBaseMaxMessageBytesExceededError(fmt.Errorf("message size: %d exceeded max.message.bytes: %d", messageSize, p.MaxMessageBytesSize()))
133133
}
134134
}
135135

writer.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ type Writer struct {
127127
BatchBytes int64
128128

129129
// Setting this flag to true causes the WriteMessages starts to derive 'BatchBytes'
130-
// from topic 'max.message.size' setting. If writer is used to write to multiple
131-
// topics each topic 'max.message.size' will be handled appropriately.
130+
// from topic 'max.message.bytes' setting. If writer is used to write to multiple
131+
// topics each topic 'max.message.bytes' will be handled appropriately.
132132
// This option simplifies maintaining of architecture - creates the one source of
133133
// truth - topic settings on broker side
134134
//
@@ -138,7 +138,7 @@ type Writer struct {
138138
// Setting this flag to true causes the WriteMessages starts to apply 'BatchBytes'
139139
// as limiting factor after compression stage.
140140
// When this flag is false - it's possible to get case, when Value can exceed
141-
// 'max.message.size' setting, but after compression it's less.
141+
// 'max.message.bytes' setting, but after compression it's less.
142142
// And WriteMessages returns an error, when indeed there are no error.
143143
//
144144
// Nevertheless, 'BatchBytes' also has second function - to form batches, and
@@ -647,15 +647,14 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
647647
return nil
648648
}
649649

650+
balancer := w.balancer()
650651
if w.AutoDeriveBatchBytes {
651652
err := w.deriveBatchBytes(msgs)
652653
if err != nil {
653654
return err
654655
}
655656
}
656657

657-
balancer := w.balancer()
658-
659658
if !w.ApplyBatchBytesAfterCompression {
660659
for i := range msgs {
661660
n := int64(msgs[i].totalSize())
@@ -766,7 +765,9 @@ func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceRespons
766765
defer cancel()
767766

768767
client := w.client(timeout)
769-
client.MaxMessageSize = w.batchBytes(key.topic)
768+
if w.ApplyBatchBytesAfterCompression {
769+
client.MaxMessageBytes = w.batchBytes(key.topic)
770+
}
770771

771772
return client.Produce(ctx, &ProduceRequest{
772773
Partition: int(key.partition),
@@ -860,7 +861,10 @@ func (w *Writer) deriveBatchBytes(msgs []Message) error {
860861
return err
861862
}
862863

863-
if _, ok := w.maxMessageBytesPerTopic.Load(topic); ok {
864+
if res, ok := w.maxMessageBytesPerTopic.Load(topic); ok {
865+
// DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG
866+
fmt.Println("Writer::deriveBatchBytes::LOAD", "topic", topic, "res", res)
867+
// DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG
864868
continue
865869
}
866870

@@ -887,6 +891,9 @@ func (w *Writer) deriveBatchBytes(msgs []Message) error {
887891
if err != nil {
888892
return err
889893
}
894+
// DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG
895+
fmt.Println("Writer::deriveBatchBytes::STORE", "topic", topic, "res", maxMessageBytesStr)
896+
// DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG DEBUG
890897
w.maxMessageBytesPerTopic.Store(topic, int64(maxMessageBytes))
891898
}
892899
return nil
@@ -897,16 +904,15 @@ func (w *Writer) batchBytes(topic string) int64 {
897904
if result, ok := w.maxMessageBytesPerTopic.Load(topic); ok {
898905
return result.(int64)
899906
}
900-
901907
// batchBytes expects it's called after 'deriveBatchBytes(msgs)'
902908
// It means, there are no unknown topics
903909
panic(fmt.Sprintf("unknown topic: %s", topic))
904-
} else {
905-
if w.BatchBytes > 0 {
906-
return w.BatchBytes
907-
}
908-
return 1048576
909910
}
911+
912+
if w.BatchBytes > 0 {
913+
return w.BatchBytes
914+
}
915+
return 1048576
910916
}
911917

912918
func (w *Writer) batchTimeout() time.Duration {

0 commit comments

Comments
 (0)