Skip to content

Commit 33eb832

Browse files
authored
Merge pull request #6 from toidi/xiaop-master
HA enabled ResourceManager compatible
2 parents 9c13c50 + 648c19e commit 33eb832

File tree

9 files changed

+168
-38
lines changed

9 files changed

+168
-38
lines changed

.travis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ script:
77
env:
88
- TOXENV=py26
99
- TOXENV=py27
10-
- TOXENV=py32
11-
- TOXENV=py33
10+
- TOXENV=py34
11+
- TOXENV=py35
1212
after_success:
1313
coveralls

docs/conf.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@
5454
# built documents.
5555
#
5656
# The short X.Y version.
57-
version = '0.2.3'
57+
version = '0.2.4'
5858
# The full version, including alpha/beta/rc tags.
59-
release = '0.2.3'
59+
release = '0.2.4'
6060

6161
# The language for content autogenerated by Sphinx. Refer to documentation
6262
# for a list of supported languages.

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ def find_version(*file_paths):
5050
'Operating System :: OS Independent',
5151
'Programming Language :: Python :: 2.6',
5252
'Programming Language :: Python :: 2.7',
53-
'Programming Language :: Python :: 3.2',
54-
'Programming Language :: Python :: 3.3',
53+
'Programming Language :: Python :: 3.4',
54+
'Programming Language :: Python :: 3.5',
5555
'Topic :: System :: Distributed Computing',
5656
],
5757
)

tests/test_base.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,6 @@
1212

1313

1414
class BaseYarnAPITestCase(TestCase):
15-
def test_http_property_cache(self):
16-
client = self.get_client()
17-
http_conn1 = client.http_conn
18-
http_conn2 = client.http_conn
19-
20-
self.assertIs(http_conn1, http_conn2)
21-
2215
def test_request(self):
2316
client = self.get_client()
2417
with patch('yarn_api_client.base.HTTPConnection') as http_conn_mock:

tests/test_hadoop_conf.py

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,25 @@
11
# -*- coding: utf-8 -*-
22
from tempfile import NamedTemporaryFile
33

4+
import mock
45
from mock import patch
56
from tests import TestCase
67

78
from yarn_api_client import hadoop_conf
89

10+
11+
_http_request_method = ''
12+
_http_getresponse_method = ''
13+
14+
try:
15+
from httplib import HTTPConnection, OK, NOT_FOUND
16+
_http_request_method = 'httplib.HTTPConnection.request'
17+
_http_getresponse_method = 'httplib.HTTPConnection.getresponse'
18+
except ImportError:
19+
from http.client import HTTPConnection, OK, NOT_FOUND
20+
_http_request_method = 'http.client.HTTPConnection.request'
21+
_http_getresponse_method = 'http.client.HTTPConnection.getresponse'
22+
923
empty_config = '<configuration></configuration>'.encode('latin1')
1024

