Skip to content

Commit 70d01b4

Browse files
mapanmapan1984
authored andcommitted
fix(#760): remove future on exception
Change-Id: I9c876701b14b72d476f7a700465c05f6b870b5c1
1 parent d46d3e7 commit 70d01b4

File tree

1 file changed

+31
-21
lines changed

1 file changed

+31
-21
lines changed

core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/ChunkCache.java

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -82,29 +82,39 @@ public InputStream getChunk(final ObjectKey objectKey,
8282
final AtomicReference<InputStream> result = new AtomicReference<>();
8383
try {
8484
return cache.asMap()
85-
.compute(chunkKey, (key, val) -> CompletableFuture.supplyAsync(() -> {
86-
if (val == null) {
87-
statsCounter.recordMiss();
88-
try {
89-
final InputStream chunk =
90-
chunkManager.getChunk(objectKey, manifest, chunkId);
91-
final T t = this.cacheChunk(chunkKey, chunk);
92-
result.getAndSet(cachedChunkToInputStream(t));
93-
return t;
94-
} catch (final StorageBackendException | IOException e) {
95-
throw new CompletionException(e);
85+
.compute(chunkKey, (key, val) -> {
86+
final CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
87+
if (val == null) {
88+
statsCounter.recordMiss();
89+
try {
90+
final InputStream chunk =
91+
chunkManager.getChunk(objectKey, manifest, chunkId);
92+
final T t = this.cacheChunk(chunkKey, chunk); // NOTE: 抛出 java.net.SocketTimeoutException: Read timed out
93+
result.getAndSet(cachedChunkToInputStream(t));
94+
return t;
95+
} catch (final StorageBackendException | IOException e) {
96+
throw new CompletionException(e);
97+
}
98+
} else {
99+
statsCounter.recordHit();
100+
try {
101+
final T cachedChunk = val.get();
102+
result.getAndSet(cachedChunkToInputStream(cachedChunk));
103+
return cachedChunk;
104+
} catch (final InterruptedException | ExecutionException e) {
105+
throw new CompletionException(e);
106+
}
96107
}
97-
} else {
98-
statsCounter.recordHit();
99-
try {
100-
final T cachedChunk = val.get();
101-
result.getAndSet(cachedChunkToInputStream(cachedChunk));
102-
return cachedChunk;
103-
} catch (final InterruptedException | ExecutionException e) {
104-
throw new CompletionException(e);
108+
}, executor);
109+
110+
future.whenComplete((r, ex) -> {
111+
if (ex != null) {
112+
cache.asMap().remove(key, future);
105113
}
106-
}
107-
}, executor))
114+
});
115+
116+
return future;
117+
})
108118
.thenApplyAsync(t -> result.get())
109119
.get(getTimeout.toMillis(), TimeUnit.MILLISECONDS);
110120
} catch (final ExecutionException e) {

0 commit comments

Comments
 (0)