Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.io.PositionOutputStream;
Expand All @@ -53,9 +51,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.internal.util.Mimetype;
Expand Down Expand Up @@ -112,14 +109,8 @@ class S3OutputStream extends PositionOutputStream {
synchronized (S3OutputStream.class) {
if (executorService == null) {
executorService =
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor)
Executors.newFixedThreadPool(
s3FileIOProperties.multipartUploadThreads(),
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("iceberg-s3fileio-upload-%d")
.build()));
ThreadPools.newExitingWorkerPool(
"iceberg-s3fileio-upload", s3FileIOProperties.multipartUploadThreads());
}
}
}
Expand Down
98 changes: 96 additions & 2 deletions core/src/main/java/org/apache/iceberg/util/ThreadPools.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.iceberg.util;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -29,8 +32,11 @@
import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadPools {
private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);

private ThreadPools() {}

Expand All @@ -44,6 +50,9 @@ private ThreadPools() {}

public static final int WORKER_THREAD_POOL_SIZE = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();

private static final ConcurrentLinkedDeque<ExecutorService> THREAD_POOLS_TO_SHUTDOWN =
new ConcurrentLinkedDeque<>();

private static final ExecutorService WORKER_POOL =
newExitingWorkerPool("iceberg-worker-pool", WORKER_THREAD_POOL_SIZE);

Expand All @@ -56,6 +65,14 @@ private ThreadPools() {}
public static final int AUTH_REFRESH_THREAD_POOL_SIZE =
SystemConfigs.AUTH_REFRESH_THREAD_POOL_SIZE.value();

private static final int SHUTDOWN_TIMEOUT_SECONDS = 120;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static final int SHUTDOWN_TIMEOUT_SECONDS = 120;
private static final Duration SHUTDOWN_TIMEOUT_SECONDS = Duration.ofSeconds(120);


private static Thread shutdownHook;

static {
initShutdownHook();
}

/**
* Return an {@link ExecutorService} that uses the "worker" thread-pool.
*
Expand Down Expand Up @@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
return MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
ExecutorService service =
Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, poolSize));
THREAD_POOLS_TO_SHUTDOWN.add(service);
return service;
}

/**
* Force manual shutdown of the thread pools created via the {@link #newExitingWorkerPool(String,
* int)}.
*/
public static void shutdownStartedThreadPools() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static void shutdownStartedThreadPools() {
public static void shutdownThreadPools() {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think first thing we should remove the corresponding shutdown hook.

long startTime = System.nanoTime();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
long startTime = System.nanoTime();
long startTimeMs = System.currentTimeMillis();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably sufficient to use System.currentTimeMillis() here.

ExecutorService item;
Queue<ExecutorService> invoked = new ArrayDeque<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Queue<ExecutorService> invoked = new ArrayDeque<>();
Queue<ExecutorService> pendingShutdown = new ArrayDeque<>();

while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
item.shutdown();
invoked.add(item);
}
while ((item = invoked.poll()) != null) {
long timeElapsed = System.nanoTime() - startTime;
long remainingTime = SHUTDOWN_TIMEOUT_SECONDS * 1_000_000_000L - timeElapsed;
if (remainingTime > 0) {
Comment on lines +188 to +190
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
long timeElapsed = System.nanoTime() - startTime;
long remainingTime = SHUTDOWN_TIMEOUT_SECONDS * 1_000_000_000L - timeElapsed;
if (remainingTime > 0) {
if (System.currentTimeMillis() - startTimeMs > SHUTDOWN_TIME_OUT.millis()) {
break;
}

try {
item.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also try this?

Suggested change
item.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
if (!item.awaitTermination(remainingTime, TimeUnit.NANOSECONDS)) {
item.snapshotNow();
}

} catch (InterruptedException ignored) {
// We're shutting down anyway, so just ignore.
}
}
}
}

/**
* Initialize a shutdown hook to stop the thread pools created via the {@link
* #newExitingWorkerPool(String, int)}.
*/
@SuppressWarnings("ShutdownHook")
private static void initShutdownHook() {
if (shutdownHook == null) {
shutdownHook =
Executors.defaultThreadFactory()
.newThread(
new Runnable() {
@Override
public void run() {
shutdownStartedThreadPools();
}
});

try {
shutdownHook.setName("DelayedShutdownHook-iceberg");
} catch (SecurityException e) {
LOG.warn("Cannot set thread name for the shutdown hook", e);
}

try {
Runtime.getRuntime().addShutdownHook(shutdownHook);
} catch (SecurityException e) {
LOG.warn("Cannot install a shutdown hook for thread pools clean up", e);
}
}
}

/**
* Stop the shutdown hook for the thread pools created via the {@link
* #newExitingWorkerPool(String, int)}.
*
* <p>Thread pools can still be stopped manually via the {@link #shutdownStartedThreadPools()}
* method.
*/
@SuppressWarnings("ShutdownHook")
public static void removeShutdownHook() {
if (shutdownHook != null) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
} catch (SecurityException e) {
LOG.warn("Cannot remove the shutdown hook for thread pools clean up", e);
}
shutdownHook = null;
}
}

/** Creates a fixed-size thread pool that uses daemon threads. */
Expand Down