Skip to content

Commit a8a82fa

Browse files
committed
Code format
1 parent 6e53b52 commit a8a82fa

5 files changed

+57
-67
lines changed

heartbeat_table.py

+8-18
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ class HeartBeatTable(object):
99

1010
def __init__(self, provisioning_timeout=timedelta(minutes=15), heartbeat_timeout=timedelta(minutes=3)):
1111
self._table = {}
12-
self.logger = logging_aux.init_logger_aux(
13-
"hpcframework.heartbeat", "hpcframework.heartbeat.log")
12+
self.logger = logging_aux.init_logger_aux("hpcframework.heartbeat", "hpcframework.heartbeat.log")
1413
self.on_host_running = []
1514
self._table_lock = threading.Lock()
1615
self._provisioning_timeout = provisioning_timeout
@@ -20,8 +19,7 @@ def add_slaveinfo(self, hostname, agent_id, task_id, cpus, last_heartbeat=dateti
2019
if hostname in self._table and self._table[hostname].state != HpcState.Closed:
2120
self.logger.warn("Heart beat entry of {} existed. old value: {}.".format(
2221
hostname, str(self._table[hostname])))
23-
slaveinfo = SlaveInfo(hostname, agent_id, task_id,
24-
cpus, last_heartbeat, HpcState.Provisioning)
22+
slaveinfo = SlaveInfo(hostname, agent_id, task_id, cpus, last_heartbeat, HpcState.Provisioning)
2523
self._table[hostname] = slaveinfo
2624
self.logger.info("Heart beat entry added: {}".format(str(slaveinfo)))
2725

@@ -34,39 +32,31 @@ def on_slave_heartbeat(self, hostname):
3432
if self._table[hostname].state == HpcState.Provisioning:
3533
self._table[hostname].state = HpcState.Running
3634
self.__exec_callback(self.on_host_running)
37-
self.logger.info(
38-
"Host {} start running".format(hostname))
35+
self.logger.info("Host {} start running".format(hostname))
3936
else:
40-
self.logger.error(
41-
"Host {} is not recognized. Heartbeat ignored.".format(hostname))
37+
self.logger.error("Host {} is not recognized. Heartbeat ignored.".format(hostname))
4238

4339
def on_slave_close(self, hostname):
4440
if hostname in self._table:
4541
self._table[hostname].state = HpcState.Closed
4642
self.logger.info("Host {} closed".format(hostname))
4743
else:
48-
self.logger.error(
49-
"Host {} is not recognized. Close event ignored.".format(hostname))
44+
self.logger.error("Host {} is not recognized. Close event ignored.".format(hostname))
5045

5146
def get_task_info(self, hostname):
5247
if hostname in self._table:
5348
entry = self._table[hostname]
5449
return (entry.task_id, entry.agent_id)
5550
else:
56-
self.logger.error(
57-
"Host {} is not recognized. Failed to get task info.".format(hostname))
51+
self.logger.error("Host {} is not recognized. Failed to get task info.".format(hostname))
5852

5953
def __exec_callback(self, callbacks):
6054
for callback in callbacks:
6155
try:
62-
self.logger.debug(
63-
'Callback %s on %s' % (callback.__name__)
64-
)
56+
self.logger.debug('Callback %s on %s' % (callback.__name__))
6557
callback()
6658
except Exception as e:
67-
self.logger.exception(
68-
'Error in %s callback: %s' % (callback.__name__, str(e))
69-
)
59+
self.logger.exception('Error in %s callback: %s' % (callback.__name__, str(e)))
7060

7161
def check_timeout(self, now=datetime.utcnow()):
7262
provision_timeout_list = []

hpcframework.py

+13-19
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
from mesoshttp.client import MesosClient
1515
from mesoshttp.offers import Offer
1616

17+
import heartbeat_table
1718
import logging_aux
1819
import restclient
1920
import restserver
2021
from restclient import AutoScaleRestClient
21-
import heartbeat_table
2222

2323

2424
class Test(object):
@@ -36,8 +36,7 @@ def run(self):
3636

3737
def __init__(self):
3838
logging.basicConfig()
39-
self.logger = logging_aux.init_logger_aux(
40-
"hpcframework", "hpcframework.log")
39+
self.logger = logging_aux.init_logger_aux("hpcframework", "hpcframework.log")
4140
# signal.signal(signal.SIGINT, signal.SIG_IGN)
4241
logging.getLogger('mesoshttp').setLevel(logging.DEBUG)
4342

@@ -49,8 +48,7 @@ def __init__(self):
4948
hpc_setup_ps1 = scriptfile.read()
5049
self.logger.info("Loaded HPC setup script:\n{}".format(hpc_setup_ps1))
5150
hpc_setup_ps1_utf16 = hpc_setup_ps1.encode('utf-16')
52-
hpc_setup_ps1_utf16_nobom = hpc_setup_ps1_utf16[2:] if hpc_setup_ps1_utf16[
53-
0:2] == codecs.BOM_UTF16 else hpc_setup_ps1_utf16
51+
hpc_setup_ps1_utf16_nobom = hpc_setup_ps1_utf16[2:] if hpc_setup_ps1_utf16[0:2] == codecs.BOM_UTF16 else hpc_setup_ps1_utf16
5452
self.hpc_setup_ps1_b64 = base64.b64encode(hpc_setup_ps1_utf16_nobom)
5553

5654
self.driver = None # type: MesosClient.SchedulerDriver
@@ -62,7 +60,7 @@ def __init__(self):
6260
self.th = Test.MesosFramework(self.mesos_client)
6361
self.th.start()
6462

65-
self.heartbeat_server = restserver.RestServer(self.heartbeat_table ,8088)
63+
self.heartbeat_server = restserver.RestServer(self.heartbeat_table, 8088)
6664
self.heartbeat_server.start()
6765

6866
while True and self.th.isAlive():
@@ -92,21 +90,19 @@ def status_update(self, update):
9290
def offer_received(self, offers):
9391
# self.logger.info('OFFER: %s' % (str(offers)))
9492
grow_decision = self.hpc_client.get_grow_decision()
93+
cores_to_grow = grow_decision.cores_to_grow - self.heartbeat_table.get_cores_in_provisioning()
9594

96-
if grow_decision.cores_to_grow - self.core_provisioning > 0:
95+
if cores_to_grow > 0:
9796
for offer in offers: # type: Offer
9897
mesos_offer = offer.get_offer()
99-
self.logger.info("offer_received: {}".format(
100-
(str(mesos_offer))))
98+
self.logger.info("offer_received: {}".format((str(mesos_offer))))
10199
if 'attributes' in mesos_offer:
102100
attributes = mesos_offer['attributes']
103101
if self.get_text(attributes, 'os') != 'windows_server':
104102
offer.decline()
105103
else:
106104
cores = self.get_scalar(attributes, 'cores')
107-
cpus = self.get_scalar(
108-
mesos_offer['resources'], 'cpus')
109-
105+
cpus = self.get_scalar(mesos_offer['resources'], 'cpus')
110106
if cores == cpus:
111107
self.accept_offer(offer)
112108
else:
@@ -118,10 +114,9 @@ def offer_received(self, offers):
118114
offer.decline()
119115

120116
def accept_offer(self, offer):
121-
self.logger.info("Offer %s meets HPC's requirement" %
122-
offer.get_offer()['id']['value'])
117+
self.logger.info("Offer %s meets HPC's requirement" % offer.get_offer()['id']['value'])
123118
self.run_job(offer)
124-
119+
125120
def get_scalar(self, collection, name):
126121
for i in collection:
127122
if i['name'] == name:
@@ -137,7 +132,6 @@ def get_text(self, collection, name):
137132
def run_job(self, mesos_offer):
138133
offer = mesos_offer.get_offer()
139134
self.logger.info("Accepting offer: {}".format(str(offer)))
140-
141135
agent_id = offer['agent_id']['value']
142136
hostname = offer['hostname']
143137
task_id = uuid.uuid4().hex
@@ -162,10 +156,10 @@ def run_job(self, mesos_offer):
162156
],
163157
'command': {'value': 'powershell -EncodedCommand ' + self.hpc_setup_ps1_b64}
164158
}
165-
self.logger.debug(
166-
"Sending command:\n{}".format(task['command']['value']))
159+
self.logger.debug("Sending command:\n{}".format(task['command']['value']))
167160
mesos_offer.accept([task])
168-
self.heartbeat_table.add_slaveinfo(hostname, agent_id, task, cpus)
161+
self.heartbeat_table.add_slaveinfo(hostname, agent_id, task, cpus)
162+
169163

