Skip to content

Commit d26928c

Browse files
FinalizerThread for copyTasks
Signed-off-by: Lehmann_Fabian <[email protected]>
1 parent f2c9bb9 commit d26928c

File tree

2 files changed

+56
-5
lines changed

2 files changed

+56
-5
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package cws.k8s.scheduler.scheduler;
2+
3+
import lombok.RequiredArgsConstructor;
4+
5+
import java.util.LinkedList;
6+
import java.util.Queue;
7+
import java.util.function.Consumer;
8+
9+
@RequiredArgsConstructor
10+
public class FinalizerThread<T> extends Thread {
11+
12+
private final Queue<T> items = new LinkedList<>();
13+
private final Consumer <? super T> finalizeItem;
14+
15+
@Override
16+
public void run() {
17+
while ( !Thread.currentThread().isInterrupted() ) {
18+
synchronized ( finalizeItem ) {
19+
while ( !items.isEmpty() ) {
20+
try {
21+
finalizeItem.accept( items.poll() );
22+
} catch ( Exception e ) {
23+
System.err.println( "Error finalizing item: " + e.getMessage() );
24+
}
25+
}
26+
try {
27+
finalizeItem.wait();
28+
} catch ( InterruptedException e ) {
29+
Thread.currentThread().interrupt();
30+
System.err.println( "Finalizer thread interrupted: " + e.getMessage() );
31+
}
32+
}
33+
}
34+
}
35+
36+
public void addItem( T item ) {
37+
synchronized ( finalizeItem ) {
38+
items.add( item );
39+
finalizeItem.notify();
40+
}
41+
}
42+
43+
}

src/main/java/cws/k8s/scheduler/scheduler/LocationAwareSchedulerV2.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class LocationAwareSchedulerV2 extends SchedulerWithDaemonSet {
6767
protected final int copySameTaskInParallel;
6868
protected final int maxHeldCopyTaskReady;
6969

70+
private final FinalizerThread<Tuple<CopyTask, Boolean>> finalizerThread;
71+
7072
/**
7173
* This value must be between 1 and 100.
7274
* 100 means that the data will be copied with full speed.
@@ -127,6 +129,8 @@ public LocationAwareSchedulerV2(
127129
this.copyInAdvance = new CopyInAdvanceNodeWithMostData( getCurrentlyCopying(), inputAlignment, this.copySameTaskInParallel );
128130
this.maxHeldCopyTaskReady = config.maxHeldCopyTaskReady == null ? 3 : config.maxHeldCopyTaskReady;
129131
this.prioPhaseThree = config.prioPhaseThree == null ? 70 : config.prioPhaseThree;
132+
finalizerThread = new FinalizerThread<>( x -> processCopyTaskFinished( x.getA(), x.getB() ) );
133+
finalizerThread.start();
130134
}
131135

132136
@Override
@@ -283,15 +287,19 @@ private void undoReserveCopyTask( CopyTask copyTask ) {
283287
}
284288

285289
public void copyTaskFinished( CopyTask copyTask, boolean success ) {
290+
finalizerThread.addItem( new Tuple<CopyTask, Boolean>(copyTask, success) );
291+
}
292+
293+
private void processCopyTaskFinished( CopyTask copyTask, boolean success ) {
286294
synchronized ( copyLock ) {
287295
freeLocations( copyTask.getAllLocationWrapper() );
288296
if( success ){
289-
copyTask.getInputFiles().parallelStream().forEach( TaskInputFileLocationWrapper::success );
290-
removeFromCopyingToNode( copyTask.getTask(), copyTask.getNodeLocation(), copyTask.getFilesForCurrentNode() );
291-
copyTask.getTask().preparedOnNode( copyTask.getNodeLocation() );
297+
copyTask.getInputFiles().parallelStream().forEach( TaskInputFileLocationWrapper::success );
298+
removeFromCopyingToNode( copyTask.getTask(), copyTask.getNodeLocation(), copyTask.getFilesForCurrentNode() );
299+
copyTask.getTask().preparedOnNode( copyTask.getNodeLocation() );
292300
} else {
293-
removeFromCopyingToNode( copyTask.getTask(), copyTask.getNodeLocation(), copyTask.getFilesForCurrentNode() );
294-
handleProblematicCopy( copyTask );
301+
removeFromCopyingToNode( copyTask.getTask(), copyTask.getNodeLocation(), copyTask.getFilesForCurrentNode() );
302+
handleProblematicCopy( copyTask );
295303
}
296304
}
297305
}

0 commit comments

Comments
 (0)