Open
Description
👋 I have a question regarding some behavior I'm seeing with this library.
Let's say I have a local kafka broker configured with message.max.bytes=512
and I attempt the following:
conn := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
BatchTimeout: 75 * time.Millisecond,
RequiredAcks: kafka.RequireAll,
}
messages := []kafka.Message{
{
Topic: "my-topic",
Value: []byte("couple chars"),
},
{
Topic: "my-topic",
Value: []byte("text containing more than 512 characters...."), // pretend this has more than 512 chars
},
}
switch err := conn.WriteMessages(context.Background(), messages...).(type) {
case nil:
return nil
case kafka.WriteErrors:
for i := range messages {
if err[i] != nil {
log.Fatalf("Failed to write message - %v", err[i])
}
}
default:
log.Fatalf("Failed to write messages to Kafka broker - %v", err)
}
In the code sample above I get two WriteErrors
, one for the first message and one for the second message.
WriteError values contain a list of errors where each entry matches the position of a message in the WriteMessages call. The program can determine the status of each message by looping over the error...
According to the documentation, I would expect to receive a partial failure. Shouldn't the first message succeed and the second message should fail?