Skip to content

Commit e65a61f

Browse files
committed
Support YARN endpoints protected by Kerberos/SPNEGO
1 parent ae003d7 commit e65a61f

File tree

10 files changed

+141
-85
lines changed

10 files changed

+141
-85
lines changed

.travis.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ matrix:
1313

1414

1515
install:
16-
- pip install tox coveralls
16+
- pip install --upgrade setuptools pip
17+
- pip install --upgrade requests requests_mock requests_kerberos tox coveralls
18+
- python setup.py bdist_wheel
19+
- pip install dist/*.whl
20+
- pip freeze
1721

1822
script:
1923
tox

setup.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,17 @@ def find_version(*file_paths):
1919
return version_match.group(1)
2020
raise RuntimeError("Unable to find version string.")
2121

22-
23-
install_requires = []
24-
2522
setup(
2623
name = 'yarn-api-client',
2724
version = find_version('yarn_api_client', '__init__.py'),
2825
description='Python client for Hadoop® YARN API',
2926
long_description=read('README.rst'),
30-
packages = find_packages(exclude=['tests']),
27+
packages = find_packages(exclude=['tests','itests']),
3128

32-
install_requires = install_requires,
29+
install_requires = [
30+
'requests>=2.7,<3.0',
31+
'requests-kerberos==0.12.0',
32+
],
3333
entry_points = {
3434
'console_scripts': [
3535
'yarn_client = yarn_api_client.main:main',

tests/test_base.py

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,56 +4,66 @@
44
except ImportError:
55
from http.client import OK
66

7-
from mock import patch
8-
from tests import TestCase
7+
import json
8+
import requests
9+
import requests_mock
910

11+
from tests import TestCase
1012
from yarn_api_client import base
1113
from yarn_api_client.errors import APIError, ConfigurationError
1214

1315

1416
class BaseYarnAPITestCase(TestCase):
15-
def test_request(self):
16-
client = self.get_client()
17-
with patch('yarn_api_client.base.HTTPConnection') as http_conn_mock:
18-
with patch('yarn_api_client.base.json'):
19-
http_conn_mock().getresponse().status = OK
17+
@staticmethod
18+
def success_response():
19+
return {
20+
'status':'success'
21+
}
2022

21-
client.request('/ololo', foo='bar')
23+
def test_valid_request(self):
24+
with requests_mock.mock() as requests_get_mock:
25+
requests_get_mock.get('/ololo', text=json.dumps(BaseYarnAPITestCase.success_response()))
2226

23-
http_conn_mock().request.assert_called_with('GET', '/ololo?foo=bar')
27+
client = self.get_client()
28+
response = client.request('/ololo', foo='bar')
2429

25-
http_conn_mock.reset_mock()
26-
client.request('/ololo')
30+
assert requests_get_mock.called
31+
self.assertIn(response.data['status'], 'success')
32+
33+
34+
def test_valid_request_with_parameters(self):
35+
with requests_mock.mock() as requests_get_mock:
36+
requests_get_mock.get('/ololo?foo=bar', text=json.dumps(BaseYarnAPITestCase.success_response()))
2737

28-
http_conn_mock()
38+
client = self.get_client()
39+
response = client.request('/ololo', foo='bar')
2940

30-
http_conn_mock().request.assert_called_with('GET', '/ololo')
41+
assert requests_get_mock.called
42+
self.assertIn(response.data['status'], 'success')
3143

3244
def test_bad_request(self):
33-
client = self.get_client()
34-
with patch('yarn_api_client.base.HTTPConnection') as http_conn_mock:
35-
http_conn_mock().getresponse().status = 404
45+
with requests_mock.mock() as requests_get_mock:
46+
requests_get_mock.get('/ololo', status_code=404)
3647

48+
client = self.get_client()
3749
with self.assertRaises(APIError):
3850
client.request('/ololo')
39-
40-
def test_http_configuration(self):
41-
client = self.get_client()
42-
client.address = None
43-
client.port = 80
4451

45-
with self.assertRaises(ConfigurationError):
46-
conn = client.http_conn
52+
def test_http_configuration(self):
53+
with requests_mock.mock() as requests_get_mock:
54+
requests_get_mock.get('/ololo', text=json.dumps(BaseYarnAPITestCase.success_response()))
4755

48-
client.address = 'localhost'
49-
client.port = None
56+
client = self.get_client()
57+
client.address = None
58+
client.port = 80
5059

51-
with self.assertRaises(ConfigurationError):
52-
conn = client.http_conn
60+
with self.assertRaises(ConfigurationError):
61+
client.request('/ololo')
5362

5463
def get_client(self):
5564
client = base.BaseYarnAPI()
5665
client.address = 'example.com'
5766
client.port = 80
5867
client.timeout = 0
68+
client.kerberos_enabled = False
5969
return client

tox.ini

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
[tox]
2-
envlist = py26,py27,py34,py35
2+
envlist = py27,py35,py36
33

44
[testenv]
55
deps =
6+
requests
7+
requests-kerberos
8+
requests_mock
69
coverage
710
mock
811
commands = coverage run --source=yarn_api_client setup.py test
9-
10-
[testenv:py26]
11-
deps =
12-
argparse
13-
coverage
14-
mock
15-
unittest2

yarn_api_client/application_master.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ class ApplicationMaster(BaseYarnAPI):
1818
:param str address: Proxy HTTP address
1919
:param int port: Proxy HTTP port
2020
:param int timeout: API connection timeout in seconds
21+
:param bool kerberos_enabled: Flag indicating whether Kerberos security has been enabled for YARN
2122
"""
22-
def __init__(self, address=None, port=8088, timeout=30):
23-
self.address, self.port, self.timeout = address, port, timeout
23+
def __init__(self, address=None, port=8088, timeout=30, kerberos_enabled=False):
24+
self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled
2425
if address is None:
2526
self.logger.debug('Get configuration from hadoop conf dir')
2627
address, port = get_webproxy_host_port()

yarn_api_client/base.py

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,72 @@
11
# -*- coding: utf-8 -*-
22
from __future__ import unicode_literals
3-
try:
4-
from httplib import HTTPConnection, OK
5-
except ImportError:
6-
from http.client import HTTPConnection, OK
3+
74
import json
85
import logging
9-
try:
10-
from urllib import urlencode
11-
except ImportError:
12-
from urllib.parse import urlencode
6+
import requests
137

8+
from requests_kerberos import HTTPKerberosAuth
149
from .errors import APIError, ConfigurationError
1510

1611

1712
class Response(object):
18-
def __init__(self, http_response):
19-
self.data = json.load(http_response)
13+
def __init__(self, response):
14+
self.data = response.json()
2015

2116

2217
class BaseYarnAPI(object):
18+
__logger = None
2319
response_class = Response
2420

21+
def _validate_configuration(self):
22+
if self.address is None:
23+
raise ConfigurationError('API address is not set')
24+
elif self.port is None:
25+
raise ConfigurationError('API port is not set')
26+
2527
def request(self, api_path, **query_args):
26-
params = urlencode(query_args)
27-
if params:
28-
path = api_path + '?' + params
28+
params = query_args
29+
api_endpoint = 'http://{}:{}{}'.format(self.address, self.port, api_path)
30+
31+
self.logger.info('API Endpoint {}'.format(api_endpoint))
32+
33+
self._validate_configuration()
34+
35+
response = None
36+
if self.kerberos_enabled:
37+
response = requests.get(api_endpoint, params, auth=HTTPKerberosAuth())
2938
else:
30-
path = api_path
39+
response = requests.get(api_endpoint, params)
3140

32-
self.logger.info('Request http://%s:%s%s', self.address, self.port, path)
33-
34-
http_conn = self.http_conn
35-
http_conn.request('GET', path)
36-
response = http_conn.getresponse()
41+
if response.status_code == requests.codes.ok:
42+
return self.response_class(response)
43+
else:
44+
msg = 'Response finished with status: %s. Details: %s' % (response.status_code, response.text)
45+
raise APIError(msg)
3746

38-
if response.status == OK:
47+
def update(self, api_path, data):
48+
api_endpoint = 'http://{}:{}{}'.format(self.address, self.port, api_path)
49+
50+
self.logger.info('API Endpoint {}'.format(api_endpoint))
51+
52+
self._validate_configuration()
53+
54+
response = None
55+
if self.kerberos_enabled:
56+
response = requests.put(api_endpoint, data=data, auth=HTTPKerberosAuth())
57+
else:
58+
response = requests.put(api_endpoint, data=data)
59+
60+
if response.status_code == requests.codes.ok:
3961
return self.response_class(response)
4062
else:
41-
explanation = response.read()
42-
msg = 'Response finished with status: %s. Details: %s' % (response.status, explanation)
63+
msg = 'Response finished with status: %s. Details: %s' % (response.status_code, response.text)
4364
raise APIError(msg)
4465

4566
def construct_parameters(self, arguments):
4667
params = dict((key, value) for key, value in arguments if value is not None)
4768
return params
4869

49-
@property
50-
def http_conn(self):
51-
if self.address is None:
52-
raise ConfigurationError('API address is not set')
53-
elif self.port is None:
54-
raise ConfigurationError('API port is not set')
55-
return HTTPConnection(self.address, self.port, timeout=self.timeout)
56-
57-
__logger = None
5870
@property
5971
def logger(self):
6072
if self.__logger is None:

yarn_api_client/history_server.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ class HistoryServer(BaseYarnAPI):
1818
:param str address: HistoryServer HTTP address
1919
:param int port: HistoryServer HTTP port
2020
:param int timeout: API connection timeout in seconds
21+
:param bool kerberos_enabled: Flag indicating whether Kerberos security has been enabled for YARN
2122
"""
22-
def __init__(self, address=None, port=19888, timeout=30):
23-
self.address, self.port, self.timeout = address, port, timeout
23+
def __init__(self, address=None, port=19888, timeout=30, kerberos_enabled=False):
24+
self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled
2425
if address is None:
2526
self.logger.debug('Get information from hadoop conf dir')
2627
address, port = get_jobhistory_host_port()

yarn_api_client/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ def main():
256256
method_args = [getattr(opts, arg) for arg in opts.method_args]
257257
else:
258258
method_args = []
259-
# Construce key arguments for method
259+
# Construct key arguments for method
260260
if 'method_kwargs' in opts:
261261
method_kwargs = dict((key, getattr(opts, key)) for key in opts.method_kwargs)
262262
else:

yarn_api_client/node_manager.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ class NodeManager(BaseYarnAPI):
1212
:param str address: NodeManager HTTP address
1313
:param int port: NodeManager HTTP port
1414
:param int timeout: API connection timeout in seconds
15+
:param bool kerberos_enabled: Flag indicating whether Kerberos security has been enabled for YARN
1516
"""
16-
def __init__(self, address=None, port=8042, timeout=30):
17-
self.address, self.port, self.timeout = address, port, timeout
17+
def __init__(self, address=None, port=8042, timeout=30, kerberos_enabled=False):
18+
self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled
1819

1920
def node_information(self):
2021
"""

yarn_api_client/resource_manager.py

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ class ResourceManager(BaseYarnAPI):
1919
:param str address: ResourceManager HTTP address
2020
:param int port: ResourceManager HTTP port
2121
:param int timeout: API connection timeout in seconds
22+
:param bool kerberos_enabled: Flag indicating whether Kerberos security has been enabled for YARN
2223
"""
23-
def __init__(self, address=None, port=8088, timeout=30):
24-
self.address, self.port, self.timeout = address, port, timeout
24+
def __init__(self, address=None, port=8088, timeout=30, kerberos_enabled=False):
25+
self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled
2526
if address is None:
2627
self.logger.debug('Get configuration from hadoop conf dir')
2728
address, port = get_resource_manager_host_port()
@@ -132,7 +133,7 @@ def cluster_application_statistics(self, state_list=None,
132133
comma-separated list. If states is not provided, the API will
133134
enumerate all application states and return the counts of them.
134135
:param list application_type_list: types of the applications,
135-
specified as a comma-separated list. If applicationTypes is not
136+
specified as a comma-separated list. If application_type_list is not
136137
provided, the API will count the applications of any application
137138
type. In this case, the response shows * to indicate any
138139
application type. Note that we only support at most one
@@ -146,13 +147,13 @@ def cluster_application_statistics(self, state_list=None,
146147
# TODO: validate state argument
147148
states = ','.join(state_list) if state_list is not None else None
148149
if application_type_list is not None:
149-
applicationTypes = ','.join(application_type_list)
150+
application_types = ','.join(application_type_list)
150151
else:
151-
applicationTypes = None
152+
application_types = None
152153

153154
loc_args = (
154155
('states', states),
155-
('applicationTypes', applicationTypes))
156+
('applicationTypes', application_types))
156157
params = self.construct_parameters(loc_args)
157158

158159
return self.request(path, **params)
@@ -184,6 +185,36 @@ def cluster_application_attempts(self, application_id):
184185

185186
return self.request(path)
186187

188+
def cluster_application_state(self, application_id):
189+
"""
190+
With the application state API, you can obtain the current
191+
state of an application.
192+
193+
:param str application_id: The application id
194+
:returns: API response object with JSON data
195+
:rtype: :py:class:`yarn_api_client.base.Response`
196+
"""
197+
path = '/ws/v1/cluster/apps/{appid}/state'.format(
198+
appid=application_id)
199+
200+
return self.request(path)
201+
202+
def cluster_application_kill(self, application_id):
203+
"""
204+
With the application kill API, you can kill an application
205+
that is not in FINISHED or FAILED state.
206+
207+
:param str application_id: The application id
208+
:returns: API response object with JSON data
209+
:rtype: :py:class:`yarn_api_client.base.Response`
210+
"""
211+
212+
data = '{"state": "KILLED"}'
213+
path = '/ws/v1/cluster/apps/{appid}/state'.format(
214+
appid=application_id)
215+
216+
return self.put(path, data)
217+
187218
def cluster_nodes(self, state=None, healthy=None):
188219
"""
189220
With the Nodes API, you can obtain a collection of resources, each of

0 commit comments

Comments
 (0)