Skip to content

Commit abc7c8f

Browse files
committed
refactor: decouple original size from chunking
Chunking is defined by the original chunk size, not the original content size. This PR removes the scenario where these two sizes are conflated (e.g. passing an original size of zero to disable chunking).
1 parent cdffc7f commit abc7c8f

File tree

4 files changed

+79
-64
lines changed

4 files changed

+79
-64
lines changed

core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -457,23 +457,26 @@ InputStream transformIndex(final IndexType indexType,
457457
);
458458
}
459459
final var transformFinisher = new TransformFinisher(transformEnum, size);
460+
// Getting the next element and expecting it to be the only one.
461+
// No need to get a sequenced input stream
460462
final var inputStream = transformFinisher.nextElement();
461-
segmentIndexBuilder.add(indexType, singleChunk(transformFinisher.chunkIndex()).range().size());
463+
final var chunkIndex = transformFinisher.chunkIndex();
464+
// Getting a chunk index means that the transformation is completed.
465+
if (chunkIndex == null) {
466+
throw new IllegalStateException("Chunking disabled when single chunk is expected");
467+
}
468+
if (chunkIndex.chunks().size() != 1) {
469+
// Not expected, as nextElement runs only once; kept as a safety check.
470+
throw new IllegalStateException("Number of chunks different than 1, single chunk is expected");
471+
}
472+
segmentIndexBuilder.add(indexType, chunkIndex.chunks().get(0).range().size());
462473
return inputStream;
463474
} else {
464475
segmentIndexBuilder.add(indexType, 0);
465476
return InputStream.nullInputStream();
466477
}
467478
}
468479

469-
private Chunk singleChunk(final ChunkIndex chunkIndex) {
470-
final var chunks = chunkIndex.chunks();
471-
if (chunks.size() != 1) {
472-
throw new IllegalStateException("Single chunk expected when transforming indexes");
473-
}
474-
return chunks.get(0);
475-
}
476-
477480
void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
478481
final ChunkIndex chunkIndex,
479482
final SegmentIndexesV1 segmentIndexes,

core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,10 @@ public class BaseTransformChunkEnumeration implements TransformChunkEnumeration
3232

3333
private byte[] chunk = null;
3434