1125
yarn_site_xml = """\
@@ -37,19 +51,104 @@ def test_parse(self):
3751
self.assertEqual(None, value)
3852

3953
def test_get_resource_host_port(self):
54+
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
55+
with patch('yarn_api_client.hadoop_conf._get_rm_ids') as get_rm_ids_mock:
56+
parse_mock.return_value = 'example.com:8022'
57+
get_rm_ids_mock.return_value = None
58+
59+
host_port = hadoop_conf.get_resource_manager_host_port()
60+
61+
self.assertEqual(('example.com', '8022'), host_port)
62+
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
63+
'yarn.resourcemanager.webapp.address')
64+
65+
parse_mock.reset_mock()
66+
parse_mock.return_value = None
67+
68+
host_port = hadoop_conf.get_resource_manager_host_port()
69+
self.assertIsNone(host_port)
70+
71+
@mock.patch('yarn_api_client.hadoop_conf._get_rm_ids')
72+
@mock.patch('yarn_api_client.hadoop_conf.parse')
73+
@mock.patch('yarn_api_client.hadoop_conf._check_is_active_rm')
74+
def test_get_resource_host_port_with_ha(self, check_is_active_rm_mock, parse_mock, get_rm_ids_mock):
75+
get_rm_ids_mock.return_value = ['rm1', 'rm2']
76+
parse_mock.return_value = 'example.com:8022'
77+
check_is_active_rm_mock.return_value = True
78+
host_port = hadoop_conf.get_resource_manager_host_port()
79+
80+
self.assertEqual(('example.com', '8022'), host_port)
81+
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
82+
'yarn.resourcemanager.webapp.address.rm1')
83+
84+
parse_mock.reset_mock()
85+
parse_mock.return_value = None
86+
87+
host_port = hadoop_conf.get_resource_manager_host_port()
88+
self.assertIsNone(host_port)
89+
90+
def test_get_rm_ids(self):
91+
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
92+
parse_mock.return_value = 'rm1,rm2'
93+
rm_list = hadoop_conf._get_rm_ids(hadoop_conf.CONF_DIR)
94+
self.assertEqual(['rm1', 'rm2'], rm_list)
95+
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.resourcemanager.ha.rm-ids')
96+
97+
parse_mock.reset_mock()
98+
parse_mock.return_value = None
99+
100+
rm_list = hadoop_conf._get_rm_ids(hadoop_conf.CONF_DIR)
101+
self.assertIsNone(rm_list)
102+
103+
@mock.patch(_http_request_method)
104+
@mock.patch(_http_getresponse_method)
105+
def test_check_is_active_rm(self, http_getresponse_mock, http_conn_request_mock):
106+
class ResponseMock():
107+
def __init__(self, status, header_dict):
108+
self.status = status
109+
self.header_dict = header_dict
110+
111+
def getheader(self, header_key, default_return):
112+
if header_key in self.header_dict:
113+
return self.header_dict[header_key]
114+
else:
115+
return default_return
116+
117+
http_conn_request_mock.return_value = None
118+
http_getresponse_mock.return_value = ResponseMock(OK, {})
119+
self.assertTrue(hadoop_conf._check_is_active_rm('example2', '8022'))
120+
http_getresponse_mock.reset_mock()
121+
http_getresponse_mock.return_value = ResponseMock(OK, {'Refresh':"testing"})
122+
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
123+
http_getresponse_mock.reset_mock()
124+
http_getresponse_mock.return_value = ResponseMock(NOT_FOUND, {'Refresh':"testing"})
125+
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
126+
http_conn_request_mock.side_effect = Exception('error')
127+
http_conn_request_mock.reset_mock()
128+
http_conn_request_mock.return_value = None
129+
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
130+
pass
131+
132+
def test_get_resource_manager(self):
40133
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
41134
parse_mock.return_value = 'example.com:8022'
42135

43-
host_port = hadoop_conf.get_resource_manager_host_port()
136+
host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, None)
137+
138+
self.assertEqual(('example.com', '8022'), host_port)
139+
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
140+
'yarn.resourcemanager.webapp.address')
141+
142+
host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1')
44143

45144
self.assertEqual(('example.com', '8022'), host_port)
46145
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
47-
'yarn.resourcemanager.webapp.address')
146+
'yarn.resourcemanager.webapp.address.rm1')
48147

49148
parse_mock.reset_mock()
50149
parse_mock.return_value = None
51150

52-
host_port = hadoop_conf.get_resource_manager_host_port()
151+
host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1')
53152
self.assertIsNone(host_port)
54153

55154
def test_get_jobhistory_host_port(self):

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[tox]
2-
envlist = py26,py27,py32,py33
2+
envlist = py26,py27,py34,py35
33

44
[testenv]
55
deps =

yarn_api_client/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# -*- coding: utf-8 -*-
2-
__version__ = '0.2.3'
2+
__version__ = '0.2.4'
33
__all__ = ['ApplicationMaster', 'HistoryServer', 'NodeManager',
44
'ResourceManager']
55

yarn_api_client/base.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ def request(self, api_path, **query_args):
3030
path = api_path
3131

3232
self.logger.info('Request http://%s:%s%s', self.address, self.port, path)
33-
self.http_conn.request('GET', path)
34-
35-
response = self.http_conn.getresponse()
33+
34+
http_conn = self.http_conn
35+
http_conn.request('GET', path)
36+
response = http_conn.getresponse()
3637

3738
if response.status == OK:
3839
return self.response_class(response)
@@ -44,19 +45,13 @@ def construct_parameters(self, arguments):
4445
params = dict((key, value) for key, value in arguments if value is not None)
4546
return params
4647

47-
48-
__http_conn = None
4948
@property
5049
def http_conn(self):
51-
if self.__http_conn is None:
52-
if self.address is None:
53-
raise ConfigurationError('API address is not set')
54-
elif self.port is None:
55-
raise ConfigurationError('API port is not set')
56-
self.__http_conn = HTTPConnection(self.address, self.port,
57-
timeout=self.timeout)
58-
59-
return self.__http_conn
50+
if self.address is None:
51+
raise ConfigurationError('API address is not set')
52+
elif self.port is None:
53+
raise ConfigurationError('API port is not set')
54+
return HTTPConnection(self.address, self.port, timeout=self.timeout)
6055

6156
__logger = None
6257
@property

yarn_api_client/hadoop_conf.py

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,64 @@
11
# -*- coding: utf-8 -*-
22
import os
33
import xml.etree.ElementTree as ET
4+
try:
5+
from httplib import HTTPConnection, OK
6+
except ImportError:
7+
from http.client import HTTPConnection, OK
48

59
CONF_DIR = '/etc/hadoop/conf'
610

711

8-
def get_resource_manager_host_port():
9-
config_path = os.path.join(CONF_DIR, 'yarn-site.xml')
12+
def _get_rm_ids(hadoop_conf_path):
    """Return the ResourceManager ids listed for HA, or None.

    Reads the comma-separated ``yarn.resourcemanager.ha.rm-ids`` property
    from ``yarn-site.xml`` in *hadoop_conf_path*.

    :param str hadoop_conf_path: directory containing ``yarn-site.xml``.
    :returns: list of id strings, or ``None`` when the property is not
        set or contains no ids.
    """
    config_path = os.path.join(hadoop_conf_path, 'yarn-site.xml')
    raw_ids = parse(config_path, 'yarn.resourcemanager.ha.rm-ids')
    if raw_ids is None:
        return None
    # Each id is later embedded in a property name, so whitespace around it
    # (e.g. "rm1, rm2") or an empty entry (trailing comma) would make that
    # lookup miss. Strip the ids and drop the blanks.
    rm_ids = [rm_id.strip() for rm_id in raw_ids.split(',')]
    rm_ids = [rm_id for rm_id in rm_ids if rm_id]
    return rm_ids or None
17+
18+
19+
def _get_resource_manager(hadoop_conf_path, rm_id=None):
    """Look up a ResourceManager web application address in yarn-site.xml.

    :param str hadoop_conf_path: directory containing ``yarn-site.xml``.
    :param rm_id: optional ResourceManager id; when given, the property
        ``yarn.resourcemanager.webapp.address.<rm_id>`` is read instead of
        the plain ``yarn.resourcemanager.webapp.address``.
    :returns: ``(host, port)`` tuple of strings, or ``None`` when the
        property is not set. ``port`` is ``''`` if the value has no colon.
    """
    prop_name = 'yarn.resourcemanager.webapp.address'
    if rm_id is not None:
        prop_name = '%s.%s' % (prop_name, rm_id)
    rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), prop_name)
    if rm_webapp_address is None:
        return None
    # partition() rather than split(':'): unpacking the result of split()
    # raises ValueError when the value has no port or more than one colon.
    host, _, port = rm_webapp_address.partition(':')
    return host, port
1730

1831

32+
def _check_is_active_rm(rm_web_host, rm_web_port):
33+
conn = HTTPConnection(rm_web_host, rm_web_port)
34+
try:
35+
conn.request('GET', '/cluster')
36+
except:
37+
return False
38+
response = conn.getresponse()
39+
if response.status != OK:
40+
return False
41+
else:
42+
if response.getheader('Refresh', None) is not None:
43+
return False
44+
return True
45+
46+
47+
def get_resource_manager_host_port():
    """Return the ``(host, port)`` of the ResourceManager web application.

    When ``_get_rm_ids`` returns a list of ids, each id is resolved in turn
    with ``_get_resource_manager`` and the first address accepted by
    ``_check_is_active_rm`` is returned; ``None`` is returned when none of
    them qualifies. When ``_get_rm_ids`` returns ``None``, the address
    without an id suffix is looked up instead.

    :returns: ``(host, port)`` tuple, or ``None`` if nothing usable is found.
    """
    conf_dir = CONF_DIR
    candidate_ids = _get_rm_ids(conf_dir)
    if candidate_ids is None:
        # No ids listed: use the single, un-suffixed address.
        return _get_resource_manager(conf_dir, None)

    for candidate in candidate_ids:
        address = _get_resource_manager(conf_dir, candidate)
        if address is None:
            continue
        host, port = address
        if _check_is_active_rm(host, port):
            return host, port
    return None
60+
61+
1962
def get_jobhistory_host_port():
2063
config_path = os.path.join(CONF_DIR, 'mapred-site.xml')
2164
prop_name = 'mapreduce.jobhistory.webapp.address'

0 commit comments

Comments
 (0)