Skip to content

Commit 8a4c747

Browse files
authored
Add Support for User Queue Level Routing (#176)
* Add User Queue Level Routing * PR comments * Bump version to 1.9.0
1 parent 3d3159e commit 8a4c747

File tree

11 files changed

+253
-39
lines changed

11 files changed

+253
-39
lines changed

baseapp/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>com.lyft.data</groupId>
99
<artifactId>prestogateway-parent</artifactId>
10-
<version>1.8.9</version>
10+
<version>1.9.0</version>
1111
<relativePath>../</relativePath>
1212
</parent>
1313

gateway-ha/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<parent>
99
<groupId>com.lyft.data</groupId>
1010
<artifactId>prestogateway-parent</artifactId>
11-
<version>1.8.9</version>
11+
<version>1.9.0</version>
1212
<relativePath>../</relativePath>
1313
</parent>
1414

gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ActiveClusterMonitor.java

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package com.lyft.data.gateway.ha.clustermonitor;
22

3+
import static com.lyft.data.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_QUEUED_LIST_PATH;
34
import static com.lyft.data.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_STATS_PATH;
45

6+
import com.fasterxml.jackson.core.type.TypeReference;
57
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import com.google.common.base.Strings;
69
import com.google.inject.Inject;
710
import com.lyft.data.gateway.ha.config.MonitorConfiguration;
811
import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration;
@@ -16,6 +19,7 @@
1619
import java.util.ArrayList;
1720
import java.util.HashMap;
1821
import java.util.List;
22+
import java.util.Map;
1923
import java.util.concurrent.ExecutorService;
2024
import java.util.concurrent.Executors;
2125
import java.util.concurrent.Future;
@@ -31,6 +35,8 @@ public class ActiveClusterMonitor implements Managed {
3135
public static final int MONITOR_TASK_DELAY_MIN = 1;
3236
public static final int DEFAULT_THREAD_POOL_SIZE = 10;
3337

38+
private static final String SESSION_USER = "sessionUser";
39+
3440
private final List<PrestoClusterStatsObserver> clusterStatsObservers;
3541
private final GatewayBackendManager gatewayBackendManager;
3642
private final int connectionTimeout;
@@ -94,10 +100,7 @@ public void start() {
94100
});
95101
}
96102

