Skip to content

Commit 648c19e

Browse files
author
tianjin.gutj
committed
1. Fix a bug where reusing an HTTPConnection caused ResponseNotReady (see also http://stackoverflow.com/questions/3231543/python-httplib-responsenotready); a new connection is now created for each request.
2. Make the client compatible with a ResourceManager that has HA (high availability) enabled. 3. Add test cases for the new feature. 4. Stop running tests on py32 and py33, and add py34 and py35. 5. Bump the version to 0.2.4.
1 parent 9c13c50 commit 648c19e

File tree

9 files changed

+168
-38
lines changed

9 files changed

+168
-38
lines changed

.travis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ script:
77
env:
88
- TOXENV=py26
99
- TOXENV=py27
10-
- TOXENV=py32
11-
- TOXENV=py33
10+
- TOXENV=py34
11+
- TOXENV=py35
1212
after_success:
1313
coveralls

docs/conf.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@
5454
# built documents.
5555
#
5656
# The short X.Y version.
57-
version = '0.2.3'
57+
version = '0.2.4'
5858
# The full version, including alpha/beta/rc tags.
59-
release = '0.2.3'
59+
release = '0.2.4'
6060

6161
# The language for content autogenerated by Sphinx. Refer to documentation
6262
# for a list of supported languages.

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ def find_version(*file_paths):
5050
'Operating System :: OS Independent',
5151
'Programming Language :: Python :: 2.6',
5252
'Programming Language :: Python :: 2.7',
53-
'Programming Language :: Python :: 3.2',
54-
'Programming Language :: Python :: 3.3',
53+
'Programming Language :: Python :: 3.4',
54+
'Programming Language :: Python :: 3.5',
5555
'Topic :: System :: Distributed Computing',
5656
],
5757
)

tests/test_base.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,6 @@
1212

1313

1414
class BaseYarnAPITestCase(TestCase):
15-
def test_http_property_cache(self):
16-
client = self.get_client()
17-
http_conn1 = client.http_conn
18-
http_conn2 = client.http_conn
19-
20-
self.assertIs(http_conn1, http_conn2)
21-
2215
def test_request(self):
2316
client = self.get_client()
2417
with patch('yarn_api_client.base.HTTPConnection') as http_conn_mock:

tests/test_hadoop_conf.py

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,25 @@
11
# -*- coding: utf-8 -*-
22
from tempfile import NamedTemporaryFile
33

4+
import mock
45
from mock import patch
56
from tests import TestCase
67

78
from yarn_api_client import hadoop_conf
89

10+
11+
_http_request_method = ''
12+
_http_getresponse_method = ''
13+
14+
try:
15+
from httplib import HTTPConnection, OK, NOT_FOUND
16+
_http_request_method = 'httplib.HTTPConnection.request'
17+
_http_getresponse_method = 'httplib.HTTPConnection.getresponse'
18+
except ImportError:
19+
from http.client import HTTPConnection, OK, NOT_FOUND
20+
_http_request_method = 'http.client.HTTPConnection.request'
21+
_http_getresponse_method = 'http.client.HTTPConnection.getresponse'
22+
923
empty_config = '<configuration></configuration>'.encode('latin1')
1024

