Skip to content

Commit ba86fc1

Browse files
chozo99s1ckknutwalkerjjaderberg
committed
Fix getFJPoolWithConcurrency issue
Co-authored-by: Martin Junghanns <[email protected]> Co-Authored-By: Paul Horn <[email protected]> Co-Authored-By: Jonatan Jäderberg <[email protected]>
1 parent 21897bb commit ba86fc1

File tree

7 files changed

+36
-23
lines changed

7 files changed

+36
-23
lines changed

core/src/main/java/org/neo4j/gds/core/concurrency/ParallelUtil.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ private ParallelUtil() {}
7676
* The concurrency value is assumed to already be validated towards the edition limitation.
7777
*/
7878
public static <T extends BaseStream<?, T>, R> R parallelStream(T data, int concurrency, Function<T, R> fn) {
79-
ForkJoinPool pool = getFJPoolWithConcurrency(concurrency);
79+
ForkJoinPool pool = Pools.createForkJoinPool(concurrency);
8080
try {
8181
return pool.submit(() -> fn.apply(data.parallel())).get();
8282
} catch (Exception e) {
@@ -1487,20 +1487,4 @@ void pushBack(final T element) {
14871487
pushedElement = element;
14881488
}
14891489
}
1490-
1491-
public static ForkJoinPool getFJPoolWithConcurrency(int concurrency) {
1492-
return new ForkJoinPool(concurrency, forkJoinPoolWorkerThreadFactory, null, false);
1493-
}
1494-
1495-
private static final ForkJoinPool.ForkJoinWorkerThreadFactory forkJoinPoolWorkerThreadFactory;
1496-
1497-
private static final String FORK_JOIN_INFIX = "-forkjoin-";
1498-
1499-
static {
1500-
forkJoinPoolWorkerThreadFactory = pool -> {
1501-
var worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
1502-
worker.setName(Pools.THREAD_NAME_PREFIX + FORK_JOIN_INFIX + worker.getPoolIndex());
1503-
return worker;
1504-
};
1505-
}
15061490
}

core/src/main/java/org/neo4j/gds/core/concurrency/Pools.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.ExecutionException;
2929
import java.util.concurrent.ExecutorService;
3030
import java.util.concurrent.Executors;
31+
import java.util.concurrent.ForkJoinPool;
3132
import java.util.concurrent.FutureTask;
3233
import java.util.concurrent.RejectedExecutionHandler;
3334
import java.util.concurrent.ThreadPoolExecutor;
@@ -94,4 +95,14 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
9495
}
9596
}
9697
}
98+
99+
public static final ForkJoinPool.ForkJoinWorkerThreadFactory FJ_WORKER_THREAD_FACTORY = pool -> {
100+
var worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
101+
worker.setName(Pools.THREAD_NAME_PREFIX + "-forkjoin-" + worker.getPoolIndex());
102+
return worker;
103+
};
104+
105+
public static ForkJoinPool createForkJoinPool(int concurrency) {
106+
return new ForkJoinPool(concurrency, FJ_WORKER_THREAD_FACTORY, null, false);
107+
}
97108
}

core/src/main/java/org/neo4j/gds/core/utils/paged/HugeMergeSort.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.neo4j.gds.core.utils.paged;
2121

2222
import org.jetbrains.annotations.Nullable;
23-
import org.neo4j.gds.core.concurrency.ParallelUtil;
23+
import org.neo4j.gds.core.concurrency.Pools;
2424
import org.neo4j.gds.core.utils.mem.AllocationTracker;
2525