97-
private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
98-
ClusterStats clusterStats = new ClusterStats();
99-
clusterStats.setClusterId(backend.getName());
100-
String target = backend.getProxyTo() + UI_API_STATS_PATH;
103+
private String queryCluster(String target) {
101104
HttpURLConnection conn = null;
102105
try {
103106
URL url = new URL(target);
@@ -108,22 +111,15 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
108111
conn.connect();
109112
int responseCode = conn.getResponseCode();
110113
if (responseCode == HttpStatus.SC_OK) {
111-
clusterStats.setHealthy(true);
112114
BufferedReader reader =
113-
new BufferedReader(new InputStreamReader((InputStream) conn.getContent()));
115+
new BufferedReader(new InputStreamReader((InputStream) conn.getContent()));
114116
StringBuilder sb = new StringBuilder();
115117
String line;
116118
while ((line = reader.readLine()) != null) {
117119
sb.append(line + "\n");
118120
}
119-
HashMap<String, Object> result = OBJECT_MAPPER.readValue(sb.toString(), HashMap.class);
120-
clusterStats.setNumWorkerNodes((int) result.get("activeWorkers"));
121-
clusterStats.setQueuedQueryCount((int) result.get("queuedQueries"));
122-
clusterStats.setRunningQueryCount((int) result.get("runningQueries"));
123-
clusterStats.setBlockedQueryCount((int) result.get("blockedQueries"));
124-
clusterStats.setProxyTo(backend.getProxyTo());
125-
clusterStats.setExternalUrl(backend.getExternalUrl());
126-
clusterStats.setRoutingGroup(backend.getRoutingGroup());
121+
122+
return sb.toString();
127123
} else {
128124
log.warn("Received non 200 response, response code: {}", responseCode);
129125
}
@@ -134,6 +130,58 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
134130
conn.disconnect();
135131
}
136132
}
133+
return null;
134+
}
135+
136+
private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
137+
ClusterStats clusterStats = new ClusterStats();
138+
clusterStats.setClusterId(backend.getName());
139+
140+
// Fetch Cluster level Stats.
141+
String target = backend.getProxyTo() + UI_API_STATS_PATH;
142+
String response = queryCluster(target);
143+
if (Strings.isNullOrEmpty(response)) {
144+
log.error("Received null/empty response for {}", target);
145+
return clusterStats;
146+
}
147+
clusterStats.setHealthy(true);
148+
try {
149+
HashMap<String, Object> result = null;
150+
result = OBJECT_MAPPER.readValue(response, HashMap.class);
151+
152+
clusterStats.setNumWorkerNodes((int) result.get("activeWorkers"));
153+
clusterStats.setQueuedQueryCount((int) result.get("queuedQueries"));
154+
clusterStats.setRunningQueryCount((int) result.get("runningQueries"));
155+
clusterStats.setBlockedQueryCount((int) result.get("blockedQueries"));
156+
clusterStats.setProxyTo(backend.getProxyTo());
157+
clusterStats.setExternalUrl(backend.getExternalUrl());
158+
clusterStats.setRoutingGroup(backend.getRoutingGroup());
159+
160+
} catch (Exception e) {
161+
log.error("Error parsing cluster stats from [{}]", response, e);
162+
}
163+
164+
// Fetch User Level Stats.
165+
Map<String, Integer> clusterUserStats = new HashMap<>();
166+
target = backend.getProxyTo() + UI_API_QUEUED_LIST_PATH;
167+
response = queryCluster(target);
168+
if (Strings.isNullOrEmpty(response)) {
169+
log.error("Received null/empty response for {}", target);
170+
return clusterStats;
171+
}
172+
try {
173+
List<Map<String, Object>> queries = OBJECT_MAPPER.readValue(response,
174+
new TypeReference<List<Map<String, Object>>>(){});
175+
176+
for (Map<String, Object> q : queries) {
177+
String user = (String) q.get(SESSION_USER);
178+
clusterUserStats.put(user, clusterUserStats.getOrDefault(user, 0) + 1);
179+
}
180+
} catch (Exception e) {
181+
log.error("Error parsing cluster user stats: {}", e);
182+
}
183+
clusterStats.setUserQueuedCount(clusterUserStats);
184+
137185
return clusterStats;
138186
}
139187

gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ClusterStats.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.lyft.data.gateway.ha.clustermonitor;
22

3+
import java.util.Map;
4+
35
import lombok.Data;
46
import lombok.ToString;
57

@@ -15,4 +17,5 @@ public class ClusterStats {
1517
private String proxyTo;
1618
private String externalUrl;
1719
private String routingGroup;
20+
private Map<String, Integer> userQueuedCount;
1821
}

gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/PrestoQueueLengthChecker.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public void observe(List<ClusterStats> stats) {
2222
Map<String, Map<String, Integer>> clusterQueueMap = new HashMap<String, Map<String, Integer>>();
2323
Map<String, Map<String, Integer>> clusterRunningMap
2424
= new HashMap<String, Map<String, Integer>>();
25+
Map<String, Map<String, Integer>> userClusterQueuedCount
26+
= new HashMap<>();
2527

2628
for (ClusterStats stat : stats) {
2729
if (!clusterQueueMap.containsKey(stat.getRoutingGroup())) {
@@ -43,8 +45,16 @@ public void observe(List<ClusterStats> stats) {
4345
clusterRunningMap.get(stat.getRoutingGroup()).put(stat.getClusterId(),
4446
stat.getRunningQueryCount());
4547
}
48+
49+
// Create inverse map from user -> {cluster-> count}
50+
for (Map.Entry<String, Integer> queueCount : stat.getUserQueuedCount().entrySet()) {
51+
Map<String, Integer> clusterQueue = userClusterQueuedCount.getOrDefault(queueCount.getKey(),
52+
new HashMap<>());
53+
clusterQueue.put(stat.getClusterId(), queueCount.getValue());
54+
userClusterQueuedCount.put(queueCount.getKey(), clusterQueue);
55+
}
4656
}
4757

48-
routingManager.updateRoutingTable(clusterQueueMap, clusterRunningMap);
58+
routingManager.updateRoutingTable(clusterQueueMap, clusterRunningMap, userClusterQueuedCount);
4959
}
5060
}

