Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.

Commit 76902c7

Browse files
author
tbak
authored
ENI fitness (#173)
* Add pluggable ENI fitness evaluator (#170) * Make the number of threads in TaskScheduler configurable
1 parent 30b1874 commit 76902c7

10 files changed

+351
-124
lines changed

fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ private static class HostDisablePair {
7777
private final Map<String, Map<VMResource, Double>> maxResourcesMap;
7878
private final Map<VMResource, Double> totalResourcesMap;
7979
private final VMRejectLimiter vmRejectLimiter;
80-
private final AssignableVirtualMachine dummyVM = new AssignableVirtualMachine(null, null, "", null, 0L, null) {
80+
private final AssignableVirtualMachine dummyVM = new AssignableVirtualMachine(null, null, null, "", null, 0L, null) {
8181
@Override
8282
void assignResult(TaskAssignmentResult result) {
8383
throw new UnsupportedOperationException();
@@ -88,11 +88,12 @@ void assignResult(TaskAssignmentResult result) {
8888
private final BlockingQueue<String> unknownLeaseIdsToExpire = new LinkedBlockingQueue<>();
8989

9090
AssignableVMs(TaskTracker taskTracker, Action1<VirtualMachineLease> leaseRejectAction,
91+
PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator,
9192
long leaseOfferExpirySecs, int maxOffersToReject,
9293
String attrNameToGroupMaxResources, boolean singleLeaseMode, String autoScaleByAttributeName) {
9394
this.taskTracker = taskTracker;
9495
vmCollection = new VMCollection(
95-
hostname -> new AssignableVirtualMachine(vmIdToHostnameMap, leaseIdToHostnameMap, hostname,
96+
hostname -> new AssignableVirtualMachine(preferentialNamedConsumableResourceEvaluator, vmIdToHostnameMap, leaseIdToHostnameMap, hostname,
9697
leaseRejectAction, leaseOfferExpirySecs, taskTracker, singleLeaseMode),
9798
autoScaleByAttributeName
9899
);

fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public ResAsgmntResult(List<AssignmentFailure> failures, double fitness) {
9999
}
100100
}
101101

102+
private final PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator;
102103
private final Map<String, VirtualMachineLease> leasesMap;
103104
private final BlockingQueue<String> workersToUnAssign;
104105
private final BlockingQueue<String> leasesToExpire;
@@ -140,17 +141,20 @@ public ResAsgmntResult(List<AssignmentFailure> failures, double fitness) {
140141
private boolean firstLeaseAdded=false;
141142
private final List<TaskRequest> consumedResourcesToAssign = new ArrayList<>();
142143

143-
public AssignableVirtualMachine(ConcurrentMap<String, String> vmIdToHostnameMap,
144+
public AssignableVirtualMachine(PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator,
145+
ConcurrentMap<String, String> vmIdToHostnameMap,
144146
ConcurrentMap<String, String> leaseIdToHostnameMap,
145147
String hostname, Action1<VirtualMachineLease> leaseRejectAction,
146148
long leaseOfferExpirySecs, TaskTracker taskTracker) {
147-
this(vmIdToHostnameMap, leaseIdToHostnameMap, hostname, leaseRejectAction, leaseOfferExpirySecs, taskTracker, false);
149+
this(preferentialNamedConsumableResourceEvaluator, vmIdToHostnameMap, leaseIdToHostnameMap, hostname, leaseRejectAction, leaseOfferExpirySecs, taskTracker, false);
148150
}
149151

150-
public AssignableVirtualMachine(ConcurrentMap<String, String> vmIdToHostnameMap,
152+
public AssignableVirtualMachine(PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator,
153+
ConcurrentMap<String, String> vmIdToHostnameMap,
151154
ConcurrentMap<String, String> leaseIdToHostnameMap,
152155
String hostname, Action1<VirtualMachineLease> leaseRejectAction,
153156
long leaseOfferExpirySecs, TaskTracker taskTracker, boolean singleLeaseMode) {
157+
this.preferentialNamedConsumableResourceEvaluator = preferentialNamedConsumableResourceEvaluator;
154158
this.vmIdToHostnameMap = vmIdToHostnameMap;
155159
this.leaseIdToHostnameMap = leaseIdToHostnameMap;
156160
this.hostname = hostname;
@@ -221,7 +225,7 @@ private void addToAvailableResources(VirtualMachineLease l) {
221225
int val0 = Integer.parseInt(val0Str);
222226
int val1 = Integer.parseInt(val1Str);
223227
final PreferentialNamedConsumableResourceSet crs =
224-
new PreferentialNamedConsumableResourceSet(name, val0, val1);
228+
new PreferentialNamedConsumableResourceSet(hostname, name, val0, val1);
225229
final Iterator<TaskRequest> iterator = consumedResourcesToAssign.iterator();
226230
while(iterator.hasNext()) {
227231
TaskRequest request = iterator.next();
@@ -786,7 +790,7 @@ private ResAsgmntResult evalAndGetResourceAssignmentFailures(TaskRequest request
786790
for (Map.Entry<String, PreferentialNamedConsumableResourceSet> entry : resourceSets.entrySet()) {
787791
if (!requestedNamedResNames.isEmpty())
788792
requestedNamedResNames.remove(entry.getKey());
789-
final double fitness = entry.getValue().getFitness(request);
793+
final double fitness = entry.getValue().getFitness(request, preferentialNamedConsumableResourceEvaluator);
790794
if (fitness == 0.0) {
791795
AssignmentFailure failure = new AssignmentFailure(VMResource.ResourceSet, 0.0, 0.0, 0.0,
792796
"ResourceSet " + entry.getValue().getName() + " unavailable"
@@ -951,7 +955,7 @@ void assignResult(TaskAssignmentResult result) {
951955
result.addPort(currPortRanges.consumeNextPort());
952956
}
953957
for(Map.Entry<String, PreferentialNamedConsumableResourceSet> entry: resourceSets.entrySet()) {
954-
result.addResourceSet(entry.getValue().consume(result.getRequest()));
958+
result.addResourceSet(entry.getValue().consume(result.getRequest(), preferentialNamedConsumableResourceEvaluator));
955959
}
956960
if(!taskTracker.addAssignedTask(result.getRequest(), this))
957961
logger.error("Unexpected to re-add task to assigned state, id=" + result.getRequest().getId());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.netflix.fenzo;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import java.util.ArrayList;
7+
import java.util.Collection;
8+
import java.util.List;
9+
import java.util.function.Consumer;
10+
11+
class CompositeSchedulingEventListener implements SchedulingEventListener {
12+
13+
private static final Logger logger = LoggerFactory.getLogger(CompositeSchedulingEventListener.class);
14+
15+
private final List<SchedulingEventListener> listeners;
16+
17+
private CompositeSchedulingEventListener(Collection<SchedulingEventListener> listeners) {
18+
this.listeners = new ArrayList<>(listeners);
19+
}
20+
21+
@Override
22+
public void onScheduleStart() {
23+
safely(SchedulingEventListener::onScheduleStart);
24+
}
25+
26+
@Override
27+
public void onAssignment(TaskAssignmentResult taskAssignmentResult) {
28+
safely(listener -> listener.onAssignment(taskAssignmentResult));
29+
}
30+
31+
@Override
32+
public void onScheduleFinish() {
33+
safely(SchedulingEventListener::onScheduleFinish);
34+
}
35+
36+
private void safely(Consumer<SchedulingEventListener> action) {
37+
listeners.forEach(listener -> {
38+
try {
39+
action.accept(listener);
40+
} catch (Exception e) {
41+
logger.warn("Scheduling event dispatching error: {} -> {}", listener.getClass().getSimpleName(), e.getMessage());
42+
if (logger.isDebugEnabled()) {
43+
logger.debug("Details", e);
44+
}
45+
}
46+
});
47+
}
48+
49+
static SchedulingEventListener of(Collection<SchedulingEventListener> listeners) {
50+
if (listeners.isEmpty()) {
51+
return NoOpSchedulingEventListener.INSTANCE;
52+
}
53+
return new CompositeSchedulingEventListener(listeners);
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.netflix.fenzo;
2+
3+
/**
4+
* Default {@link PreferentialNamedConsumableResourceEvaluator} implementation.
5+
*/
6+
public class DefaultPreferentialNamedConsumableResourceEvaluator implements PreferentialNamedConsumableResourceEvaluator {
7+
8+
public static final PreferentialNamedConsumableResourceEvaluator INSTANCE = new DefaultPreferentialNamedConsumableResourceEvaluator();
9+
10+
@Override
11+
public double evaluateIdle(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesLimit) {
12+
// unassigned: 0.0 indicates no fitness, so return 0.5, which is less than the case of assigned with 0 sub-resources
13+
return 0.5 / (subResourcesLimit + 1);
14+
}
15+
16+
@Override
17+
public double evaluate(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesUsed, double subResourcesLimit) {
18+
return Math.min(1.0, (subResourcesUsed + subResourcesNeeded + 1.0) / (subResourcesLimit + 1));
19+
}
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.netflix.fenzo;
2+
3+
class NoOpSchedulingEventListener implements SchedulingEventListener {
4+
5+
static final SchedulingEventListener INSTANCE = new NoOpSchedulingEventListener();
6+
7+
@Override
8+
public void onScheduleStart() {
9+
}
10+
11+
@Override
12+
public void onAssignment(TaskAssignmentResult taskAssignmentResult) {
13+
}
14+
15+
@Override
16+
public void onScheduleFinish() {
17+
}
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.netflix.fenzo;
2+
3+
import com.netflix.fenzo.PreferentialNamedConsumableResourceSet.PreferentialNamedConsumableResource;
4+
5+
/**
6+
* Evaluator for {@link PreferentialNamedConsumableResource} selection process. Given an agent with matching
7+
* ENI slot (either empty or with a matching name), this evaluator computes the fitness score.
8+
* A custom implementation can provide fitness calculators augmented with additional information not available to
9+
* Fenzo for making best placement decision.
10+
*
11+
* <h1>Example</h1>
12+
* {@link PreferentialNamedConsumableResource} can be used to model AWS ENI interfaces together with IP and security
13+
* group assignments. To minimize number of AWS API calls and to improve efficiency, it is beneficial to place a task
14+
* on an agent which has ENI profile with matching security group profile so the ENI can be reused. Or if a task
15+
* is terminated, but agent releases its resources lazily, they can be reused by another task with a matching profile.
16+
*/
17+
public interface PreferentialNamedConsumableResourceEvaluator {
18+
19+
/**
20+
* Provide fitness score for an idle consumable resource.
21+
*
22+
* @param hostname hostname of an agent
23+
* @param resourceName name to be associated with a resource with the given index
24+
* @param index a consumable resource index
25+
* @param subResourcesNeeded an amount of sub-resources required by a scheduled task
26+
* @param subResourcesLimit a total amount of sub-resources available
27+
* @return fitness score
28+
*/
29+
double evaluateIdle(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesLimit);
30+
31+
/**
32+
* Provide fitness score for a consumable resource that is already associated with some tasks. These tasks and
33+
* the current one having profiles so can share the resource.
34+
*
35+
* @param hostname hostname of an agent
36+
* @param resourceName name associated with a resource with the given index
37+
* @param index a consumable resource index
38+
* @param subResourcesNeeded an amount of sub-resources required by a scheduled task
39+
* @param subResourcesUsed an amount of sub-resources already used by other tasks
40+
* @param subResourcesLimit a total amount of sub-resources available
41+
* @return fitness score
42+
*/
43+
double evaluate(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesUsed, double subResourcesLimit);
44+
}

fenzo-core/src/main/java/com/netflix/fenzo/PreferentialNamedConsumableResourceSet.java

+44-22
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import com.fasterxml.jackson.annotation.JsonCreator;
2020
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
2121
import com.fasterxml.jackson.annotation.JsonProperty;
22-
import org.slf4j.Logger;
23-
import org.slf4j.LoggerFactory;
2422

2523
import java.util.ArrayList;
2624
import java.util.HashMap;
@@ -92,14 +90,16 @@ public double getFitness() {
9290

9391
public static class PreferentialNamedConsumableResource {
9492
private final double maxFitness;
93+
private final String hostname;
9594
private final int index;
9695
private final String attrName;
9796
private String resName=null;
9897
private final int limit;
9998
private final Map<String, TaskRequest.NamedResourceSetRequest> usageBy;
10099
private int usedSubResources=0;
101100

102-
PreferentialNamedConsumableResource(int i, String attrName, int limit) {
101+
PreferentialNamedConsumableResource(String hostname, int i, String attrName, int limit) {
102+
this.hostname = hostname;
103103
this.index = i;
104104
this.attrName = attrName;
105105
this.limit = limit;
@@ -131,19 +131,41 @@ public Map<String, TaskRequest.NamedResourceSetRequest> getUsageBy() {
131131
return usedSubResources;
132132
}
133133

134-
double getFitness(TaskRequest request) {
135-
String r = getResNameVal(attrName, request);
136-
if(resName == null)
137-
return 0.5 / maxFitness; // unassigned: 0.0 indicates no fitness, so return 0.5, which is less than
138-
// the case of assigned with 0 sub-resources
139-
if(!resName.equals(r))
134+
double getFitness(TaskRequest request, PreferentialNamedConsumableResourceEvaluator evaluator) {
135+
TaskRequest.NamedResourceSetRequest setRequest = request.getCustomNamedResources()==null
136+
? null
137+
: request.getCustomNamedResources().get(attrName);
138+
139+
// This particular resource type is not requested. We assign to it virtual resource name 'CustomResAbsentKey',
140+
// and request 0 sub-resources.
141+
if(setRequest == null) {
142+
if(resName == null) {
143+
return evaluator.evaluateIdle(hostname, CustomResAbsentKey, index, 0, limit);
144+
}
145+
if(resName.equals(CustomResAbsentKey)) {
146+
return evaluator.evaluate(hostname, CustomResAbsentKey, index, 0, usedSubResources, limit);
147+
}
140148
return 0.0;
141-
final TaskRequest.NamedResourceSetRequest setRequest = request.getCustomNamedResources()==null?
142-
null : request.getCustomNamedResources().get(attrName);
143-
double subResNeed = setRequest==null? 0.0 : setRequest.getNumSubResources();
144-
if(usedSubResources + subResNeed > limit)
149+
}
150+
151+
double subResNeed = setRequest.getNumSubResources();
152+
153+
// Resource not assigned yet to any task
154+
if(resName == null) {
155+
if(subResNeed > limit) {
156+
return 0.0;
157+
}
158+
return evaluator.evaluateIdle(hostname, setRequest.getResValue(), index, subResNeed, limit);
159+
}
160+
161+
// Resource assigned different name than requested
162+
if(!resName.equals(setRequest.getResValue())) {
163+
return 0.0;
164+
}
165+
if(usedSubResources + subResNeed > limit) {
145166
return 0.0;
146-
return Math.min(1.0, usedSubResources + subResNeed + 1.0 / maxFitness);
167+
}
168+
return evaluator.evaluate(hostname, setRequest.getResValue(), index, subResNeed, usedSubResources, limit);
147169
}
148170

149171
void consume(TaskRequest request) {
@@ -190,11 +212,11 @@ boolean release(TaskRequest request) {
190212
private final String name;
191213
private final List<PreferentialNamedConsumableResource> usageBy;
192214

193-
public PreferentialNamedConsumableResourceSet(String name, int val0, int val1) {
215+
public PreferentialNamedConsumableResourceSet(String hostname, String name, int val0, int val1) {
194216
this.name = name;
195217
usageBy = new ArrayList<>(val0);
196218
for(int i=0; i<val0; i++)
197-
usageBy.add(new PreferentialNamedConsumableResource(i, name, val1));
219+
usageBy.add(new PreferentialNamedConsumableResource(hostname, i, name, val1));
198220
}
199221

200222
public String getName() {
@@ -209,8 +231,8 @@ public String getName() {
209231
// return false;
210232
// }
211233

212-
ConsumeResult consume(TaskRequest request) {
213-
return consumeIntl(request, false);
234+
ConsumeResult consume(TaskRequest request, PreferentialNamedConsumableResourceEvaluator evaluator) {
235+
return consumeIntl(request, false, evaluator);
214236
}
215237

216238
void assign(TaskRequest request) {
@@ -233,15 +255,15 @@ void assign(TaskRequest request) {
233255
}
234256

235257
// returns 0.0 for no fitness at all, or <=1.0 for fitness
236-
double getFitness(TaskRequest request) {
237-
return consumeIntl(request, true).fitness;
258+
double getFitness(TaskRequest request, PreferentialNamedConsumableResourceEvaluator evaluator) {
259+
return consumeIntl(request, true, evaluator).fitness;
238260
}
239261

240-
private ConsumeResult consumeIntl(TaskRequest request, boolean skipConsume) {
262+
private ConsumeResult consumeIntl(TaskRequest request, boolean skipConsume, PreferentialNamedConsumableResourceEvaluator evaluator) {
241263
PreferentialNamedConsumableResource best = null;
242264
double bestFitness=0.0;
243265
for(PreferentialNamedConsumableResource r: usageBy) {
244-
double f = r.getFitness(request);
266+
double f = r.getFitness(request, evaluator);
245267
if(f == 0.0)
246268
continue;
247269
if(bestFitness < f) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.netflix.fenzo;
2+
3+
/**
4+
* A callback API providing notification about Fenzo task placement decisions during the scheduling process.
5+
*/
6+
public interface SchedulingEventListener {
7+
8+
/**
9+
* Called before a new scheduling iteration is started.
10+
*/
11+
void onScheduleStart();
12+
13+
/**
14+
* Called when a new task placement decision is made (a task gets resources allocated on a server).
15+
*
16+
* @param taskAssignmentResult task assignment result
17+
*/
18+
void onAssignment(TaskAssignmentResult taskAssignmentResult);
19+
20+
/**
21+
* Called when the scheduling iteration completes.
22+
*/
23+
void onScheduleFinish();
24+
}

0 commit comments

Comments
 (0)