Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ build/
rpm/
rpmbuild/
*.sh
io.*/
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,12 @@ docker_image: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz
.PHONY: docker_push
docker_push:
docker push $(IMAGE_TAG)

# Relax kernel security settings so async-profiler can collect perf events
# and resolve kernel symbols (see benchmarks/README.md for details).
bench_prep:
sudo sh -c 'echo 1 >/proc/sys/kernel/perf_event_paranoid'
sudo sh -c 'echo 0 >/proc/sys/kernel/kptr_restrict'

# Fully-qualified class of the JMH benchmark to run;
# override on the command line: `make bench_run BENCH=...`.
BENCH=io.aiven.kafka.tieredstorage.benchs.transform.TransformBench

# Run the selected benchmark from the installed distribution
# (build it first with `./gradlew benchmarks:installDist`).
bench_run:
java -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -cp "benchmarks/build/install/benchmarks/*" $(BENCH)
28 changes: 28 additions & 0 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Benchmarks

## How to run

> from https://www.baeldung.com/java-async-profiler

Enable Kernel configs:

```shell
sudo sh -c 'echo 1 >/proc/sys/kernel/perf_event_paranoid'
sudo sh -c 'echo 0 >/proc/sys/kernel/kptr_restrict'
```

Set the `LD_LIBRARY_PATH` environment variable to the async-profiler build directory:

```shell
export LD_LIBRARY_PATH=/opt/async-profiler-2.9-linux-x64/build/
```

Build the benchmarks distribution:

```shell
./gradlew benchmarks:installDist
```

Run benchmark:

```shell
java -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -cp "benchmarks/build/install/benchmarks/*" io.aiven.kafka.tieredstorage.benchs.transform.DetransformBench > results.txt 2>&1
```
36 changes: 36 additions & 0 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Version of JMH (Java Microbenchmark Harness) used for all benchmark classes.
ext {
jmhVersion = "1.36"
}