35-
public BaseTransformChunkEnumeration(final InputStream inputStream) {
36-
this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null");
37-
38-
this.originalChunkSize = 0;
39-
}
40-
35+
/**
36+
* @param inputStream original content
37+
* @param originalChunkSize chunk size from the <b>original</b> content. If zero, it disables chunking.
38+
*/
4139
public BaseTransformChunkEnumeration(final InputStream inputStream,
4240
final int originalChunkSize) {
4341
this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null");

core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,43 +27,53 @@
2727
import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndexBuilder;
2828
import io.aiven.kafka.tieredstorage.manifest.index.VariableSizeChunkIndexBuilder;
2929

30-
// TODO test transforms and detransforms with property-based tests
31-
3230
/**
3331
* The transformation finisher.
3432
*
3533
* <p>It converts enumeration of {@code byte[]} into enumeration of {@link InputStream},
3634
* so that it could be used in {@link SequenceInputStream}.
3735
*
3836
* <p>It's responsible for building the chunk index.
37+
* The chunk index is empty (i.e. null) if chunking has been disabled (i.e. chunk size is zero),
38+
* but could also contain a single chunk if the chunk size is equal to or larger than the original file size.
39+
* Otherwise, the chunk index will contain more than one chunk.
3940
*/
4041
public class TransformFinisher implements Enumeration<InputStream> {
4142
private final TransformChunkEnumeration inner;
4243
private final AbstractChunkIndexBuilder chunkIndexBuilder;
43-
private final int originalFileSize;
4444
private ChunkIndex chunkIndex = null;
4545

46-
public TransformFinisher(final TransformChunkEnumeration inner) {
47-
this(inner, 0);
48-
}
49-
50-
public TransformFinisher(final TransformChunkEnumeration inner, final int originalFileSize) {
46+
public TransformFinisher(
47+
final TransformChunkEnumeration inner,
48+
final int originalFileSize
49+
) {
5150
this.inner = Objects.requireNonNull(inner, "inner cannot be null");
52-
this.originalFileSize = originalFileSize;
5351

5452
if (originalFileSize < 0) {
5553
throw new IllegalArgumentException(
5654
"originalFileSize must be non-negative, " + originalFileSize + " given");
5755
}
5856

57+
this.chunkIndexBuilder = chunkIndexBuilder(inner, inner.originalChunkSize(), originalFileSize);
58+
}
59+
60+
private static AbstractChunkIndexBuilder chunkIndexBuilder(
61+
final TransformChunkEnumeration inner,
62+
final int originalChunkSize,
63+
final int originalFileSize
64+
) {
5965
final Integer transformedChunkSize = inner.transformedChunkSize();
60-
if (originalFileSize == 0) {
61-
this.chunkIndexBuilder = null;
62-
} else if (transformedChunkSize == null) {
63-
this.chunkIndexBuilder = new VariableSizeChunkIndexBuilder(inner.originalChunkSize(), originalFileSize);
66+
if (transformedChunkSize == null) {
67+
return new VariableSizeChunkIndexBuilder(
68+
originalChunkSize,
69+
originalFileSize
70+
);
6471
} else {
65-
this.chunkIndexBuilder = new FixedSizeChunkIndexBuilder(
66-
inner.originalChunkSize(), originalFileSize, transformedChunkSize);
72+
return new FixedSizeChunkIndexBuilder(
73+
originalChunkSize,
74+
originalFileSize,
75+
transformedChunkSize
76+
);
6777
}
6878
}
6979

@@ -75,7 +85,7 @@ public boolean hasMoreElements() {
7585
@Override
7686
public InputStream nextElement() {
7787
final var chunk = inner.nextElement();
78-
if (chunkIndexBuilder != null) {
88+
if (inner.originalChunkSize() > 0) {
7989
if (hasMoreElements()) {
8090
this.chunkIndexBuilder.addChunk(chunk.length);
8191
} else {
@@ -87,7 +97,7 @@ public InputStream nextElement() {
8797
}
8898

8999
public ChunkIndex chunkIndex() {
90-
if (chunkIndex == null && originalFileSize > 0) {
100+
if (chunkIndex == null && inner.originalChunkSize() > 0) {
91101
throw new IllegalStateException("Chunk index was not built, was finisher used?");
92102
}
93103
return this.chunkIndex;

core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -70,39 +70,43 @@ void compressionAndEncryption(final int chunkSize) throws IOException {
7070

7171
private void test(final int chunkSize, final boolean compression, final boolean encryption) throws IOException {
7272
// Transform.
73-
TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(
74-
new ByteArrayInputStream(original), chunkSize);
75-
if (compression) {
76-
transformEnum = new CompressionChunkEnumeration(transformEnum);
77-
}
78-
if (encryption) {
79-
transformEnum = new EncryptionChunkEnumeration(transformEnum, AesKeyAwareTest::encryptionCipherSupplier);
80-
}
81-
final var transformFinisher = chunkSize == 0
82-
? new TransformFinisher(transformEnum)
83-
: new TransformFinisher(transformEnum, ORIGINAL_SIZE);
84-
final byte[] uploadedData;
85-
final ChunkIndex chunkIndex;
86-
try (final var sis = transformFinisher.toInputStream()) {
87-
uploadedData = sis.readAllBytes();
88-
chunkIndex = transformFinisher.chunkIndex();
89-
}
73+
try (final var inputStream = new ByteArrayInputStream(original)) {
74+
TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(inputStream, chunkSize);
75+
if (compression) {
76+
transformEnum = new CompressionChunkEnumeration(transformEnum);
77+
}
78+
if (encryption) {
79+
transformEnum = new EncryptionChunkEnumeration(
80+
transformEnum,
81+
AesKeyAwareTest::encryptionCipherSupplier
82+
);
83+
}
84+
final var transformFinisher = new TransformFinisher(transformEnum, ORIGINAL_SIZE);
85+
final byte[] uploadedData;
86+
final ChunkIndex chunkIndex;
87+
try (final var sis = transformFinisher.toInputStream()) {
88+
uploadedData = sis.readAllBytes();
89+
chunkIndex = transformFinisher.chunkIndex();
90+
}
9091

91-
// Detransform.
92-
DetransformChunkEnumeration detransformEnum = chunkIndex == null
93-
? new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData))
94-
: new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData), chunkIndex.chunks());
95-
if (encryption) {
96-
detransformEnum = new DecryptionChunkEnumeration(
97-
detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier);
98-
}
99-
if (compression) {
100-
detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
101-
}
102-
final var detransformFinisher = new DetransformFinisher(detransformEnum);
103-
try (final var sis = detransformFinisher.toInputStream()) {
104-
final byte[] downloaded = sis.readAllBytes();
105-
assertThat(downloaded).isEqualTo(original);
92+
// Detransform.
93+
try (final var uploadedStream = new ByteArrayInputStream(uploadedData)) {
94+
DetransformChunkEnumeration detransformEnum = chunkIndex == null
95+
? new BaseDetransformChunkEnumeration(uploadedStream)
96+
: new BaseDetransformChunkEnumeration(uploadedStream, chunkIndex.chunks());
97+
if (encryption) {
98+
detransformEnum = new DecryptionChunkEnumeration(
99+
detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier);
100+
}
101+
if (compression) {
102+
detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
103+
}
104+
final var detransformFinisher = new DetransformFinisher(detransformEnum);
105+
try (final var sis = detransformFinisher.toInputStream()) {
106+
final byte[] downloaded = sis.readAllBytes();
107+
assertThat(downloaded).isEqualTo(original);
108+
}
109+
}
106110
}
107111
}
108112
}

0 commit comments

Comments
 (0)