Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<suppressions>
<suppress checks="AbbreviationAsWordInName" files="DataKeyAndAADEqualsTest"/>
<suppress checks="ClassDataAbstractionCoupling" files=".*Test\.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="DefaultChunkManager.java"/>
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
<suppress checks="ClassFanOutComplexity" files="ChunkCache.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void configure(final Map<String, ?> configs) {
}
final ChunkManagerFactory chunkManagerFactory = new ChunkManagerFactory();
chunkManagerFactory.configure(configs);
chunkManager = chunkManagerFactory.initChunkManager(fetcher, aesEncryptionProvider);
chunkManager = chunkManagerFactory.initChunkManager(fetcher, aesEncryptionProvider, config.useNewMode());
chunkSize = config.chunkSize();
compressionEnabled = config.compressionEnabled();
compressionHeuristic = config.compressionHeuristicEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ public void configure(final Map<String, ?> configs) {
}

public ChunkManager initChunkManager(final ObjectFetcher fileFetcher,
final AesEncryptionProvider aesEncryptionProvider) {
final DefaultChunkManager defaultChunkManager = new DefaultChunkManager(fileFetcher, aesEncryptionProvider);
final AesEncryptionProvider aesEncryptionProvider,
final boolean useNewMode) {
final DefaultChunkManager defaultChunkManager =
new DefaultChunkManager(fileFetcher, aesEncryptionProvider, useNewMode);
if (config.cacheClass() != null) {
try {
final ChunkCache<?> chunkCache = config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@

package io.aiven.kafka.tieredstorage.chunkmanager;

import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
Expand All @@ -33,13 +39,21 @@
import io.aiven.kafka.tieredstorage.transform.DetransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.DetransformFinisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultChunkManager implements ChunkManager {
private static final Logger log = LoggerFactory.getLogger(DefaultChunkManager.class);

private final ObjectFetcher fetcher;
private final AesEncryptionProvider aesEncryptionProvider;
private final boolean useNewMode;

public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvider aesEncryptionProvider) {
public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvider aesEncryptionProvider,
final boolean useNewMode) {
this.fetcher = fetcher;
this.aesEncryptionProvider = aesEncryptionProvider;
this.useNewMode = useNewMode;
}

/**
Expand All @@ -50,8 +64,9 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvi
public InputStream getChunk(final ObjectKey objectKey, final SegmentManifest manifest,
final int chunkId) throws StorageBackendException {
final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);

final InputStream chunkContent = fetcher.fetch(objectKey, chunk.range());
final InputStream chunkContent = useNewMode
? getChunkContentNewMode(objectKey, chunk)
: getChunkContentOldMode(objectKey, chunk);

DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk));
final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
Expand All @@ -68,4 +83,130 @@ public InputStream getChunk(final ObjectKey objectKey, final SegmentManifest man
final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
return detransformFinisher.toInputStream();
}

private InputStream getChunkContentOldMode(final ObjectKey objectKey,
final Chunk chunk) throws StorageBackendException {
return fetcher.fetch(objectKey, chunk.range());
}

private final PersistentObjectInputStreamCache streamCache = new PersistentObjectInputStreamCache();
private final AtomicInteger requestCounter = new AtomicInteger(0);

private InputStream getChunkContentNewMode(final ObjectKey objectKey,
final Chunk chunk) throws StorageBackendException {
final int requestId = requestCounter.getAndIncrement();
log.error("[{}] I need chunk: {}", requestId, chunk);

try (final var streamHandler = streamCache.borrowOrCreate(requestId, objectKey, chunk)) {
final var stream = streamHandler.inputStream;

if (chunk.equals(stream.currentChunk)) {
log.error("[{}] Chunk cached", requestId);
return new ByteArrayInputStream(stream.currentChunkContent);
}

log.error("[{}] Reading chunk content", requestId);
final byte[] chunkBytes = stream.readChunk(chunk);
log.error("[{}] Read chunk content", requestId);
return new ByteArrayInputStream(chunkBytes);
} catch (final Exception e) {
throw new StorageBackendException("error", e);
}
}

private static class PersistentObjectInputStream extends FilterInputStream {
public int position = 0;
public Chunk currentChunk = null;
public byte[] currentChunkContent = null;

private PersistentObjectInputStream(final InputStream in) {
super(in);
}

private byte[] readChunk(final Chunk chunk) throws StorageBackendException {
if (chunk.transformedPosition != position) {
throw new IllegalArgumentException("Invalid chunk " + chunk + ", current position: " + position);
}

currentChunk = chunk;
try {
final int size = chunk.range().size();
currentChunkContent = in.readNBytes(size);
if (currentChunkContent.length != size) {
throw new StorageBackendException(
"Expected " + size + " bytes for chunk " + chunk + " but got " + currentChunkContent.length);
}
position += size;
return currentChunkContent;
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}

private class PersistentObjectInputStreamCache {
private final Object lock = new Object();
private final HashMap<ObjectKey, List<PersistentObjectInputStream>> persistentStreams = new HashMap<>();

StreamHandler borrowOrCreate(final int requestId,
final ObjectKey objectKey,
final Chunk chunk) throws StorageBackendException {
synchronized (lock) {
final List<PersistentObjectInputStream> streams = persistentStreams.get(objectKey);
log.error("[{}] Streams now: {}", requestId, persistentStreams);
final int from = chunk.range().from;
if (streams == null) {
log.error("[{}] Opening new stream", requestId);
return new StreamHandler(requestId, objectKey,
new PersistentObjectInputStream(fetcher.getContinuousStream(objectKey, from)));
}

for (int i = 0; i < streams.size(); i++) {
final var stream = streams.get(i);
if (stream.currentChunk.equals(chunk)) {
log.error("[{}] Stream cached", requestId);
streams.remove(i);
return new StreamHandler(requestId, objectKey, stream);
}
}

for (int i = 0; i < streams.size(); i++) {
final var stream = streams.get(i);
if (stream.position == from) {
log.error("[{}] Stream cached", requestId);
streams.remove(i);
return new StreamHandler(requestId, objectKey, stream);
}
}

log.error("[{}] Opening new stream", requestId);
return new StreamHandler(requestId, objectKey,
new PersistentObjectInputStream(fetcher.getContinuousStream(objectKey, from)));
}
}

class StreamHandler implements AutoCloseable {
private final int requestId;
private final ObjectKey objectKey;
public final PersistentObjectInputStream inputStream;

private StreamHandler(final int requestId,
final ObjectKey objectKey,
final PersistentObjectInputStream inputStream) {
this.requestId = requestId;
this.objectKey = objectKey;
this.inputStream = inputStream;
}

@Override
public void close() throws Exception {
synchronized (lock) {
log.error("[{}] Returning stream for {}", requestId, objectKey);
final var list = persistentStreams.computeIfAbsent(objectKey, k -> new ArrayList<>());
list.add(inputStream);
log.error("[{}] Streams now: {}", requestId, persistentStreams);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
public class RemoteStorageManagerConfig extends AbstractConfig {
private static final String STORAGE_PREFIX = "storage.";

private static final String STORAGE_USE_NEW_MODE_CONFIG = STORAGE_PREFIX + "use.new.mode";

private static final String STORAGE_BACKEND_CLASS_CONFIG = STORAGE_PREFIX + "backend.class";
private static final String STORAGE_BACKEND_CLASS_DOC = "The storage backend implementation class";

Expand Down Expand Up @@ -101,6 +103,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig {

// TODO checkers

CONFIG.define(
STORAGE_USE_NEW_MODE_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
""
);

CONFIG.define(
STORAGE_BACKEND_CLASS_CONFIG,
ConfigDef.Type.CLASS,
Expand Down Expand Up @@ -208,6 +218,10 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
CUSTOM_METADATA_FIELDS_INCLUDE_DOC);
}

public boolean useNewMode() {
return getBoolean(STORAGE_USE_NEW_MODE_CONFIG);
}

/**
* Internal config for encryption.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public static Stream<Arguments> cachingChunkManagers() {
@Test
void defaultChunkManager() {
chunkManagerFactory.configure(Map.of());
final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(null, null);
final ChunkManager chunkManager =
chunkManagerFactory.initChunkManager(null, null, false);
assertThat(chunkManager).isInstanceOf(DefaultChunkManager.class);
}

Expand All @@ -68,7 +69,8 @@ void cachingChunkManagers(final Class<ChunkCache<?>> cls) {
)
);
try (final MockedConstruction<?> ignored = mockConstruction(cls)) {
final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(null, null);
final ChunkManager chunkManager =
chunkManagerFactory.initChunkManager(null, null, false);
assertThat(chunkManager).isInstanceOf(cls);
verify((ChunkCache<?>) chunkManager).configure(Map.of(
"class", cls,
Expand All @@ -85,9 +87,10 @@ void failedInitialization() {
(cachingChunkManager, context) -> {
throw new InvocationTargetException(null);
})) {
assertThatThrownBy(() -> chunkManagerFactory.initChunkManager(null, null))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(ReflectiveOperationException.class);
assertThatThrownBy(() ->
chunkManagerFactory.initChunkManager(null, null, false))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(ReflectiveOperationException.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void testGetChunk() throws Exception {
final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10);

final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null);
final ChunkManager chunkManager = new DefaultChunkManager(storage, null);
final ChunkManager chunkManager = new DefaultChunkManager(storage, null, false);
when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()))
.thenReturn(new ByteArrayInputStream("0123456789".getBytes()));

Expand All @@ -89,7 +89,7 @@ void testGetChunkWithEncryption() throws Exception {

final var encryption = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, encryption, null);
final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider);
final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider, false);

assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);
verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range());
Expand All @@ -109,7 +109,7 @@ void testGetChunkWithCompression() throws Exception {
.thenReturn(new ByteArrayInputStream(compressed));

final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, true, null, null);
final ChunkManager chunkManager = new DefaultChunkManager(storage, null);
final ChunkManager chunkManager = new DefaultChunkManager(storage, null, false);

assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);
verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void test(final Map<String, String> config,
final BytesRange range) throws StorageBackendException, IOException {
final ChunkManagerFactory chunkManagerFactory = new ChunkManagerFactory();
chunkManagerFactory.configure(config);
final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(fetcher, null);
final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(fetcher, null, false);
final var is = new FetchChunkEnumeration(chunkManager, OBJECT_KEY, SEGMENT_MANIFEST, range)
.toInputStream();
if (readFully) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public interface ObjectFetcher {
* @param range range with inclusive start/end positions
*/
InputStream fetch(ObjectKey key, BytesRange range) throws StorageBackendException;

default InputStream getContinuousStream(ObjectKey key, int from) throws StorageBackendException {
throw new IllegalStateException("not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,31 @@ public InputStream fetch(final ObjectKey key, final BytesRange range) throws Sto
}
}

@Override
public InputStream getContinuousStream(final ObjectKey key, final int from) throws StorageBackendException {
try {
final Blob blob = getBlob(key);

if (from >= blob.getSize()) {
throw new InvalidRangeException("Range start position " + from
+ " is outside file content. file size = " + blob.getSize());
}

final ReadChannel reader = blob.reader();
reader.seek(from);
return Channels.newInputStream(reader);
} catch (final IOException e) {
throw new StorageBackendException("Failed to fetch " + key, e);
} catch (final StorageException e) {
// https://cloud.google.com/storage/docs/json_api/v1/status-codes#416_Requested_Range_Not_Satisfiable
if (e.getCode() == 416) {
throw new InvalidRangeException("Invalid from TODO " + from, e);
}

throw new StorageBackendException("Failed to fetch " + key, e);
}
}

private Blob getBlob(final ObjectKey key) throws KeyNotFoundException {
// Unfortunately, it seems Google will do two a separate (HEAD-like) call to get blob metadata.
// Since the blobs are immutable in tiered storage, we can consider caching them locally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,30 @@ public InputStream fetch(final ObjectKey key, final BytesRange range) throws Sto
}
}

@Override
public InputStream getContinuousStream(final ObjectKey key, final int from) throws StorageBackendException {
try {
final GetObjectRequest getRequest = GetObjectRequest.builder()
.bucket(bucketName)
.key(key.value())
.build();
final var result = s3Client.getObject(getRequest);
final long skipped = result.skip(from);
if (skipped != from) {
throw new StorageBackendException("Failed to seek to position " + from + " in " + key);
}
return result;
} catch (final AwsServiceException e) {
if (e.statusCode() == 404) {
throw new KeyNotFoundException(this, key, e);
}

throw new StorageBackendException("Failed to fetch " + key, e);
} catch (final IOException e) {
throw new StorageBackendException("Failed to fetch " + key, e);
}
}

@Override
public String toString() {
return "S3Storage{"
Expand Down