1125
yarn_site_xml = """\
@@ -37,19 +51,104 @@ def test_parse(self):
3751
self.assertEqual(None, value)
3852

3953
def test_get_resource_host_port(self):
54+
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
55+
with patch('yarn_api_client.hadoop_conf._get_rm_ids') as get_rm_ids_mock:
56+
parse_mock.return_value = 'example.com:8022'
57+
get_rm_ids_mock.return_value = None
58+
59+
host_port = hadoop_conf.get_resource_manager_host_port()
60+
61+
self.assertEqual(('example.com', '8022'), host_port)
62+
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
63+
'yarn.resourcemanager.webapp.address')
64+
65+
parse_mock.reset_mock()
66+
parse_mock.return_value = None
67+
68+
host_port = hadoop_conf.get_resource_manager_host_port()
69+
self.assertIsNone(host_port)
70+
71+
@mock.patch('yarn_api_client.hadoop_conf._get_rm_ids')
72+
@mock.patch('yarn_api_client.hadoop_conf.parse')
73+
@mock.patch('yarn_api_client.hadoop_conf._check_is_active_rm')
74+
def test_get_resource_host_port_with_ha(self, check_is_active_rm_mock, parse_mock, get_rm_ids_mock):
75+
get_rm_ids_mock.return_value = ['rm1', 'rm2']
76+
parse_mock.return_value = 'example.com:8022'
77+
check_is_active_rm_mock.return_value = True
78+
host_port = hadoop_conf.get_resource_manager_host_port()
79+
80+
self.assertEqual(('example.com', '8022'), host_port)
81+
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
82+
'yarn.resourcemanager.webapp.address.rm1')
83+
84+
parse_mock.reset_mock()
85+
parse_mock.return_value = None
86+
87+
host_port = hadoop_conf.get_resource_manager_host_port()
88+
self.assertIsNone(host_port)
89+
90+
def test_get_rm_ids(self):
91+
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
92+
parse_mock.return_value = 'rm1,rm2'
93+
rm_list = hadoop_conf._get_rm_ids(hadoop_conf.CONF_DIR)
94+
self.assertEqual(['rm1', 'rm2'], rm_list)
95+
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.resourcemanager.ha.rm-ids')
96+
97+
parse_mock.reset_mock()
98+
parse_mock.return_value = None
99+
100+
rm_list = hadoop_conf._get_rm_ids(hadoop_conf.CONF_DIR)
101+
self.assertIsNone(rm_list)
102+
103+
@mock.patch(_http_request_method)
104+
@mock.patch(_http_getresponse_method)
105+
def test_check_is_active_rm(self, http_getresponse_mock, http_conn_request_mock):
106+
class ResponseMock():
107+
def __init__(self, status, header_dict):
108+
self.status = status
109+
self.header_dict = header_dict
110+
111+
def getheader(self, header_key, default_return):
112+
if header_key in self.header_dict:
113+
return self.header_dict[header_key]
114+
else:
115+
return default_return
116+
117+
http_conn_request_mock.return_value = None
118+
http_getresponse_mock.return_value = ResponseMock(OK, {})
119+
self.assertTrue(hadoop_conf._check_is_active_rm('example2', '8022'))
120+
http_getresponse_mock.reset_mock()
121+
http_getresponse_mock.return_value = ResponseMock(OK, {'Refresh':"testing"})
122+
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
123+
http_getresponse_mock.reset_mock()
124+
http_getresponse_mock.return_value = ResponseMock(NOT_FOUND, {'Refresh':"testing"})
125+
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
126+
http_conn_request_mock.side_effect = Exception('error')
127+
http_conn_request_mock.reset_mock()
128+
http_conn_request_mock.return_value = None
129+
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
130+
pass
131+
132+
def test_get_resource_manager(self):
40133
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
41134
parse_mock.return_value = 'example.com:8022'
42135

43-
host_port = hadoop_conf.get_resource_manager_host_port()
136+
host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, None)
137+
138+
self.assertEqual(('example.com', '8022'), host_port)
139+
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
140+
'yarn.resourcemanager.webapp.address')
141+
142+
host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1')
44143

45144
self.assertEqual(('example.com', '8022'), host_port)
46145
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
47-
'yarn.resourcemanager.webapp.address')
146+
'yarn.resourcemanager.webapp.address.rm1')
48147

49148
parse_mock.reset_mock()
50149
parse_mock.return_value = None
51150

52-
host_port = hadoop_conf.get_resource_manager_host_port()
151+
host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1')
53152
self.assertIsNone(host_port)
54153

55154
def test_get_jobhistory_host_port(self):

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[tox]
2-
envlist = py26,py27,py32,py33
2+
envlist = py26,py27,py34,py35
33

44
[testenv]
55
deps =

yarn_api_client/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# -*- coding: utf-8 -*-
2-
__version__ = '0.2.3'
2+
__version__ = '0.2.4'
33
__all__ = ['ApplicationMaster', 'HistoryServer', 'NodeManager',
44
'ResourceManager']
55

yarn_api_client/base.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ def request(self, api_path, **query_args):
3030
path = api_path
3131

3232
self.logger.info('Request http://%s:%s%s', self.address, self.port, path)
33-
self.http_conn.request('GET', path)
34-
35-
response = self.http_conn.getresponse()
33+
34+
http_conn = self.http_conn
35+
http_conn.request('GET', path)
36+
response = http_conn.getresponse()
3637

3738
if response.status == OK:
3839
return self.response_class(response)
@@ -44,19 +45,13 @@ def construct_parameters(self, arguments):
4445
params = dict((key, value) for key, value in arguments if value is not None)
4546
return params
4647

47-
48-
__http_conn = None
4948
@property
5049
def http_conn(self):
51-
if self.__http_conn is None:
52-
if self.address is None:
53-
raise ConfigurationError('API address is not set')
54-
elif self.port is None:
55-
raise ConfigurationError('API port is not set')
56-
self.__http_conn = HTTPConnection(self.address, self.port,
57-
timeout=self.timeout)
58-
59-
return self.__http_conn
50+
if self.address is None:
51+
raise ConfigurationError('API address is not set')
52+
elif self.port is None:
53+
raise ConfigurationError('API port is not set')
54+
return HTTPConnection(self.address, self.port, timeout=self.timeout)
6055

6156
__logger = None
6257
@property

yarn_api_client/hadoop_conf.py

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,64 @@
11
# -*- coding: utf-8 -*-
22
import os
33
import xml.etree.ElementTree as ET
4+
try:
5+
from httplib import HTTPConnection, OK
6+
except ImportError:
7+
from http.client import HTTPConnection, OK
48

59
CONF_DIR = '/etc/hadoop/conf'
610

711

8-
def get_resource_manager_host_port():
9-
config_path = os.path.join(CONF_DIR, 'yarn-site.xml')
12+
def _get_rm_ids(hadoop_conf_path):
13+
rm_ids = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), 'yarn.resourcemanager.ha.rm-ids')
14+
if rm_ids is not None:
15+
rm_ids = rm_ids.split(',')
16+
return rm_ids
17+
18+
19+
def _get_resource_manager(hadoop_conf_path, rm_id = None):
1020
prop_name = 'yarn.resourcemanager.webapp.address'
11-
value = parse(config_path, prop_name)
12-
if value is not None:
13-
host, _, port = value.partition(':')
14-
return host, port
21+
if rm_id is not None:
22+
rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), '%s.%s' % (prop_name, rm_id))
23+
else:
24+
rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), prop_name)
25+
if rm_webapp_address is not None:
26+
[host, port] = rm_webapp_address.split(':')
27+
return (host, port)
1528
else:
1629
return None
1730

1831

32+
def _check_is_active_rm(rm_web_host, rm_web_port):
33+
conn = HTTPConnection(rm_web_host, rm_web_port)
34+
try:
35+
conn.request('GET', '/cluster')
36+
except:
37+
return False
38+
response = conn.getresponse()
39+
if response.status != OK:
40+
return False
41+
else:
42+
if response.getheader('Refresh', None) is not None:
43+
return False
44+
return True
45+
46+
47+
def get_resource_manager_host_port():
48+
hadoop_conf_path = CONF_DIR
49+
rm_ids = _get_rm_ids(hadoop_conf_path)
50+
if rm_ids is not None:
51+
for rm_id in rm_ids:
52+
ret = _get_resource_manager(hadoop_conf_path, rm_id)
53+
if ret is not None:
54+
(host, port) = ret
55+
if _check_is_active_rm(host, port):
56+
return host, port
57+
return None
58+
else:
59+
return _get_resource_manager(hadoop_conf_path, None)
60+
61+
1962
def get_jobhistory_host_port():
2063
config_path = os.path.join(CONF_DIR, 'mapred-site.xml')
2164
prop_name = 'mapreduce.jobhistory.webapp.address'

0 commit comments

Comments
 (0)