2626
import java.util.concurrent.CountedCompleter;
@@ -31,8 +31,12 @@ public final class HugeMergeSort {
3131

3232
public static void sort(HugeLongArray array, int concurrency, AllocationTracker allocationTracker) {
3333
var temp = HugeLongArray.newArray(array.size(), allocationTracker);
34-
var forkJoinPool = ParallelUtil.getFJPoolWithConcurrency(concurrency);
35-
forkJoinPool.invoke(new MergeSortTask(null, array, temp, 0, array.size() - 1));
34+
var forkJoinPool = Pools.createForkJoinPool(concurrency);
35+
try {
36+
forkJoinPool.invoke(new MergeSortTask(null, array, temp, 0, array.size() - 1));
37+
} finally {
38+
forkJoinPool.shutdown();
39+
}
3640
}
3741

3842
static class MergeSortTask extends CountedCompleter<Void> {

pregel/src/main/java/org/neo4j/gds/beta/pregel/ForkJoinComputer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,9 @@ public void runIteration() {
8080
public boolean hasConverged() {
8181
return !sentMessage.get() && voteBits.allSet();
8282
}
83+
84+
@Override
85+
void release() {
86+
forkJoinPool.shutdown();
87+
}
8388
}

pregel/src/main/java/org/neo4j/gds/beta/pregel/PartitionedComputer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
package org.neo4j.gds.beta.pregel;
2121

2222
import org.jetbrains.annotations.NotNull;
23-
import org.neo4j.gds.core.utils.partition.Partition;
2423
import org.neo4j.gds.api.Graph;
2524
import org.neo4j.gds.core.concurrency.ParallelUtil;
2625
import org.neo4j.gds.core.utils.paged.HugeAtomicBitSet;
26+
import org.neo4j.gds.core.utils.partition.Partition;
2727
import org.neo4j.gds.core.utils.partition.PartitionUtils;
2828
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
2929

@@ -83,6 +83,12 @@ public boolean hasConverged() {
8383

8484
}
8585

86+
@Override
87+
void release() {
88+
// Unlike in the sibling ForkJoinComputer, we will not shut down the
89+
// executor service (thread pool), since we use the shared global thread pool.
90+
}
91+
8692
@NotNull
8793
private List<PartitionedComputeStep<CONFIG, ?>> createComputeSteps(HugeAtomicBitSet voteBits) {
8894
Function<Partition, PartitionedComputeStep<CONFIG, ?>> partitionFunction = partition -> new PartitionedComputeStep<>(

pregel/src/main/java/org/neo4j/gds/beta/pregel/Pregel.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.immutables.value.Value;
2323
import org.neo4j.gds.api.Graph;
2424
import org.neo4j.gds.beta.pregel.context.MasterComputeContext;
25-
import org.neo4j.gds.core.concurrency.ParallelUtil;
25+
import org.neo4j.gds.core.concurrency.Pools;
2626
import org.neo4j.gds.core.utils.mem.AllocationTracker;
2727
import org.neo4j.gds.core.utils.mem.MemoryEstimation;
2828
import org.neo4j.gds.core.utils.mem.MemoryEstimations;
@@ -149,7 +149,7 @@ private Pregel(
149149
.messenger(messenger)
150150
.voteBits(HugeAtomicBitSet.create(graph.nodeCount(), allocationTracker))
151151
.executorService(config.useForkJoin()
152-
? ParallelUtil.getFJPoolWithConcurrency(config.concurrency())
152+
? Pools.createForkJoinPool(config.concurrency())
153153
: executor)
154154
.progressTracker(progressTracker)
155155
.build();
@@ -194,6 +194,7 @@ public PregelResult run() {
194194
.build();
195195
} finally {
196196
progressTracker.endSubTask();
197+
computer.release();
197198
}
198199
}
199200

pregel/src/main/java/org/neo4j/gds/beta/pregel/PregelComputer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ abstract class PregelComputer<CONFIG extends PregelConfig> {
6464

6565
abstract boolean hasConverged();
6666

67+
abstract void release();
68+
6769
static <CONFIG extends PregelConfig> ComputerBuilder<CONFIG> builder() {
6870
return new ComputerBuilder<>();
6971
}

0 commit comments

Comments
 (0)