gateway-ha/src/main/java/com/lyft/data/gateway/ha/handler/QueryIdCachingProxyHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class QueryIdCachingProxyHandler extends ProxyHandler {
3535
public static final String V1_QUERY_PATH = "/v1/query";
3636
public static final String V1_INFO_PATH = "/v1/info";
3737
public static final String UI_API_STATS_PATH = "/ui/api/stats";
38+
public static final String UI_API_QUEUED_LIST_PATH = "/ui/api/query?state=QUEUED";
3839
public static final String PRESTO_UI_PATH = "/ui";
3940
public static final String USER_HEADER = "X-Trino-User";
4041
public static final String ALTERNATE_USER_HEADER = "X-Presto-User";
@@ -121,11 +122,13 @@ public String rewriteTarget(HttpServletRequest request) {
121122
backendAddress = routingManager.findBackendForQueryId(queryId);
122123
} else {
123124
String routingGroup = routingGroupSelector.findRoutingGroup(request);
125+
String user = Optional.ofNullable(request.getHeader(USER_HEADER))
126+
.orElse(request.getHeader(ALTERNATE_USER_HEADER));
124127
if (!Strings.isNullOrEmpty(routingGroup)) {
125128
// This falls back on adhoc backend if there are no cluster found for the routing group.
126-
backendAddress = routingManager.provideBackendForRoutingGroup(routingGroup);
129+
backendAddress = routingManager.provideBackendForRoutingGroup(routingGroup, user);
127130
} else {
128-
backendAddress = routingManager.provideAdhocBackend();
131+
backendAddress = routingManager.provideAdhocBackend(user);
129132
}
130133
}
131134
// set target backend so that we could save queryId to backend mapping later.

gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.lyft.data.gateway.ha.router;
22

3+
import com.google.common.base.Strings;
34
import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration;
45
import java.util.Collection;
56
import java.util.Collections;
@@ -14,6 +15,7 @@
1415
import java.util.TreeMap;
1516
import java.util.concurrent.ConcurrentHashMap;
1617
import java.util.stream.Collectors;
18+
1719
import lombok.extern.slf4j.Slf4j;
1820

1921
/**
@@ -35,6 +37,8 @@ public class PrestoQueueLengthRoutingTable extends HaRoutingManager {
3537
private ConcurrentHashMap<String, Integer> routingGroupWeightSum;
3638
private ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>> clusterQueueLengthMap;
3739

40+
private ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>> userClusterQueueLengthMap;
41+
3842
private Map<String, TreeMap<Integer, String>> weightedDistributionRouting;
3943

4044
/**
@@ -47,6 +51,7 @@ public PrestoQueueLengthRoutingTable(GatewayBackendManager gatewayBackendManager
4751
routingGroupWeightSum = new ConcurrentHashMap<String, Integer>();
4852
clusterQueueLengthMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>>();
4953
weightedDistributionRouting = new HashMap<String, TreeMap<Integer, String>>();
54+
userClusterQueueLengthMap = new ConcurrentHashMap<>();
5055
}
5156

5257
/**
@@ -182,7 +187,7 @@ private void computeWeightsBasedOnQueueLength(ConcurrentHashMap<String,
182187
/**
183188
* Update the Routing Table only if a previously known backend has been deactivated.
184189
* Newly added backends are handled through
185-
* {@link PrestoQueueLengthRoutingTable#updateRoutingTable(Map, Map)}
190+
* {@link PrestoQueueLengthRoutingTable#updateRoutingTable(Map, Map, Map)}
186191
* updateRoutingTable}
187192
*/
188193
public void updateRoutingTable(String routingGroup, Set<String> backends) {
@@ -212,11 +217,21 @@ public void updateRoutingTable(String routingGroup, Set<String> backends) {
212217
* Update routing Table with new Queue Lengths.
213218
*/
214219
public void updateRoutingTable(Map<String, Map<String, Integer>> updatedQueueLengthMap,
215-
Map<String, Map<String, Integer>> updatedRunningLengthMap) {
220+
Map<String, Map<String, Integer>> updatedRunningLengthMap,
221+
Map<String, Map<String, Integer>> updatedUserQueueLengthMap) {
216222
synchronized (lockObject) {
217223
log.debug("Update Routing table with new cluster queue lengths : [{}]",
218224
updatedQueueLengthMap.toString());
219225
clusterQueueLengthMap.clear();
226+
userClusterQueueLengthMap.clear();
227+
228+
if (updatedUserQueueLengthMap != null) {
229+
for (String user : updatedUserQueueLengthMap.keySet()) {
230+
ConcurrentHashMap<String, Integer> clusterQueueMap =
231+
new ConcurrentHashMap<>(updatedUserQueueLengthMap.get(user));
232+
userClusterQueueLengthMap.put(user, clusterQueueMap);
233+
}
234+
}
220235

221236
for (String grp : updatedQueueLengthMap.keySet()) {
222237
if (grp == null) {
@@ -227,7 +242,8 @@ public void updateRoutingTable(Map<String, Map<String, Integer>> updatedQueueLen
227242
int maxQueueLen = Collections.max(updatedQueueLengthMap.get(grp).values());
228243
int minQueueLen = Collections.min(updatedQueueLengthMap.get(grp).values());
229244

230-
if (minQueueLen == maxQueueLen && updatedQueueLengthMap.get(grp).size() > 1) {
245+
if (minQueueLen == maxQueueLen && updatedQueueLengthMap.get(grp).size() > 1
246+
&& updatedRunningLengthMap.containsKey(grp)) {
231247
log.info("Queue lengths equal: {} for all clusters in the group {}."
232248
+ " Falling back to Running Counts : {}", maxQueueLen, grp,
233249
updatedRunningLengthMap.get(grp));
@@ -268,9 +284,40 @@ public Map<String, Integer> getInternalClusterQueueLength(String routingGroup) {
268284
}
269285

270286
/**
271-
* Looks up the closest weight to random number generated for a given routing group.
287+
* Find the cluster with least user queue else fall back to overall cluster weight based routing.
272288
*/
273-
public String getEligibleBackEnd(String routingGroup) {
289+
public String getEligibleBackEnd(String routingGroup, String user) {
290+
291+
// Route to the least queued backend for the user out of all backends for that group
292+
if (!Strings.isNullOrEmpty(user)) {
293+
Map<String, Integer> clusterQueueCountForUser = userClusterQueueLengthMap.get(user);
294+
295+
if (clusterQueueCountForUser != null && !clusterQueueCountForUser.isEmpty()) {
296+
Set<String> backends = clusterQueueLengthMap.get(routingGroup).keySet();
297+
String leastQueuedCluster = null;
298+
Integer minQueueCount = Integer.MAX_VALUE;
299+
Integer maxQueueCount = Integer.MIN_VALUE;
300+
for (String b : backends) {
301+
// If missing, we assume no queued queries for the user on that cluster.
302+
Integer queueCount = clusterQueueCountForUser.getOrDefault(b, 0);
303+
304+
if (queueCount < minQueueCount) {
305+
leastQueuedCluster = b;
306+
minQueueCount = queueCount;
307+
}
308+
if (queueCount > maxQueueCount) {
309+
maxQueueCount = queueCount;
310+
}
311+
}
312+
// If all clusters have the same queue count, then fallback to the older weighted logic.
313+
if (!Strings.isNullOrEmpty(leastQueuedCluster) && minQueueCount != maxQueueCount) {
314+
log.debug("{} routing to:{}. userQueueCount:{}", user, leastQueuedCluster, minQueueCount);
315+
316+
return leastQueuedCluster;
317+
}
318+
}
319+
}
320+
// Looks up the closest weight to random number generated for a given routing group.
274321
if (routingGroupWeightSum.containsKey(routingGroup)
275322
&& weightedDistributionRouting.containsKey(routingGroup)) {
276323
int rnd = RANDOM.nextInt(routingGroupWeightSum.get(routingGroup));
@@ -285,20 +332,20 @@ public String getEligibleBackEnd(String routingGroup) {
285332
* backend is found.
286333
*/
287334
@Override
288-
public String provideBackendForRoutingGroup(String routingGroup) {
335+
public String provideBackendForRoutingGroup(String routingGroup, String user) {
289336
List<ProxyBackendConfiguration> backends =
290337
getGatewayBackendManager().getActiveBackends(routingGroup);
291338

292339
if (backends.isEmpty()) {
293-
return provideAdhocBackend();
340+
return provideAdhocBackend(user);
294341
}
295342
Map<String, String> proxyMap = new HashMap<>();
296343
for (ProxyBackendConfiguration backend : backends) {
297344
proxyMap.put(backend.getName(), backend.getProxyTo());
298345
}
299346

300347
updateRoutingTable(routingGroup, proxyMap.keySet());
301-
String clusterId = getEligibleBackEnd(routingGroup);
348+
String clusterId = getEligibleBackEnd(routingGroup, user);
302349
log.debug("Routing to eligible backend : [{}] for routing group: [{}]",
303350
clusterId, routingGroup);
304351

@@ -318,7 +365,7 @@ public String provideBackendForRoutingGroup(String routingGroup) {
318365
* <p>d.
319366
*/
320367
@Override
321-
public String provideAdhocBackend() {
368+
public String provideAdhocBackend(String user) {
322369
Map<String, String> proxyMap = new HashMap<>();
323370
List<ProxyBackendConfiguration> backends = getGatewayBackendManager().getActiveAdhocBackends();
324371
if (backends.size() == 0) {
@@ -331,7 +378,7 @@ public String provideAdhocBackend() {
331378

332379
updateRoutingTable("adhoc", proxyMap.keySet());
333380

334-
String clusterId = getEligibleBackEnd("adhoc");
381+
String clusterId = getEligibleBackEnd("adhoc", user);
335382
log.debug("Routing to eligible backend : " + clusterId + " for routing group: adhoc");
336383
if (clusterId != null) {
337384
return proxyMap.get(clusterId);

gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/RoutingManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void setBackendForQueryId(String queryId, String backend) {
6161
*
6262
* @return
6363
*/
64-
public String provideAdhocBackend() {
64+
public String provideAdhocBackend(String user) {
6565
List<ProxyBackendConfiguration> backends = this.gatewayBackendManager.getActiveAdhocBackends();
6666
if (backends.size() == 0) {
6767
throw new IllegalStateException("Number of active backends found zero");
@@ -76,11 +76,11 @@ public String provideAdhocBackend() {
7676
*
7777
* @return
7878
*/
79-
public String provideBackendForRoutingGroup(String routingGroup) {
79+
public String provideBackendForRoutingGroup(String routingGroup, String user) {
8080
List<ProxyBackendConfiguration> backends =
8181
gatewayBackendManager.getActiveBackends(routingGroup);
8282
if (backends.isEmpty()) {
83-
return provideAdhocBackend();
83+
return provideAdhocBackend(user);
8484
}
8585
int backendId = Math.abs(RANDOM.nextInt()) % backends.size();
8686
return backends.get(backendId).getProxyTo();

0 commit comments

Comments
 (0)