Skip to content

Commit 20e6983

Browse files
Limit number of parallel publish operations per node
Signed-off-by: Lehmann_Fabian <[email protected]>
1 parent d1900cf commit 20e6983

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

src/main/java/cws/k8s/scheduler/publishDir/InformPublishFinishedRunnable.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33
import cws.k8s.scheduler.model.location.NodeLocation;
44
import lombok.RequiredArgsConstructor;
55

6+
import java.util.Map;
7+
68
@RequiredArgsConstructor
79
public class InformPublishFinishedRunnable implements Runnable {
810

911
private final PublishExecHolder execHolder;
1012
private final NodeLocation nodeLocation;
13+
private final Map<NodeLocation, Integer> currentPublishJobsPerNode;
1114

1215

1316
@Override
1417
public void run() {
1518
execHolder.finishedOnNode( nodeLocation );
19+
currentPublishJobsPerNode.compute( nodeLocation, ( k, v ) -> v == null ? 0 : v - 1 );
1620
}
1721
}

src/main/java/cws/k8s/scheduler/publishDir/PublishManager.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import java.nio.file.Path;
1212
import java.util.*;
13+
import java.util.concurrent.ConcurrentHashMap;
1314
import java.util.stream.Collectors;
1415

1516
@Slf4j
@@ -28,6 +29,9 @@ public class PublishManager {
2829
// Used to check if symlink target is already published
2930
private final Map<Path, Set<Path>> publishMap = new HashMap<>();
3031
private final PublishExecHolder execHolder = new PublishExecHolder();
32+
private final Map<NodeLocation, Integer> currentPublishJobsPerNode = new ConcurrentHashMap<>();
33+
// Maximum number of parallel publish jobs per node
34+
private final int MAX_COPY_PER_NODE = 1;
3135

3236
/**
3337
* Maximum number of arguments for the publish command.
@@ -176,6 +180,7 @@ private void publishFiles( List<FileWrapper> items, NodeLocation node ) {
176180
}
177181
execHolder.finishedOnNode( node );
178182
};
183+
currentPublishJobsPerNode.compute( node, ( k, v ) -> v == null ? 1 : v + 1 );
179184
final PublishListener publishListener = new PublishListener( scheduler, name, onFinish );
180185
execHolder.addRunnable( node, () ->
181186
client.execCommand( daemonName, scheduler.getNamespace(), command, publishListener )
@@ -215,7 +220,7 @@ private void createSymlinksIntern( final Symlink[] symlinks, final int start, in
215220

216221
String name = "Copying from node: " + node;
217222
final String daemonName = scheduler.getDaemonNameOnNode( node );
218-
final Runnable onFinish = new InformPublishFinishedRunnable( execHolder, location );
223+
final Runnable onFinish = new InformPublishFinishedRunnable( execHolder, location, currentPublishJobsPerNode );
219224
final PublishListener publishListener = new PublishListener( scheduler, name, onFinish );
220225
execHolder.addRunnable( location, () ->
221226
client.execCommand( daemonName, scheduler.getNamespace(), command, publishListener )
@@ -242,7 +247,9 @@ private void publishFirstX( final Map<NodeLocation, LinkedList<FileWrapper>> nod
242247
for ( Map.Entry<NodeLocation, LinkedList<FileWrapper>> entry : nodeItemsMap.entrySet() ) {
243248
final NodeLocation node = entry.getKey();
244249
LinkedList<FileWrapper> items = entry.getValue();
245-
while ( currentlyCopyingTasksOnNode.getOrDefault( node, 0 ) < maxCopyPerNode && items != null && !items.isEmpty() ) {
250+
while ( currentPublishJobsPerNode.getOrDefault( node, 0 ) < MAX_COPY_PER_NODE
251+
&& currentlyCopyingTasksOnNode.getOrDefault( node, 0 ) < maxCopyPerNode
252+
&& items != null && !items.isEmpty() ) {
246253
currentlyCopyingTasksOnNode.compute( node, ( k, v ) -> v == null ? 1 : v + 1 );
247254
publishFiles( removeUntil( items, maxSize ), node );
248255
}

0 commit comments

Comments
 (0)