Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build #2

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@ fabric.properties
target

pom-development.xml
.vscode/settings.json
9 changes: 4 additions & 5 deletions src/main/java/cws/k8s/scheduler/model/SchedulerConfig.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package cws.k8s.scheduler.model;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.util.List;
import java.util.Map;

@ToString
@NoArgsConstructor(access = AccessLevel.PRIVATE,force = true)
Expand All @@ -17,10 +19,7 @@ public class SchedulerConfig {
public final String namespace;
public final String costFunction;
public final String strategy;

public final Integer maxCopyTasksPerNode;

public final Integer maxWaitingCopyTasksPerNode;
public final Map<String, JsonNode> additional;

@ToString
@NoArgsConstructor(access = AccessLevel.PRIVATE,force = true)
Expand All @@ -30,4 +29,4 @@ public static class VolumeClaim {
public final String subPath;
}

}
}
2 changes: 2 additions & 0 deletions src/main/java/cws/k8s/scheduler/model/TaskInput.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package cws.k8s.scheduler.model;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;

import java.util.List;

@NoArgsConstructor( access = AccessLevel.PRIVATE, force = true )
@Getter
/**
* Only for testing
*/
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/cws/k8s/scheduler/rest/SchedulerRestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import cws.k8s.scheduler.dag.Vertex;
import cws.k8s.scheduler.model.SchedulerConfig;
import cws.k8s.scheduler.model.TaskConfig;
import cws.k8s.scheduler.scheduler.NodeLabelAssign;
import cws.k8s.scheduler.scheduler.PrioritizeAssignScheduler;
import cws.k8s.scheduler.scheduler.Scheduler;
import cws.k8s.scheduler.scheduler.prioritize.*;
import cws.k8s.scheduler.scheduler.nodeassign.FairAssign;
import cws.k8s.scheduler.scheduler.nodeassign.LabelAssign;
import cws.k8s.scheduler.scheduler.nodeassign.NodeAssign;
import cws.k8s.scheduler.scheduler.nodeassign.RandomNodeAssign;
import cws.k8s.scheduler.scheduler.nodeassign.RoundRobinAssign;
Expand All @@ -32,6 +34,8 @@
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this unused import


