Skip to content

Commit c8377d7

Browse files
mapanmapan1984
authored andcommitted
fix(#760): remove future on exception
Change-Id: I9c876701b14b72d476f7a700465c05f6b870b5c1
1 parent 0664dc4 commit c8377d7

File tree

1 file changed

+28
-36
lines changed

1 file changed

+28
-36
lines changed

core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/ChunkCache.java

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import com.github.benmanes.caffeine.cache.Weigher;
4848

4949
public abstract class ChunkCache<T> implements ChunkManager, Configurable {
50+
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(ChunkCache.class);
51+
5052
public static final String METRIC_GROUP = "chunk-cache-metrics";
5153
public static final String THREAD_POOL_METRIC_GROUP = "chunk-cache-thread-pool-metrics";
5254

@@ -79,44 +81,34 @@ public InputStream getChunk(final ObjectKey objectKey,
7981
final var currentChunk = manifest.chunkIndex().chunks().get(chunkId);
8082
startPrefetching(objectKey, manifest, currentChunk.originalPosition + currentChunk.originalSize);
8183
final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId);
82-
final AtomicReference<InputStream> result = new AtomicReference<>();
83-
try {
84-
return cache.asMap()
85-
.compute(chunkKey, (key, val) -> {
86-
final CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
87-
if (val == null) {
88-
statsCounter.recordMiss();
89-
try {
90-
final InputStream chunk =
91-
chunkManager.getChunk(objectKey, manifest, chunkId);
92-
final T t = this.cacheChunk(chunkKey, chunk);
93-
result.getAndSet(cachedChunkToInputStream(t));
94-
return t;
95-
} catch (final StorageBackendException | IOException e) {
96-
throw new CompletionException(e);
97-
}
98-
} else {
99-
statsCounter.recordHit();
100-
try {
101-
final T cachedChunk = val.get();
102-
result.getAndSet(cachedChunkToInputStream(cachedChunk));
103-
return cachedChunk;
104-
} catch (final InterruptedException | ExecutionException e) {
105-
throw new CompletionException(e);
106-
}
107-
}
108-
}, executor);
10984

110-
future.whenComplete((r, ex) -> {
111-
if (ex != null) {
112-
cache.asMap().remove(key, future);
113-
}
114-
});
85+
final CompletableFuture<T> future = cache.asMap().compute(chunkKey, (key, val) -> {
86+
if (val != null) {
87+
statsCounter.recordHit();
88+
return val;
89+
}
11590

116-
return future;
117-
})
118-
.thenApplyAsync(t -> result.get())
119-
.get(getTimeout.toMillis(), TimeUnit.MILLISECONDS);
91+
statsCounter.recordMiss();
92+
final CompletableFuture<T> created = CompletableFuture.supplyAsync(() -> {
93+
try (InputStream chunk = chunkManager.getChunk(objectKey, manifest, chunkId)) {
94+
return this.cacheChunk(chunkKey, chunk);
95+
} catch (final StorageBackendException | IOException e) {
96+
throw new CompletionException(e);
97+
}
98+
});
99+
100+
created.whenComplete((r, ex) -> {
101+
if (ex != null) {
102+
cache.asMap().remove(key, created);
103+
}
104+
});
105+
106+
return created;
107+
});
108+
109+
try {
110+
T t = future.get(getTimeout.toMillis(), TimeUnit.MILLISECONDS);
111+
return this.cachedChunkToInputStream(t);
120112
} catch (final ExecutionException e) {
121113
// Unwrap previously wrapped exceptions if possible.
122114
final Throwable cause = e.getCause();

0 commit comments

Comments
 (0)