Skip to content

Commit dc45dda

Browse files
committed
refactor: segment manifest builder
Bunch of optional fields can be passed via builder
1 parent cdffc7f commit dc45dda

File tree

10 files changed

+237
-39
lines changed

10 files changed

+237
-39
lines changed

core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache;
5858
import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexesCache;
5959
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
60-
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
6160
import io.aiven.kafka.tieredstorage.manifest.SegmentIndex;
6261
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1;
6362
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder;
@@ -481,19 +480,13 @@ void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
481480
final DataKeyAndAAD maybeEncryptionKey,
482481
final SegmentCustomMetadataBuilder customMetadataBuilder
483482
) throws StorageBackendException, IOException {
484-
final SegmentEncryptionMetadataV1 maybeEncryptionMetadata;
483+
final var segmentManifestBuilder = SegmentManifestV1.newBuilder(chunkIndex, segmentIndexes)
484+
.withRlsm(remoteLogSegmentMetadata)
485+
.withCompressionEnabled(requiresCompression);
485486
if (maybeEncryptionKey != null) {
486-
maybeEncryptionMetadata = new SegmentEncryptionMetadataV1(maybeEncryptionKey);
487-
} else {
488-
maybeEncryptionMetadata = null;
487+
segmentManifestBuilder.withEncryptionKey(maybeEncryptionKey);
489488
}
490-
final SegmentManifest segmentManifest = new SegmentManifestV1(
491-
chunkIndex,
492-
segmentIndexes,
493-
requiresCompression,
494-
maybeEncryptionMetadata,
495-
remoteLogSegmentMetadata
496-
);
489+
final SegmentManifest segmentManifest = segmentManifestBuilder.build();
497490
final String manifest = mapper.writeValueAsString(segmentManifest);
498491
final ObjectKey manifestObjectKey =
499492
objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);

core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
2323

2424
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
25+
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
2526

2627
import com.fasterxml.jackson.annotation.JsonCreator;
2728
import com.fasterxml.jackson.annotation.JsonInclude;
@@ -44,7 +45,7 @@ public SegmentManifestV1(
4445
this(chunkIndex, segmentIndexes, compression, encryption, null);
4546
}
4647

47-
public SegmentManifestV1(final ChunkIndex chunkIndex,
48+
private SegmentManifestV1(final ChunkIndex chunkIndex,
4849
final SegmentIndexesV1 segmentIndexes,
4950
final boolean compression,
5051
final SegmentEncryptionMetadataV1 encryption,
@@ -58,6 +59,13 @@ public SegmentManifestV1(final ChunkIndex chunkIndex,
5859
this.remoteLogSegmentMetadata = remoteLogSegmentMetadata;
5960
}
6061

62+
public static Builder newBuilder(
63+
final ChunkIndex chunkIndex,
64+
final SegmentIndexesV1 segmentIndexes
65+
) {
66+
return new Builder(chunkIndex, segmentIndexes);
67+
}
68+
6169
@Override
6270
@JsonProperty("chunkIndex")
6371
public ChunkIndex chunkIndex() {
@@ -129,4 +137,46 @@ public String toString() {
129137
+ ", encryption=" + encryption
130138
+ ")";
131139
}
140+
141+
public static class Builder {
142+
final ChunkIndex chunkIndex;
143+
final SegmentIndexesV1 segmentIndexes;
144+
boolean compression = false;
145+
SegmentEncryptionMetadataV1 encryptionMetadata = null;
146+
RemoteLogSegmentMetadata rlsm = null;
147+
148+
public Builder(
149+
final ChunkIndex chunkIndex,
150+
final SegmentIndexesV1 segmentIndexes
151+
) {
152+
this.chunkIndex = chunkIndex;
153+
this.segmentIndexes = segmentIndexes;
154+
}
155+
156+
public Builder withCompressionEnabled(final boolean requiresCompression) {
157+
this.compression = requiresCompression;
158+
return this;
159+
}
160+
161+
public Builder withEncryptionMetadata(final SegmentEncryptionMetadataV1 encryptionMetadata) {
162+
this.encryptionMetadata = Objects.requireNonNull(encryptionMetadata, "encryptionMetadata cannot be null");
163+
return this;
164+
}
165+
166+
public Builder withEncryptionKey(final DataKeyAndAAD dataKeyAndAAD) {
167+
this.encryptionMetadata = new SegmentEncryptionMetadataV1(
168+
Objects.requireNonNull(dataKeyAndAAD, "dataKeyAndAAD cannot be null")
169+
);
170+
return this;
171+
}
172+
173+
public Builder withRlsm(final RemoteLogSegmentMetadata rlsm) {
174+
this.rlsm = Objects.requireNonNull(rlsm, "remoteLogSegmentMetadata cannot be null");
175+
return this;
176+
}
177+
178+
public SegmentManifestV1 build() {
179+
return new SegmentManifestV1(chunkIndex, segmentIndexes, compression, encryptionMetadata, rlsm);
180+
}
181+
}
132182
}

core/src/test/java/io/aiven/kafka/tieredstorage/fetch/DefaultChunkManagerTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ class DefaultChunkManagerTest extends AesKeyAwareTest {
6262
void testGetChunk() throws Exception {
6363
final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10);
6464

65-
final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null);
65+
final SegmentManifest manifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES)
66+
.build();
6667
final ChunkManager chunkManager = new DefaultChunkManager(storage, null);
6768
when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()))
6869
.thenReturn(new ByteArrayInputStream("0123456789".getBytes()));
@@ -88,7 +89,9 @@ void testGetChunkWithEncryption() throws Exception {
8889
new ByteArrayInputStream(encrypted));
8990