dependencies {
// Modules under benchmark.
implementation project(':core')
implementation group: "org.apache.kafka", name: "kafka-storage-api", version: kafkaVersion
implementation group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion

// JMH runtime plus the annotation processor that generates benchmark harness code.
implementation "org.openjdk.jmh:jmh-core:$jmhVersion"
implementation "org.openjdk.jmh:jmh-core-benchmarks:$jmhVersion"
annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$jmhVersion"

// S3 upload benchmarks: storage backend, localstack for local runs, and the AWS SDK.
implementation project(":storage:s3")
implementation "org.testcontainers:localstack:$testcontainersVersion"
implementation "com.amazonaws:aws-java-sdk-s3:1.12.418"
// jaxb-api is required by the AWS SDK v1 on Java 11+ where JAXB is no longer bundled.
implementation "javax.xml.bind:jaxb-api:2.3.1"

implementation "org.slf4j:slf4j-log4j12:1.7.36"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.benchs.storage.s3;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.SecureRandom;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import io.aiven.kafka.tieredstorage.storage.s3.S3MultiPartOutputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 2)
@Measurement(iterations = 3)
@BenchmarkMode({Mode.SampleTime})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public abstract class S3UploadBench {

    static final String BUCKET_NAME = "kafka-ts-benchmark-test";
    public static final String OBJECT_PREFIX = "topic/partition";
    static final String OBJECT_KEY = OBJECT_PREFIX + "/log";

    // Chunk size for generating the random segment file; bounds heap usage during setup.
    private static final int WRITE_CHUNK_SIZE = 8 * 1024 * 1024;

    static AmazonS3 s3Client;
    static Path segmentPath;

    @Param({"209715200", "524288000", "1073741824"})
    public int contentLength; // 200MiB, 500MiB, 1GiB

    @Param({"5242880", "10485760", "83886080"})
    public int partSize; // 5MiB, 10MiB, 80MiB

    /**
     * Builds the S3 client used by all benchmark methods.
     * Implemented by concrete subclasses (e.g. real AWS vs. localstack).
     */
    abstract AmazonS3 s3();

    /**
     * Creates a temporary file of {@code contentLength} random bytes that the
     * benchmark methods upload.
     *
     * <p>The file is filled in {@link #WRITE_CHUNK_SIZE} chunks instead of one
     * {@code contentLength}-sized array, so setup never allocates up to 1 GiB
     * of heap at once.
     */
    public void setup() {
        try {
            segmentPath = Files.createTempFile("segment", ".log");
            final SecureRandom secureRandom = new SecureRandom();
            try (final var out = Files.newOutputStream(segmentPath)) {
                final byte[] chunk = new byte[WRITE_CHUNK_SIZE];
                int remaining = contentLength;
                while (remaining > 0) {
                    final int toWrite = Math.min(remaining, chunk.length);
                    secureRandom.nextBytes(chunk);
                    out.write(chunk, 0, toWrite);
                    remaining -= toWrite;
                }
            }
        } catch (final IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Deletes the temporary segment file created by {@link #setup()}. */
    public void teardown() {
        try {
            Files.deleteIfExists(segmentPath);
        } catch (final IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Uploads the segment through the project's {@code S3MultiPartOutputStream},
     * which buffers and uploads parts of {@code partSize} bytes.
     *
     * @return a dummy object so JMH does not dead-code-eliminate the call
     */
    @Benchmark
    public Object multiPartUploadBenchmark() {
        // Unique key per invocation so uploads do not overwrite each other.
        final var key = OBJECT_KEY + "_" + Instant.now().toString();
        try (final var out = new S3MultiPartOutputStream(BUCKET_NAME, key, partSize, s3Client);
             final InputStream inputStream = Files.newInputStream(segmentPath)) {
            inputStream.transferTo(out);
        } catch (final AmazonS3Exception | IOException e) {
            throw new RuntimeException("Failed to upload " + OBJECT_KEY, e);
        }
        return new Object();
    }

    /**
     * Uploads the segment with a single {@code PutObject} call; the content
     * length is set explicitly so the SDK can stream without buffering.
     *
     * @return the SDK's put-object result, to defeat dead-code elimination
     */
    @Benchmark
    public Object putObjectBenchmark() {
        final ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(contentLength);
        try (final InputStream in = Files.newInputStream(segmentPath)) {
            final var key = OBJECT_KEY + "_" + Instant.now().toString();
            final PutObjectRequest offsetPutRequest = new PutObjectRequest(
                BUCKET_NAME,
                key,
                in,
                metadata);
            return s3Client.putObject(offsetPutRequest);
        } catch (final IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Uploads the segment with the SDK's low-level multipart API, sequentially
     * uploading {@code partSize}-sized parts from fixed file offsets.
     *
     * <p>On any failure the multipart upload is aborted (if it was initiated)
     * so no incomplete parts accumulate in the bucket.
     *
     * @return the complete-multipart-upload result, to defeat dead-code elimination
     */
    @Benchmark
    public Object blockingMultiPartUpload() {
        final List<PartETag> partETags = new ArrayList<>();
        final var key = OBJECT_KEY + "_" + Instant.now().toString();
        final InitiateMultipartUploadRequest multipartRequest =
            new InitiateMultipartUploadRequest(BUCKET_NAME, key);
        InitiateMultipartUploadResult initiated = null;
        try {
            initiated = s3Client.initiateMultipartUpload(multipartRequest);
            long partSize = this.partSize;
            long filePosition = 0;
            final File file = segmentPath.toFile();
            for (int i = 1; filePosition < contentLength; i++) {
                // Because the last part could be less than 5 MB, adjust the part size as needed.
                partSize = Math.min(partSize, contentLength - filePosition);

                // Create the request to upload a part.
                final UploadPartRequest uploadRequest = new UploadPartRequest()
                    .withBucketName(BUCKET_NAME)
                    .withKey(key)
                    .withUploadId(initiated.getUploadId())
                    .withPartNumber(i)
                    .withFileOffset(filePosition)
                    .withFile(file)
                    .withPartSize(partSize);

                // Upload the part and add the response's ETag to our list.
                final UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
                partETags.add(uploadResult.getPartETag());

                filePosition += partSize;
            }
            // Complete the multipart upload.
            final CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(
                BUCKET_NAME,
                key,
                initiated.getUploadId(),
                partETags
            );
            return s3Client.completeMultipartUpload(compRequest);
        } catch (final Exception e) {
            if (initiated != null) {
                try {
                    final AbortMultipartUploadRequest abortRequest =
                        new AbortMultipartUploadRequest(BUCKET_NAME, key, initiated.getUploadId());
                    s3Client.abortMultipartUpload(abortRequest);
                } catch (final RuntimeException abortFailure) {
                    // Don't let a failing abort mask the original upload failure.
                    e.addSuppressed(abortFailure);
                }
            }
            throw new RuntimeException(e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.benchs.storage.s3;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.profile.AsyncProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class S3UploadWithAwsS3Bench extends S3UploadBench {

    /**
     * Builds a default AWS S3 client from the standard credential/region chain.
     * NOTE(review): presumably requires AWS credentials in the environment — confirm before running.
     */
    @Override
    AmazonS3 s3() {
        return AmazonS3ClientBuilder
            .standard()
            .build();
    }

    /** Wires the shared client and generates the random segment file, once per trial. */
    @Setup(Level.Trial)
    public void setup() {
        s3Client = s3();

        super.setup();
    }

    /** Removes the temporary segment file, once per trial. */
    @TearDown(Level.Trial)
    public void teardown() {
        super.teardown();
    }

    /**
     * Runs this benchmark with async-profiler attached, producing a flamegraph.
     * Pass {@code alloc} as the first argument to profile allocations instead of CPU.
     */
    public static void main(final String[] args) throws Exception {
        final String event = args.length > 0 && "alloc".equals(args[0])
            ? "event=alloc"
            : "event=cpu";

        final var options = new OptionsBuilder()
            .include(S3UploadWithAwsS3Bench.class.getSimpleName())
            .addProfiler(AsyncProfiler.class, "-output=flamegraph;" + event)
            .build();
        new Runner(options).run();
    }
}
Loading