Skip to content

Commit

Permalink
perf: use Iterator rather than List in BlockBatch.futures (0.20%)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Jan 12, 2024
1 parent e7eee29 commit a058058
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 12 deletions.
1 change: 0 additions & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
30 changes: 24 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

public class BlockBatch {
Expand Down Expand Up @@ -53,12 +53,30 @@ public Collection<Block> blocks() {
return Collections.unmodifiableCollection(blocks);
}

public List<CompletableFuture<WriteAheadLog.AppendResult.CallbackResult>> futures() {
return blocks.stream()
.map(Block::futures)
.flatMap(List::stream)
.toList();
public Iterator<CompletableFuture<WriteAheadLog.AppendResult.CallbackResult>> futures() {
return new Iterator<>() {
private final Iterator<Block> blockIterator = blocks.iterator();
private Iterator<CompletableFuture<WriteAheadLog.AppendResult.CallbackResult>> futureIterator = blockIterator.next().futures().iterator();

@Override
public boolean hasNext() {
if (futureIterator.hasNext()) {
return true;
} else {
if (blockIterator.hasNext()) {
futureIterator = blockIterator.next().futures().iterator();
return hasNext();
} else {
return false;
}
}
}

@Override
public CompletableFuture<WriteAheadLog.AppendResult.CallbackResult> next() {
return futureIterator.next();
}
};
}

public void release() {
Expand Down
12 changes: 7 additions & 5 deletions s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.stream.utils;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -93,14 +93,16 @@ public static Throwable cause(Throwable ex) {
return ex;
}

public static <T> void completeExceptionally(Collection<CompletableFuture<T>> futures, Throwable ex) {
for (CompletableFuture<T> future : futures) {
public static <T> void completeExceptionally(Iterator<CompletableFuture<T>> futures, Throwable ex) {
while (futures.hasNext()) {
CompletableFuture<T> future = futures.next();
future.completeExceptionally(ex);
}
}

public static <T> void complete(Collection<CompletableFuture<T>> futures, T value) {
for (CompletableFuture<T> future : futures) {
public static <T> void complete(Iterator<CompletableFuture<T>> futures, T value) {
while (futures.hasNext()) {
CompletableFuture<T> future = futures.next();
future.complete(value);
}
}
Expand Down

0 comments on commit a058058

Please sign in to comment.