Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.common.Configurable;

Expand Down Expand Up @@ -79,34 +78,34 @@ public InputStream getChunk(final ObjectKey objectKey,
final var currentChunk = manifest.chunkIndex().chunks().get(chunkId);
startPrefetching(objectKey, manifest, currentChunk.originalPosition + currentChunk.originalSize);
final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId);
final AtomicReference<InputStream> result = new AtomicReference<>();

final CompletableFuture<T> future = cache.asMap().compute(chunkKey, (key, existing) -> {
if (existing != null) {
statsCounter.recordHit();
return existing;
}

statsCounter.recordMiss();
final CompletableFuture<T> created = CompletableFuture.supplyAsync(() -> {
try (InputStream chunk = chunkManager.getChunk(objectKey, manifest, chunkId)) {
return this.cacheChunk(chunkKey, chunk);
} catch (final StorageBackendException | IOException e) {
throw new CompletionException(e);
}
});

created.whenComplete((r, ex) -> {
if (ex != null) {
cache.asMap().remove(key, created);
}
});

return created;
});

try {
return cache.asMap()
.compute(chunkKey, (key, val) -> CompletableFuture.supplyAsync(() -> {
if (val == null) {
statsCounter.recordMiss();
try {
final InputStream chunk =
chunkManager.getChunk(objectKey, manifest, chunkId);
final T t = this.cacheChunk(chunkKey, chunk);
result.getAndSet(cachedChunkToInputStream(t));
return t;
} catch (final StorageBackendException | IOException e) {
throw new CompletionException(e);
}
} else {
statsCounter.recordHit();
try {
final T cachedChunk = val.get();
result.getAndSet(cachedChunkToInputStream(cachedChunk));
return cachedChunk;
} catch (final InterruptedException | ExecutionException e) {
throw new CompletionException(e);
}
}
}, executor))
.thenApplyAsync(t -> result.get())
.get(getTimeout.toMillis(), TimeUnit.MILLISECONDS);
T t = future.get(getTimeout.toMillis(), TimeUnit.MILLISECONDS);
return this.cachedChunkToInputStream(t);
} catch (final ExecutionException e) {
// Unwrap previously wrapped exceptions if possible.
final Throwable cause = e.getCause();
Expand Down
Loading