Skip to content

Commit fab5c55

Browse files
authored
Merge pull request #528 from alex268/master
Fixed lost compression errors
2 parents 3b6226b + d252cc4 commit fab5c55

File tree

2 files changed

+6
-7
lines changed

2 files changed

+6
-7
lines changed

topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package tech.ydb.topic.write.impl;
22

3-
import java.io.IOException;
43
import java.time.Instant;
54
import java.util.List;
65
import java.util.concurrent.CompletableFuture;
@@ -35,7 +34,7 @@ public class EnqueuedMessage {
3534
private final YdbTransaction transaction;
3635

3736
private volatile boolean isReady = false;
38-
private volatile IOException compressError = null;
37+
private volatile Throwable compressError = null;
3938

4039
public EnqueuedMessage(Message message, SendSettings sendSettings, boolean noCompression) {
4140
this.bytes = message.getData();
@@ -60,7 +59,7 @@ public long getSize() {
6059
return bytes.length;
6160
}
6261

63-
public IOException getCompressError() {
62+
public Throwable getCompressError() {
6463
return compressError;
6564
}
6665

@@ -71,8 +70,9 @@ public void encode(String writeId, Codec codec) {
7170
bytes = Encoder.encode(codec, bytes);
7271
isReady = true;
7372
logger.trace("[{}] Successfully finished encoding message", writeId);
74-
} catch (IOException ex) {
73+
} catch (Throwable ex) {
7574
logger.error("[{}] Exception while encoding message: ", writeId, ex);
75+
compressError = ex;
7676
isReady = true;
7777
future.completeExceptionally(ex);
7878
}

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package tech.ydb.topic.write.impl;
22

3-
import java.io.IOException;
43
import java.util.Deque;
54
import java.util.LinkedList;
65
import java.util.List;
@@ -165,7 +164,7 @@ private void acceptMessageIntoSendingQueue(EnqueuedMessage message) {
165164
} else {
166165
CompletableFuture
167166
.runAsync(() -> message.encode(id, settings.getCodec()), compressionExecutor)
168-
.thenRun(this::moveEncodedMessagesToSendingQueue);
167+
.whenComplete((res, th) -> moveEncodedMessagesToSendingQueue());
169168
}
170169
}
171170

@@ -187,7 +186,7 @@ private void moveEncodedMessagesToSendingQueue() {
187186
break;
188187
}
189188

190-
IOException error = msg.getCompressError();
189+
Throwable error = msg.getCompressError();
191190
if (error != null) { // just skip
192191
logger.warn("[{}] Message wasn't sent because of processing error", id, error);
193192
free(1, msg.getOriginalSize());

0 commit comments

Comments
 (0)