Skip to content

Commit 2a4ec01

Browse files
IMAM9AISkevin-bates
authored and committed
Changes in yarn api client for Enterprise gateway yarn checks. (#39)
* Changes in yarn api client to access maximum container allocation and partition availability
1 parent 6b2a287 commit 2a4ec01

File tree

2 files changed

+77
-2
lines changed

2 files changed

+77
-2
lines changed

yarn_api_client/hadoop_conf.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ def _get_rm_ids(hadoop_conf_path):
1515
rm_ids = rm_ids.split(',')
1616
return rm_ids
1717

18+
def _get_maximum_container_memory(hadoop_conf_path):
    """Read the maximum per-container memory allocation (in MB) from yarn-site.xml.

    :param str hadoop_conf_path: directory containing the Hadoop configuration files
    :returns: int value of the ``yarn.nodemanager.resource.memory-mb`` property
    """
    yarn_site = os.path.join(hadoop_conf_path, 'yarn-site.xml')
    return int(parse(yarn_site, 'yarn.nodemanager.resource.memory-mb'))
1821

1922
def _get_resource_manager(hadoop_conf_path, rm_id=None):
2023
prop_name = 'yarn.resourcemanager.webapp.address'

yarn_api_client/resource_manager.py

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
from .base import BaseYarnAPI
44
from .constants import YarnApplicationState, FinalApplicationStatus
55
from .errors import IllegalArgumentError
6-
from .hadoop_conf import get_resource_manager_host_port, check_is_active_rm, CONF_DIR
7-
6+
from .hadoop_conf import get_resource_manager_host_port,\
7+
check_is_active_rm, _get_maximum_container_memory, CONF_DIR
8+
from collections import deque
89

910
class ResourceManager(BaseYarnAPI):
1011
"""
@@ -421,3 +422,74 @@ def cluster_change_application_priority(self, application_id, priority):
421422
path = '/ws/v1/cluster/apps/{appid}/priority'.format(appid=application_id)
422423

423424
return self.request(path, 'PUT', data={"priority": priority})
425+
426+
def cluster_node_container_memory(self):
    """Report the maximum memory (MB) allocatable to a single container.

    Reads the value from the cluster configuration found under ``CONF_DIR``.

    :returns: integer maximum container memory for the cluster
    """
    return _get_maximum_container_memory(CONF_DIR)
436+
437+
def cluster_scheduler_queue(self, yarn_queue_name):
    """Locate a queue by name in the scheduler endpoint's response.

    Queues may be nested arbitrarily deep, so the scheduler tree is walked
    breadth-first until a queue whose ``queueName`` matches is found.

    :param str yarn_queue_name: case sensitive queue name
    :return: queue Dictionary, None if not found
    """
    scheduler_info = self.cluster_scheduler().data['scheduler']['schedulerInfo']

    pending = deque([scheduler_info])
    while pending:
        node = pending.popleft()
        if node['queueName'] == yarn_queue_name:
            return node
        # Enqueue child queues, if any, for the next BFS level.
        if 'queues' in node:
            pending.extend(node['queues']['queue'])

    return None
461+
462+
463+
def cluster_scheduler_queue_availability(self, candidate_partition, availability_threshold):
    """Check whether a partition's used capacity is within the given threshold.

    With node labelling, a queue's resources are split across partitions;
    given one partition's data this reports whether its absolute used
    capacity has stayed at or below the supplied threshold.

    :param dict candidate_partition: node label partition data (case sensitive)
    :param float availability_threshold: value can range between 0 - 100
    :return: Boolean — True if capacity is available, False if the threshold is exceeded
    """
    return candidate_partition['absoluteUsedCapacity'] <= availability_threshold
479+
480+
481+
def cluster_queue_partition(self, candidate_queue, cluster_node_label):
    """Extract the partition with the given node label from a queue.

    A queue can be divided into multiple partitions having different node
    labels; this scans ``queueCapacitiesByPartition`` for the one whose
    ``partitionName`` matches.

    :param dict candidate_queue: queue dictionary
    :param str cluster_node_label: case sensitive node label name
    :return: partition Dict, None if not Found.
    """
    partitions = candidate_queue['capacities']['queueCapacitiesByPartition']
    return next(
        (p for p in partitions if p['partitionName'] == cluster_node_label),
        None,
    )
494+
495+

0 commit comments

Comments
 (0)