diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index dc16aa477..8600fb597 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -56,7 +56,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java index a7359a978..ef28ebf86 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockBatch.java @@ -19,7 +19,7 @@ import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; public class BlockBatch { @@ -53,12 +53,30 @@ public Collection blocks() { return Collections.unmodifiableCollection(blocks); } - public List> futures() { - return blocks.stream() - .map(Block::futures) - .flatMap(List::stream) - .toList(); + public Iterator> futures() { + return new Iterator<>() { + private final Iterator blockIterator = blocks.iterator(); + private Iterator> futureIterator = blockIterator.next().futures().iterator(); + @Override + public boolean hasNext() { + if (futureIterator.hasNext()) { + return true; + } else { + if (blockIterator.hasNext()) { + futureIterator = blockIterator.next().futures().iterator(); + return hasNext(); + } else { + return false; + } + } + } + + @Override + public CompletableFuture next() { + return futureIterator.next(); + } + }; } public void release() { diff --git a/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java b/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java index 1ee5040d0..eeb6f4e1a 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java +++ b/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java @@ -17,7 +17,7 @@ package com.automq.stream.utils; -import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -93,14 +93,16 @@ public static Throwable cause(Throwable ex) { return ex; } - public static void completeExceptionally(Collection> futures, Throwable ex) { - for (CompletableFuture future : futures) { + public static void completeExceptionally(Iterator> futures, Throwable ex) { + while (futures.hasNext()) { + CompletableFuture future = futures.next(); future.completeExceptionally(ex); } } - public static void complete(Collection> futures, T value) { - for (CompletableFuture future : futures) { + public static void complete(Iterator> futures, T value) { + while (futures.hasNext()) { + CompletableFuture future = futures.next(); future.complete(value); } }