-
Notifications
You must be signed in to change notification settings - Fork 8
Build #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Build #2
Changes from 8 commits
736056f
73cb083
a44461d
14c9e4f
9305900
e05cf3a
7415c5c
0878b6e
b220902
6c26c63
fec8602
fcd1e6e
ed55fd4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,3 +75,4 @@ fabric.properties | |
target | ||
|
||
pom-development.xml | ||
.vscode/settings.json |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,10 +6,12 @@ | |
import cws.k8s.scheduler.dag.Vertex; | ||
import cws.k8s.scheduler.model.SchedulerConfig; | ||
import cws.k8s.scheduler.model.TaskConfig; | ||
import cws.k8s.scheduler.scheduler.NodeLabelAssign; | ||
import cws.k8s.scheduler.scheduler.PrioritizeAssignScheduler; | ||
import cws.k8s.scheduler.scheduler.Scheduler; | ||
import cws.k8s.scheduler.scheduler.prioritize.*; | ||
import cws.k8s.scheduler.scheduler.nodeassign.FairAssign; | ||
import cws.k8s.scheduler.scheduler.nodeassign.LabelAssign; | ||
import cws.k8s.scheduler.scheduler.nodeassign.NodeAssign; | ||
import cws.k8s.scheduler.scheduler.nodeassign.RandomNodeAssign; | ||
import cws.k8s.scheduler.scheduler.nodeassign.RoundRobinAssign; | ||
|
@@ -32,6 +34,8 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove this unused import |
||
|
||
@RestController | ||
@Slf4j | ||
@EnableScheduling | ||
|
@@ -99,16 +103,25 @@ ResponseEntity<String> registerScheduler( | |
|
||
Scheduler scheduler; | ||
|
||
ObjectMapper objectMapper = new ObjectMapper(); | ||
t-aretz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Map<String,String> nodelabel = objectMapper.convertValue(config.additional.get("myconfig"),Map.class); | ||
|
||
if ( schedulerHolder.containsKey( execution ) ) { | ||
return noSchedulerFor( execution ); | ||
} | ||
|
||
switch ( strategy.toLowerCase() ){ | ||
default: { | ||
final String[] split = strategy.split( "-" ); | ||
case "nodelabelassign": | ||
Prioritize prioritize; | ||
NodeAssign labelassign; | ||
NodeAssign assign; | ||
prioritize = new RankMaxPrioritize(); | ||
t-aretz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
labelassign = new LabelAssign(nodelabel); | ||
assign = new FairAssign(); | ||
scheduler = new NodeLabelAssign(execution, client, namespace, config, prioritize, labelassign, assign); | ||
break; | ||
default: { | ||
final String[] split = strategy.split( "-" ); | ||
if ( split.length <= 2 ) { | ||
switch ( split[0].toLowerCase() ) { | ||
case "fifo": prioritize = new FifoPrioritize(); break; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package cws.k8s.scheduler.scheduler; | ||
|
||
import cws.k8s.scheduler.model.*; | ||
import cws.k8s.scheduler.scheduler.prioritize.Prioritize; | ||
import cws.k8s.scheduler.client.Informable; | ||
import cws.k8s.scheduler.client.KubernetesClient; | ||
import cws.k8s.scheduler.scheduler.nodeassign.NodeAssign; | ||
import cws.k8s.scheduler.util.NodeTaskAlignment; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import java.util.*; | ||
import java.util.stream.Collector; | ||
import java.util.stream.Collectors; | ||
|
||
@Slf4j | ||
public class NodeLabelAssign extends Scheduler { | ||
|
||
private final Prioritize prioritize; | ||
private final NodeAssign nodeAssigner; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe name this defaultNodeAssigner or fallbackNodeAssigner to clearify its function |
||
private final NodeAssign nodeLabelAssigner; | ||
|
||
public NodeLabelAssign( String execution, | ||
KubernetesClient client, | ||
String namespace, | ||
SchedulerConfig config, | ||
Prioritize prioritize, | ||
NodeAssign nodeLabelAssigner, | ||
NodeAssign nodeAssigner ) { | ||
super(execution, client, namespace, config); | ||
this.prioritize = prioritize; | ||
this.nodeLabelAssigner = nodeLabelAssigner; | ||
this.nodeAssigner = nodeAssigner; | ||
nodeAssigner.registerScheduler( this ); | ||
if ( nodeAssigner instanceof Informable ){ | ||
client.addInformable( (Informable) nodeAssigner ); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
super.close(); | ||
if ( nodeAssigner instanceof Informable ){ | ||
client.removeInformable( (Informable) nodeAssigner ); | ||
} | ||
} | ||
|
||
@Override | ||
public ScheduleObject getTaskNodeAlignment( | ||
final List<Task> unscheduledTasks, | ||
final Map<NodeWithAlloc, Requirements> availableByNode | ||
){ | ||
long start = System.currentTimeMillis(); | ||
if ( traceEnabled ) { | ||
int index = 1; | ||
for ( Task unscheduledTask : unscheduledTasks ) { | ||
unscheduledTask.getTraceRecord().setSchedulerPlaceInQueue( index++ ); | ||
} | ||
} | ||
prioritize.sortTasks( unscheduledTasks ); | ||
|
||
// print Tasks | ||
System.out.println("Tasks before Label Alignment"); | ||
t-aretz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
unscheduledTasks.stream().map(obj -> obj.getConfig().getName()).forEach(System.out::println); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do not log this in stable version There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now you use Stdout. |
||
|
||
// first alignemnt | ||
List<NodeTaskAlignment> alignmentLabelAssign = nodeLabelAssigner.getTaskNodeAlignment(unscheduledTasks, availableByNode); | ||
List<String> namesList = alignmentLabelAssign.stream().map(obj -> obj.task.getConfig().getName()).collect(Collectors.toList()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. listOfUnassignedTasks There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is better to make variable alignmentLabelAssign from type NodeLabelAssign and then store a list of unscheduled tasks and fetch it here. Then you don't need to calculate this here again. |
||
System.out.println(namesList.toString()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do not log this |
||
|
||
|
||
List<Task> filteredTasks = new LinkedList<>(); | ||
|
||
for (final Task task : unscheduledTasks) { | ||
if (!namesList.contains(task.getConfig().getName())) { | ||
filteredTasks.add(task); | ||
} | ||
} | ||
|
||
// print Tasks | ||
System.out.println("Tasks after Label Alignment"); | ||
filteredTasks.stream().map(obj -> obj.getConfig().getName()).forEach(System.out::println); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How rare are these events? I would not log this on a regular base. |
||
|
||
// second alignemnt | ||
List<NodeTaskAlignment> alignment = nodeAssigner.getTaskNodeAlignment(filteredTasks, availableByNode); | ||
Lehmann-Fabian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
alignmentLabelAssign.addAll(alignment); | ||
long timeDelta = System.currentTimeMillis() - start; | ||
for ( Task unscheduledTask : unscheduledTasks ) { | ||
unscheduledTask.getTraceRecord().setSchedulerTimeToSchedule( (int) timeDelta ); | ||
} | ||
|
||
final ScheduleObject scheduleObject = new ScheduleObject(alignmentLabelAssign); | ||
scheduleObject.setCheckStillPossible( false ); | ||
return scheduleObject; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -152,7 +152,7 @@ public boolean validSchedulePlan( List<NodeTaskAlignment> taskNodeAlignment ){ | |
return true; | ||
} | ||
|
||
abstract ScheduleObject getTaskNodeAlignment( | ||
public abstract ScheduleObject getTaskNodeAlignment( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why making this public? |
||
final List<Task> unscheduledTasks, | ||
final Map<NodeWithAlloc, Requirements> availableByNode | ||
); | ||
|
@@ -468,6 +468,7 @@ Map<NodeWithAlloc, Requirements> getAvailableByNode(){ | |
} | ||
logInfo.add("------------------------------------"); | ||
log.info(String.join("\n", logInfo)); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can leave this class as it was. |
||
return availableByNode; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package cws.k8s.scheduler.scheduler.nodeassign; | ||
|
||
import cws.k8s.scheduler.model.NodeWithAlloc; | ||
import cws.k8s.scheduler.model.PodWithAge; | ||
import cws.k8s.scheduler.model.Requirements; | ||
import cws.k8s.scheduler.model.Task; | ||
import cws.k8s.scheduler.util.NodeTaskAlignment; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import java.util.*; | ||
|
||
@Slf4j | ||
public class LabelAssign extends NodeAssign { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please name this NodeLabelAssign to show it is a node assigner |
||
|
||
public Map<String, String> nodelabel; | ||
|
||
public LabelAssign( | ||
final Map<String, String> nodelabel | ||
){ | ||
this.nodelabel = nodelabel; | ||
} | ||
|
||
@Override | ||
public List<NodeTaskAlignment> getTaskNodeAlignment( List<Task> unscheduledTasks, Map<NodeWithAlloc, Requirements> availableByNode ) { | ||
LinkedList<NodeTaskAlignment> alignment = new LinkedList<>(); | ||
final ArrayList<Map.Entry<NodeWithAlloc, Requirements>> entries = new ArrayList<>( availableByNode.entrySet() ); | ||
t-aretz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for ( final Task task : unscheduledTasks ) { | ||
|
||
String taskName = null; | ||
String taskLabel = null; | ||
|
||
try { | ||
taskName = task.getConfig().getName(); | ||
taskLabel = taskName.split("~")[1]; | ||
// ~ is used for a special case in which subtasks from one process in nextflow are generated | ||
// the labels in the nextflow config have to be named like this: ~label~ | ||
|
||
log.info("Label for task: " + task.getConfig().getName() + " : " + taskLabel); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do not log this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please change to debug level or remove |
||
} catch ( Exception e ){ | ||
log.warn( "Cannot find a label for task: " + task.getConfig().getName(), e ); | ||
continue; | ||
} | ||
|
||
final PodWithAge pod = task.getPod(); | ||
log.info("Pod: " + pod.getName() + " Requested Resources: " + pod.getRequest() ); | ||
t-aretz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if(nodelabel.containsKey(taskLabel)){ | ||
String nodeName = nodelabel.get(taskLabel); | ||
|
||
for ( Map.Entry<NodeWithAlloc, Requirements> e : entries ) { | ||
final NodeWithAlloc node = e.getKey(); | ||
|
||
if(nodeName.equals(node.getName())){ | ||
System.out.println("Aligned Pod to node: " + node.getName()); | ||
alignment.add( new NodeTaskAlignment( node, task ) ); | ||
availableByNode.get( node ).subFromThis(pod.getRequest()); | ||
log.info("--> " + node.getName()); | ||
task.getTraceRecord().foundAlignment(); | ||
break; | ||
} | ||
} | ||
} else | ||
{ | ||
t-aretz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
log.info( "Task Label: " + taskLabel + " doesn't exist in config file."); | ||
Lehmann-Fabian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
return alignment; | ||
} | ||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.