@RestController
@Slf4j
@EnableScheduling
Expand Down Expand Up @@ -99,16 +103,23 @@ ResponseEntity<String> registerScheduler(

Scheduler scheduler;

// ObjectMapper objetMapper = new ObjectMapper();
// Map<String,String> nodelabel = objectMapper.convertValue(config.additional.get("tasklabelconfig"),Map.class);
Comment on lines +106 to +107
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this


if ( schedulerHolder.containsKey( execution ) ) {
return noSchedulerFor( execution );
}


Prioritize prioritize;
NodeAssign assign;

switch ( strategy.toLowerCase() ){
case "nodelabelassign":
scheduler = new NodeLabelAssign(execution, client, namespace, config);
break;
default: {
final String[] split = strategy.split( "-" );
Prioritize prioritize;
NodeAssign assign;
Comment on lines -110 to -111
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can undo this change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or make prioritize and assign configurable

if ( split.length <= 2 ) {
switch ( split[0].toLowerCase() ) {
case "fifo": prioritize = new FifoPrioritize(); break;
Expand Down
97 changes: 97 additions & 0 deletions src/main/java/cws/k8s/scheduler/scheduler/NodeLabelAssign.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package cws.k8s.scheduler.scheduler;

import cws.k8s.scheduler.model.*;
import cws.k8s.scheduler.scheduler.prioritize.Prioritize;
import cws.k8s.scheduler.scheduler.prioritize.RankMaxPrioritize;
import cws.k8s.scheduler.client.Informable;
import cws.k8s.scheduler.client.KubernetesClient;
import cws.k8s.scheduler.scheduler.nodeassign.LabelAssign;
import cws.k8s.scheduler.scheduler.nodeassign.NodeAssign;
import cws.k8s.scheduler.scheduler.nodeassign.FairAssign;
import cws.k8s.scheduler.util.NodeTaskAlignment;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.stream.Collector;
import java.util.stream.Collectors;

@Slf4j
public class NodeLabelAssign extends Scheduler {

private final Prioritize prioritize;
private final NodeAssign nodeAssigner;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe name this defaultNodeAssigner or fallbackNodeAssigner to clearify its function

private final NodeAssign nodeLabelAssigner;

public NodeLabelAssign(String execution, KubernetesClient client, String namespace, SchedulerConfig config) {
this(execution, client, namespace, config, new RankMaxPrioritize(), new LabelAssign(config), new FairAssign());
}

public NodeLabelAssign( String execution,
KubernetesClient client,
String namespace,
SchedulerConfig config,
Prioritize prioritize,
NodeAssign nodeLabelAssigner,
NodeAssign nodeAssigner ) {
super(execution, client, namespace, config);
this.prioritize = (prioritize != null) ? prioritize : new RankMaxPrioritize();
this.nodeLabelAssigner = (nodeLabelAssigner != null) ? nodeLabelAssigner : new LabelAssign(config);
this.nodeAssigner = (nodeAssigner != null) ? nodeAssigner : new FairAssign();
nodeAssigner.registerScheduler( this );
if ( nodeAssigner instanceof Informable ){
client.addInformable( (Informable) nodeAssigner );
}
}

@Override
public void close() {
super.close();
if ( nodeAssigner instanceof Informable ){
client.removeInformable( (Informable) nodeAssigner );
}
}

@Override
public ScheduleObject getTaskNodeAlignment(
final List<Task> unscheduledTasks,
final Map<NodeWithAlloc, Requirements> availableByNode
){
long start = System.currentTimeMillis();
if ( traceEnabled ) {
int index = 1;
for ( Task unscheduledTask : unscheduledTasks ) {
unscheduledTask.getTraceRecord().setSchedulerPlaceInQueue( index++ );
}
}
prioritize.sortTasks( unscheduledTasks );


unscheduledTasks.stream().map(obj -> obj.getConfig().getName()).forEach(System.out::println);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not log this in stable version

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now you use Stdout.
Do not log this at all, or log at debug level if you need


// first alignemnt (LabelAssign)
List<NodeTaskAlignment> alignmentLabelAssign = nodeLabelAssigner.getTaskNodeAlignment(unscheduledTasks, availableByNode);
List<String> namesList = alignmentLabelAssign.stream().map(obj -> obj.task.getConfig().getName()).collect(Collectors.toList());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listOfUnassignedTasks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to make variable alignmentLabelAssign from type NodeLabelAssign and then store a list of unscheduled tasks and fetch it here. Then you don't need to calculate this here again.


List<Task> filteredTasks = new LinkedList<>();

for (final Task task : unscheduledTasks) {
if (!namesList.contains(task.getConfig().getName())) {
filteredTasks.add(task);
}
}

// second alignemnt (FairAssign)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alignemnt -> alignment
Please add a more informative description:
Second Alignment: Here, all nodes are assigned that do not have a label.
This is not necessarily a FairAssign (The constructor can be called with different strategies)

List<NodeTaskAlignment> alignment = nodeAssigner.getTaskNodeAlignment(filteredTasks, availableByNode);


alignmentLabelAssign.addAll(alignment);
long timeDelta = System.currentTimeMillis() - start;
for ( Task unscheduledTask : unscheduledTasks ) {
unscheduledTask.getTraceRecord().setSchedulerTimeToSchedule( (int) timeDelta );
}

final ScheduleObject scheduleObject = new ScheduleObject(alignmentLabelAssign);
scheduleObject.setCheckStillPossible( false );
return scheduleObject;
}
}
3 changes: 2 additions & 1 deletion src/main/java/cws/k8s/scheduler/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public boolean validSchedulePlan( List<NodeTaskAlignment> taskNodeAlignment ){
return true;
}

abstract ScheduleObject getTaskNodeAlignment(
public abstract ScheduleObject getTaskNodeAlignment(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why making this public?

final List<Task> unscheduledTasks,
final Map<NodeWithAlloc, Requirements> availableByNode
);
Expand Down Expand Up @@ -468,6 +468,7 @@ Map<NodeWithAlloc, Requirements> getAvailableByNode(){
}
logInfo.add("------------------------------------");
log.info(String.join("\n", logInfo));

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can leave this class as it was.

return availableByNode;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package cws.k8s.scheduler.scheduler.nodeassign;

import cws.k8s.scheduler.model.NodeWithAlloc;
import cws.k8s.scheduler.model.PodWithAge;
import cws.k8s.scheduler.model.Requirements;
import cws.k8s.scheduler.model.Task;
import cws.k8s.scheduler.model.SchedulerConfig;
import cws.k8s.scheduler.util.NodeTaskAlignment;
import lombok.extern.slf4j.Slf4j;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.*;

@Slf4j
public class LabelAssign extends NodeAssign {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please name this NodeLabelAssign to show it is a node assigner


final SchedulerConfig config;

public LabelAssign(
final SchedulerConfig config
){
this.config = config;
}
Comment on lines +20 to +24
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the lombok constructor


@Override
public List<NodeTaskAlignment> getTaskNodeAlignment( List<Task> unscheduledTasks, Map<NodeWithAlloc, Requirements> availableByNode ) {

// get the node-label map
ObjectMapper objectMapper = new ObjectMapper();
Map<String,String> nodelabel = objectMapper.convertValue(config.additional.get("tasklabelconfig"),Map.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you force to create a map Map<String,String>. Doesn't it create a Map<Object,Object> here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this will fail with a Map containing Objects. Please write a test case.


LinkedList<NodeTaskAlignment> alignment = new LinkedList<>();
// final ArrayList<Map.Entry<NodeWithAlloc, Requirements>> entries = new ArrayList<>( availableByNode.entrySet() );
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

for ( final Task task : unscheduledTasks ) {

if ( nodelabel == null ){
log.warn("No tasklabelconfig exist in the nextflow.config file. Define a tasklabelconfig or use another scheduling strategy.");
break;
}

String taskName = null;
String taskLabel = null;

try {
taskName = task.getConfig().getName();
taskLabel = taskName.split("~")[1];
// ~ is used for a special case in which subtasks from one process in nextflow are generated
// the labels in the nextflow config have to be named like this: ~label~

log.info("Label for task: " + task.getConfig().getName() + " : " + taskLabel);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not log this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change to debug level or remove

} catch ( Exception e ){
log.warn( "Cannot find a label for task: " + task.getConfig().getName(), e );
continue;
}

final PodWithAge pod = task.getPod();
// log.info("Pod: " + pod.getName() + " Requested Resources: " + pod.getRequest() );
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove


if(nodelabel.containsKey(taskLabel)){
String nodeName = nodelabel.get(taskLabel);

for ( Map.Entry<NodeWithAlloc, Requirements> e : availableByNode.entrySet() ) {
final NodeWithAlloc node = e.getKey();

if(nodeName.equals(node.getName())){
log.info("Aligned Pod to node: " + node.getName());
alignment.add( new NodeTaskAlignment( node, task ) );
availableByNode.get( node ).subFromThis(pod.getRequest());
log.info("--> " + node.getName());
task.getTraceRecord().foundAlignment();
break;
}
}
Comment on lines +63 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please request this by the key and do not iterate the Map

} else {
log.warn( "Task Label: " + taskLabel + " does not exist in config file.");
}
}
return alignment;
}
}