170164
if __name__ == "__main__":
171165
test_mesos = Test()

logging_aux.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import logging
22

3-
def init_logger_aux(logger_name, filelog_name, console_level=logging.WARNING, file_level=logging.DEBUG):
4-
logger = logging.getLogger(logger_name) # type: logging.Logger
3+
4+
def init_logger_aux(logger_name, filelog_name, console_level=logging.WARNING, file_level=logging.DEBUG):
5+
logger = logging.getLogger(logger_name) # type: logging.Logger
56
logger.setLevel(file_level)
67
fh = logging.FileHandler(filelog_name)
78
fh.setLevel(file_level)
@@ -12,4 +13,4 @@ def init_logger_aux(logger_name, filelog_name, console_level=logging.WARNING, fi
1213
ch.setFormatter(formatter)
1314
logger.addHandler(fh)
1415
logger.addHandler(ch)
15-
return logger
16+
return logger

restclient.py

+10-6
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
import requests
21
import json
32
import logging
43
from collections import namedtuple
4+
5+
import requests
6+
57
import logging_aux
68

79
GrowDecision = namedtuple("GrowDecision", "cores_to_grow nodes_to_grow sockets_to_grow")
810
IdleNode = namedtuple("IdleNode", "node_name idle_since")
911

12+
1013
class AutoScaleRestClient(object):
1114
def __init__(self, hostname="localhost"):
1215
self.hostname = hostname
@@ -16,27 +19,28 @@ def __init__(self, hostname="localhost"):
1619

1720
def get_grow_decision(self):
1821
url = self.grow_decision_api_route.format(self.hostname)
19-
res = requests.post(url, verify = False)
22+
res = requests.post(url, verify=False)
2023
if res.ok:
2124
self.logger.info(res.content)
2225
jobj = json.loads(res.content)
2326
return GrowDecision(jobj['CoresToGrow'], jobj['NodesToGrow'], jobj['SocketsToGrow'])
2427
else:
2528
self.logger.error("status_code:{} content:{}".format(res.status_code, res.content))
26-
29+
2730
def check_nodes_idle(self, nodes):
2831
headers = {"Content-Type": "application/json"}
2932
url = self.check_nodes_idle_route.format(self.hostname)
30-
res = requests.post(url, data = nodes, headers = headers, verify = False)
33+
res = requests.post(url, data=nodes, headers=headers, verify=False)
3134
if res.ok:
3235
self.logger.info(res.content)
33-
jobjs = json.loads(res.content)
36+
jobjs = json.loads(res.content)
3437
return [IdleNode(idle_info['NodeName'], idle_info['IdleSince']) for idle_info in jobjs]
3538
else:
3639
self.logger.error("status_code:{} content:{}".format(res.status_code, res.content))
3740

41+
3842
if __name__ == '__main__':
3943
client = AutoScaleRestClient()
4044
ans = client.get_grow_decision()
4145
print ans.cores_to_grow
42-
print client.check_nodes_idle(json.dumps(['mesoswinagent', 'mesoswinagent2']))
46+
print client.check_nodes_idle(json.dumps(['mesoswinagent', 'mesoswinagent2']))

restserver.py

+22-21
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,35 @@
55
import logging_aux
66
import logging
77

8-
class RestServer(object): # TODO: replace this implementation with twisted based implementation
9-
def __init__(self, heartbeat_table, port = 80):
10-
self.logger = logging_aux.init_logger_aux(
11-
"hpcframework.heatbeat_server", "hpcframework.heatbeat_server.log")
12-
self._heartbeat_table = heartbeat_table # type: HeartBeatTable
8+
9+
class RestServer(object): # TODO: replace this implementation with twisted based implementation
10+
def __init__(self, heartbeat_table, port=80):
11+
self.logger = logging_aux.init_logger_aux("hpcframework.heatbeat_server", "hpcframework.heatbeat_server.log")
12+
self._heartbeat_table = heartbeat_table # type: HeartBeatTable
1313
self._server_address = ('', port)
14-
self._server_class = HTTPServer
14+
self._server_class = HTTPServer
1515
self._handler_class = HeartBeatHandler
1616
self._port = port
1717
self._httpd = self._server_class(self._server_address, self._handler_class)
18-
self._server_thread = threading.Thread(target=self.run)
18+
self._server_thread = threading.Thread(target=self.run)
1919
HeartBeatHandler.logger = self.logger
2020
HeartBeatHandler.heartbeat_table = self._heartbeat_table
2121

22-
def run(self):
22+
def run(self):
2323
self.logger.debug('Starting httpd...')
24-
self._httpd.serve_forever()
25-
24+
self._httpd.serve_forever()
25+
2626
def stop(self):
27-
self._httpd.shutdown()
27+
self._httpd.shutdown()
2828
self._server_thread.join()
2929

3030
def start(self):
3131
self._server_thread.start()
3232

33+
3334
class HeartBeatHandler(BaseHTTPRequestHandler):
34-
logger = None # type: logging.Logger
35-
heartbeat_table = None # type: HeartBeatTable
35+
logger = None # type: logging.Logger
36+
heartbeat_table = None # type: HeartBeatTable
3637

3738
def _set_headers(self):
3839
self.send_response(200)
@@ -45,25 +46,25 @@ def do_GET(self):
4546

4647
def do_HEAD(self):
4748
self._set_headers()
48-
49+
4950
def do_POST(self):
5051
# Doesn't do anything with posted data
51-
content_length = int(self.headers['Content-Length']) # <--- Gets the size of data
52-
post_data = self.rfile.read(content_length) # <--- Gets the data itself
52+
content_length = int(self.headers['Content-Length']) # <--- Gets the size of data
53+
post_data = self.rfile.read(content_length) # <--- Gets the data itself
5354
self._set_headers()
54-
json_obj = json.loads(post_data)
55+
json_obj = json.loads(post_data)
5556
# self.wfile.write("<html><body><h1>POST!</h1><pre>" + str(json_obj) + "</pre></body></html>")
5657
self.logger.debug("Received heartbeat object {}".format(str(json_obj)))
5758
try:
5859
self.heartbeat_table.on_slave_heartbeat(json_obj['hostname'])
5960
except Exception as ex:
60-
self.logger.exception(ex)
61-
61+
self.logger.exception(ex)
62+
6263

6364
# if __name__ == "__main__":
6465
# from sys import argv
65-
#
66+
#
6667
# if len(argv) == 2:
6768
# run(port=int(argv[1]))
6869
# else:
69-
# run()
70+
# run()

0 commit comments

Comments
 (0)