9091
final var encryption = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
91-
final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, encryption, null);
92+
final var manifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES)
93+
.withEncryptionMetadata(encryption)
94+
.build();
9295
final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider);
9396

9497
assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);
@@ -108,7 +111,9 @@ void testGetChunkWithCompression() throws Exception {
108111
when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()))
109112
.thenReturn(new ByteArrayInputStream(compressed));
110113

111-
final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, true, null, null);
114+
final var manifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES)
115+
.withCompressionEnabled(true)
116+
.build();
112117
final ChunkManager chunkManager = new DefaultChunkManager(storage, null);
113118

114119
assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);

core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationSourceInputStreamClosingTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ class FetchChunkEnumerationSourceInputStreamClosingTest {
7070
.add(IndexType.LEADER_EPOCH, 1)
7171
.add(IndexType.TRANSACTION, 1)
7272
.build();
73-
static final SegmentManifest SEGMENT_MANIFEST = new SegmentManifestV1(
74-
CHUNK_INDEX, SEGMENT_INDEXES, false, null, null);
73+
static final SegmentManifest SEGMENT_MANIFEST = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
74+
.build();
7575

7676
TestObjectFetcher fetcher;
7777

core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class FetchChunkEnumerationTest {
5353
.add(IndexType.LEADER_EPOCH, 1)
5454
.add(IndexType.TRANSACTION, 1)
5555
.build();
56-
final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, segmentIndexesV1, false, null, null);
56+
final SegmentManifest manifest = SegmentManifestV1.newBuilder(chunkIndex, segmentIndexesV1).build();
5757

5858
static final byte[] CHUNK_CONTENT = "0123456789".getBytes();
5959
static final ObjectKey SEGMENT_KEY = new TestObjectKey("topic/segment");
@@ -64,7 +64,7 @@ class FetchChunkEnumerationTest {
6464
@Test
6565
void failsWhenLargerStartPosition() {
6666
// Given
67-
final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, segmentIndexesV1, false, null, null);
67+
final SegmentManifest manifest = SegmentManifestV1.newBuilder(chunkIndex, segmentIndexesV1).build();
6868
// When
6969
final int from = 1000;
7070
final int to = from + 1;

core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/ChunkCacheTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,11 @@ class ChunkCacheTest {
8686
.add(IndexType.TRANSACTION, 1)
8787
.build();
8888

89-
private static final SegmentManifest SEGMENT_MANIFEST =
90-
new SegmentManifestV1(FIXED_SIZE_CHUNK_INDEX, SEGMENT_INDEXES, false, null, null);
89+
private static final SegmentManifest SEGMENT_MANIFEST = SegmentManifestV1.newBuilder(
90+
FIXED_SIZE_CHUNK_INDEX,
91+
SEGMENT_INDEXES
92+
)
93+
.build();
9194
private static final String TEST_EXCEPTION_MESSAGE = "test_message";
9295
private static final String SEGMENT_KEY = "topic/segment";
9396
private static final ObjectKey SEGMENT_OBJECT_KEY = () -> SEGMENT_KEY;

core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/DiskChunkCacheMetricsTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,16 @@ class DiskChunkCacheMetricsTest {
5656
TimeUnit.SECONDS.convert(new MetricConfig().timeWindowMs(), TimeUnit.MILLISECONDS);
5757

5858
static final SegmentManifest SEGMENT_MANIFEST =
59-
new SegmentManifestV1(
60-
new FixedSizeChunkIndex(10, 30, 10, 10),
61-
SegmentIndexesV1.builder()
62-
.add(RemoteStorageManager.IndexType.OFFSET, 1)
63-
.add(RemoteStorageManager.IndexType.TIMESTAMP, 1)
64-
.add(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, 1)
65-
.add(RemoteStorageManager.IndexType.LEADER_EPOCH, 1)
66-
.add(RemoteStorageManager.IndexType.TRANSACTION, 1)
67-
.build(),
68-
false, null, null);
59+
SegmentManifestV1.newBuilder(
60+
new FixedSizeChunkIndex(10, 30, 10, 10),
61+
SegmentIndexesV1.builder()
62+
.add(RemoteStorageManager.IndexType.OFFSET, 1)
63+
.add(RemoteStorageManager.IndexType.TIMESTAMP, 1)
64+
.add(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, 1)
65+
.add(RemoteStorageManager.IndexType.LEADER_EPOCH, 1)
66+
.add(RemoteStorageManager.IndexType.TRANSACTION, 1)
67+
.build())
68+
.build();
6969

7070
static final ObjectKey OBJECT_KEY_PATH = () -> "topic/segment";
7171

core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ void shouldReturnAndCache() throws StorageBackendException, IOException {
115115
when(storage.fetch(key))
116116
.thenReturn(new ByteArrayInputStream(MANIFEST.getBytes()));
117117
final var chunkIndex = new FixedSizeChunkIndex(100, 1000, 110, 110);
118-
final var expectedManifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null);
118+
final var expectedManifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES).build();
119119
assertThat(provider.get(key)).isEqualTo(expectedManifest);
120120
verify(storage).fetch(key);
121121
assertThat(provider.get(key)).isEqualTo(expectedManifest);
@@ -155,7 +155,7 @@ void shouldNotPoisonCacheWithFailedFutures()
155155
.hasMessage("test");
156156

157157
final var chunkIndex = new FixedSizeChunkIndex(100, 1000, 110, 110);
158-
final var expectedManifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null);
158+
final var expectedManifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES).build();
159159

160160
await().atMost(Duration.ofMillis(50))
161161
.pollInterval(Duration.ofMillis(5))
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright 2024 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.manifest;
18+
19+
import javax.crypto.SecretKey;
20+
import javax.crypto.spec.SecretKeySpec;
21+
22+
import java.util.Map;
23+
24+
import org.apache.kafka.common.TopicIdPartition;
25+
import org.apache.kafka.common.Uuid;
26+
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
27+
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
28+
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
29+
30+
import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex;
31+
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
32+
33+
import org.junit.jupiter.api.Test;
34+
35+
import static org.assertj.core.api.Assertions.assertThat;
36+
37+
class SegmentManifestV1BuilderTest {
38+
static final FixedSizeChunkIndex CHUNK_INDEX =
39+
new FixedSizeChunkIndex(100, 1000, 110, 110);
40+
static final SecretKey DATA_KEY = new SecretKeySpec(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, "AES");
41+
static final byte[] AAD = {10, 11, 12, 13};
42+
static final SegmentIndexesV1 SEGMENT_INDEXES = new SegmentIndexesV1Builder()
43+
.add(RemoteStorageManager.IndexType.OFFSET, 1)
44+
.add(RemoteStorageManager.IndexType.TIMESTAMP, 1)
45+
.add(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, 1)
46+
.add(RemoteStorageManager.IndexType.LEADER_EPOCH, 1)
47+
.add(RemoteStorageManager.IndexType.TRANSACTION, 1)
48+
.build();
49+
static final RemoteLogSegmentMetadata REMOTE_LOG_SEGMENT_METADATA = new RemoteLogSegmentMetadata(
50+
new RemoteLogSegmentId(
51+
new TopicIdPartition(Uuid.fromString("lZ6vvmajTWKDBUTV6SQAtQ"), 42, "topic1"),
52+
Uuid.fromString("adh9f8BMS4anaUnD8KWfWg")
53+
),
54+
0,
55+
1000L,
56+
1000000000L,
57+
2,
58+
2000000000L,
59+
100500,
60+
Map.of(0, 100L, 1, 200L, 2, 300L)
61+
);
62+
63+
@Test
64+
void minimal() {
65+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES).build();
66+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
67+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
68+
assertThat(manifest.compression()).isFalse();
69+
assertThat(manifest.encryption()).isEmpty();
70+
assertThat(manifest.remoteLogSegmentMetadata()).isNull();
71+
}
72+
73+
@Test
74+
void withRlsm() {
75+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
76+
.withRlsm(REMOTE_LOG_SEGMENT_METADATA)
77+
.build();
78+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
79+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
80+
assertThat(manifest.compression()).isFalse();
81+
assertThat(manifest.encryption()).isEmpty();
82+
assertThat(manifest.remoteLogSegmentMetadata()).isEqualTo(REMOTE_LOG_SEGMENT_METADATA);
83+
}
84+
85+
@Test
86+
void withCompressionEnabled() {
87+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
88+
.withCompressionEnabled(true)
89+
.build();
90+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
91+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
92+
assertThat(manifest.compression()).isTrue();
93+
assertThat(manifest.encryption()).isEmpty();
94+
assertThat(manifest.remoteLogSegmentMetadata()).isNull();
95+
}
96+
97+
@Test
98+
void withCompressionDisabled() {
99+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
100+
.withCompressionEnabled(false)
101+
.build();
102+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
103+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
104+
assertThat(manifest.compression()).isFalse();
105+
assertThat(manifest.encryption()).isEmpty();
106+
assertThat(manifest.remoteLogSegmentMetadata()).isNull();
107+
}
108+
109+
@Test
110+
void withEncryptionKey() {
111+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
112+
.withEncryptionKey(new DataKeyAndAAD(DATA_KEY, AAD))
113+
.build();
114+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
115+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
116+
assertThat(manifest.compression()).isFalse();
117+
assertThat(manifest.encryption()).isPresent();
118+
manifest.encryption().ifPresent(segmentEncryptionMetadata -> {
119+
assertThat(segmentEncryptionMetadata.dataKey()).isEqualTo(DATA_KEY);
120+
assertThat(segmentEncryptionMetadata.aad()).isEqualTo(AAD);
121+
});
122+
assertThat(manifest.remoteLogSegmentMetadata()).isNull();
123+
}
124+
125+
@Test
126+
void full() {
127+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
128+
.withCompressionEnabled(true)
129+
.withEncryptionKey(new DataKeyAndAAD(DATA_KEY, AAD))
130+
.withRlsm(REMOTE_LOG_SEGMENT_METADATA)
131+
.build();
132+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
133+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
134+
assertThat(manifest.compression()).isTrue();
135+
assertThat(manifest.encryption()).isPresent();
136+
manifest.encryption().ifPresent(segmentEncryptionMetadata -> {
137+
assertThat(segmentEncryptionMetadata.dataKey()).isEqualTo(DATA_KEY);
138+
assertThat(segmentEncryptionMetadata.aad()).isEqualTo(AAD);
139+
});
140+
assertThat(manifest.remoteLogSegmentMetadata()).isEqualTo(REMOTE_LOG_SEGMENT_METADATA);
141+
}
142+
}

0 commit comments

Comments
 (0)