Skip to content

Commit 54fbe28

Browse files
committedJul 19, 2018
Merge branch 'feature/remove-rest-server' into develop
2 parents 331d848 + a19d183 commit 54fbe28

6 files changed

+83
-200
lines changed
 

‎hpc_cluster_manager.py

+63-72
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99

1010

1111
class HpcState:
12-
Unknown, Provisioning, Configuring, Running, Draining, Closing, Closed = range(7)
13-
Names = ["Unknown", "Provisioning", "Configuring", "Running", "Draining", "Closing", "Closed"]
12+
Unknown, Provisioning, Running, Draining, Closing, Closed = range(6)
13+
Names = ["Unknown", "Provisioning", "Running", "Draining", "Closing", "Closed"]
1414

1515

1616
def _upper_strings(strs):
@@ -72,16 +72,15 @@ class HpcClusterManager(object):
7272
NODE_IDLE_TIMEOUT = 180.0
7373

7474
# TODO: add configuration_timeout
75-
def __init__(self, hpc_rest_client, provisioning_timeout=timedelta(minutes=15),
76-
heartbeat_timeout=timedelta(minutes=3), idle_timeout=timedelta(minutes=3), node_group=""):
77-
# type: (HpcRestClient, timedelta, timedelta, timedelta, str) -> ()
78-
self._heart_beat_table = {} # type: Dict[str, SlaveInfo]
75+
def __init__(self, hpc_rest_client, provisioning_timeout=timedelta(minutes=15), idle_timeout=timedelta(minutes=3),
76+
node_group=""):
77+
# type: (HpcRestClient, timedelta, timedelta, str) -> ()
78+
self._slave_info_table = {} # type: Dict[str, SlaveInfo]
7979
self._removed_nodes = set() # type: Set[str]
8080
self._node_idle_check_table = {}
8181
self.logger = logging_aux.init_logger_aux("hpcframework.clustermanager", "hpcframework.clustermanager.log")
8282
self._table_lock = threading.Lock()
8383
self._provisioning_timeout = provisioning_timeout
84-
self._heartbeat_timeout = heartbeat_timeout
8584
self._hpc_client = hpc_rest_client
8685
self._node_group = node_group # TODO: change to a centralized config
8786

@@ -104,40 +103,43 @@ def add_slaveinfo(self, fqdn, agent_id, task_id, cpus, last_heartbeat=None):
104103
last_heartbeat = datetime.now(pytz.utc)
105104
u_fqdn = fqdn.upper()
106105
hostname = _get_hostname_from_fqdn(u_fqdn)
107-
if hostname in self._heart_beat_table:
108-
if self._heart_beat_table[hostname].fqdn != u_fqdn:
106+
if hostname in self._slave_info_table:
107+
if self._slave_info_table[hostname].fqdn != u_fqdn:
109108
self.logger.error(
110109
"Duplicated hostname {} detected. Existing fqdn: {}, new fqdn {}. Ignore new heartbeat entry.".format(
111-
hostname, self._heart_beat_table[hostname].fqdn, u_fqdn))
110+
hostname, self._slave_info_table[hostname].fqdn, u_fqdn))
112111
return
113-
elif self._heart_beat_table[hostname].state != HpcState.Closed:
112+
elif self._slave_info_table[hostname].state != HpcState.Closed:
114113
self.logger.warn("Heart beat entry of {} existed. old value: {}.".format(
115-
hostname, str(self._heart_beat_table[hostname])))
114+
hostname, str(self._slave_info_table[hostname])))
116115
slaveinfo = SlaveInfo(hostname, u_fqdn, agent_id, task_id, cpus, last_heartbeat, HpcState.Provisioning)
117-
self._heart_beat_table[hostname] = slaveinfo
116+
self._slave_info_table[hostname] = slaveinfo
118117
self.logger.info("Heart beat entry added: {}".format(str(slaveinfo)))
119118

120-
def on_slave_heartbeat(self, hostname, now=None):
119+
def update_slaves_last_seen(self, hostname_arr, now=None):
120+
# type:(Iterable[str], datetime) -> ()
121+
if now is None:
122+
now = datetime.now(pytz.utc)
123+
for hostname in hostname_arr:
124+
self.update_slave_last_seen(hostname, now)
125+
126+
def update_slave_last_seen(self, hostname, now=None):
121127
# type: (str, datetime) -> ()
122128
if now is None:
123129
now = datetime.now(pytz.utc)
124130
u_hostname = hostname.upper()
125-
if u_hostname in self._heart_beat_table:
126-
self._heart_beat_table[u_hostname] = self._heart_beat_table[u_hostname]._replace(last_heartbeat=now)
127-
self.logger.info("Heatbeat from host {}".format(u_hostname))
128-
if self._heart_beat_table[u_hostname].state == HpcState.Provisioning:
129-
with self._table_lock:
130-
if self._heart_beat_table[u_hostname].state == HpcState.Provisioning:
131-
self._set_nodes_configuring([u_hostname])
131+
if u_hostname in self._slave_info_table:
132+
self._slave_info_table[u_hostname] = self._slave_info_table[u_hostname]._replace(last_seen=now)
133+
self.logger.info("Slave seen: {}".format(u_hostname))
132134
else:
133-
self.logger.error("Host {} is not recognized. Heartbeat ignored.".format(u_hostname))
134-
self.logger.debug("_table {} ".format(self._heart_beat_table))
135+
self.logger.error("Host {} is not recognized. No entry will be updated.".format(u_hostname))
136+
self.logger.debug("_table {} ".format(self._slave_info_table))
135137

136138
def get_task_info(self, hostname):
137139
# type: (str) -> (str, str)
138140
u_hostname = hostname.upper()
139-
if u_hostname in self._heart_beat_table:
140-
entry = self._heart_beat_table[u_hostname]
141+
if u_hostname in self._slave_info_table:
142+
entry = self._slave_info_table[u_hostname]
141143
return entry.task_id, entry.agent_id
142144
else:
143145
self.logger.error("Host {} is not recognized. Failed to get task info.".format(u_hostname))
@@ -146,8 +148,8 @@ def get_task_info(self, hostname):
146148
def get_host_state(self, hostname):
147149
# type: (str) -> int
148150
u_hostname = hostname.upper()
149-
if u_hostname in self._heart_beat_table:
150-
entry = self._heart_beat_table[u_hostname]
151+
if u_hostname in self._slave_info_table:
152+
entry = self._slave_info_table[u_hostname]
151153
return entry.state
152154
else:
153155
self.logger.error("Host {} is not recognized. Failed to get host state.".format(u_hostname))
@@ -165,52 +167,48 @@ def check_fqdn_collision(self, fqdn):
165167
# type: (str) -> bool
166168
u_fqdn = fqdn.upper()
167169
hostname = _get_hostname_from_fqdn(u_fqdn)
168-
if hostname in self._heart_beat_table:
169-
if self._heart_beat_table[hostname].fqdn != u_fqdn:
170+
if hostname in self._slave_info_table:
171+
if self._slave_info_table[hostname].fqdn != u_fqdn:
170172
return True
171173
return False
172174

173175
def _check_timeout(self, now=None):
174-
# type: (datetime) -> ([SlaveInfo], [SlaveInfo], [SlaveInfo])
176+
# type: (datetime) -> ([SlaveInfo], [SlaveInfo])
175177
# TODO: Check configuring timeout
176178
if now is None:
177179
now = datetime.now(pytz.utc)
178180
provision_timeout_list = []
179-
heartbeat_timeout_list = []
180181
running_list = []
181-
for host in dict(self._heart_beat_table).itervalues():
182-
if host.state == HpcState.Provisioning and now - host.last_heartbeat >= self._provisioning_timeout:
182+
for host in dict(self._slave_info_table).itervalues():
183+
if host.state == HpcState.Provisioning and now - host.last_seen >= self._provisioning_timeout:
183184
self.logger.warn("Provisioning timeout: {}".format(str(host)))
184185
provision_timeout_list.append(host)
185186
elif host.state == HpcState.Running:
186-
if now - host.last_heartbeat >= self._heartbeat_timeout:
187-
self.logger.warn("Heartbeat lost: {}".format(str(host)))
188-
heartbeat_timeout_list.append(host)
189-
else:
190-
running_list.append(host)
191-
return provision_timeout_list, heartbeat_timeout_list, running_list
187+
running_list.append(host)
188+
189+
return provision_timeout_list, running_list
192190

193191
def get_cores_in_provisioning(self):
194192
cores = 0.0
195-
for host in dict(self._heart_beat_table).itervalues():
196-
if host.state == HpcState.Provisioning or host.state == HpcState.Configuring:
193+
for host in dict(self._slave_info_table).itervalues():
194+
if host.state == HpcState.Provisioning:
197195
cores += host.cpus
198196
self.logger.info("Cores in provisioning: {}".format(cores))
199197
return cores
200198

201199
def _get_nodes_name_in_state(self, state):
202200
# type: (HpcState) -> [str]
203-
return [host.hostname for host in dict(self._heart_beat_table).itervalues() if host.state == state]
201+
return [host.hostname for host in dict(self._slave_info_table).itervalues() if host.state == state]
204202

205203
# TODO: make state_machine methods more testable
206-
def _configure_compute_nodes_state_machine(self):
204+
def _provision_compute_nodes_state_machine(self):
207205
# type: () -> ()
208-
configuring_node_names = self._get_nodes_name_in_state(HpcState.Configuring)
206+
provisioning_node_names = self._get_nodes_name_in_state(HpcState.Provisioning)
209207

210-
if not configuring_node_names:
208+
if not provisioning_node_names:
211209
return
212210

213-
self.logger.info("Nodes in configuring: {}".format(configuring_node_names))
211+
self.logger.info("Nodes in provisioning: {}".format(provisioning_node_names))
214212
groups = self._hpc_client.list_node_groups(self.MESOS_NODE_GROUP_NAME)
215213
if self.MESOS_NODE_GROUP_NAME not in groups:
216214
self._hpc_client.add_node_group(self.MESOS_NODE_GROUP_NAME, self.MESOS_NODE_GROUP_DESCRIPTION)
@@ -225,13 +223,13 @@ def _configure_compute_nodes_state_machine(self):
225223
return
226224

227225
# state check
228-
node_status_list = self._hpc_client.get_node_status_exact(configuring_node_names)
226+
node_status_list = self._hpc_client.get_node_status_exact(provisioning_node_names)
229227
self.logger.info("Get node_status_list:{}".format(str(node_status_list)))
230228
unapproved_node_list = []
231229
take_offline_node_list = []
232230
bring_online_node_list = []
233231
change_node_group_node_list = []
234-
configured_node_names = []
232+
provisioned_node_names = []
235233
invalid_state_node_dict = {}
236234
for node_status in node_status_list:
237235
node_name = _get_node_name_from_status(node_status)
@@ -252,14 +250,14 @@ def _configure_compute_nodes_state_machine(self):
252250
bring_online_node_list.append(node_name)
253251
elif _check_node_state_online(node_status):
254252
# this node is all set
255-
configured_node_names.append(node_name)
253+
provisioned_node_names.append(node_name)
256254
else:
257255
invalid_state_node_dict[node_name] = node_state
258256

259-
missing_nodes = _find_missing_nodes(configuring_node_names, (_get_node_names_from_status(node_status_list)))
257+
missing_nodes = _find_missing_nodes(provisioning_node_names, (_get_node_names_from_status(node_status_list)))
260258
try:
261259
if invalid_state_node_dict:
262-
self.logger.info("Node(s) in invalid state when configuring: {}".format(str(invalid_state_node_dict)))
260+
self.logger.info("Node(s) in invalid state when provisioning: {}".format(str(invalid_state_node_dict)))
263261
if unapproved_node_list:
264262
self.logger.info("Assigning node template for node(s): {}".format(str(unapproved_node_list)))
265263
self._hpc_client.assign_default_compute_node_template(unapproved_node_list)
@@ -275,26 +273,23 @@ def _configure_compute_nodes_state_machine(self):
275273
if self._node_group_specified():
276274
self._hpc_client.add_node_to_node_group(self._node_group, change_node_group_node_list)
277275
except:
278-
# Swallow all exceptions here. As we don't want any exception to prevent configured nodes to work
276+
# Swallow all exceptions here. As we don't want any exception to prevent provisioned nodes to work
279277
self.logger.exception('Exception happened when configuring compute node.')
280278

281279
# state change
282-
if configured_node_names:
283-
self.logger.info("Nodes configured: {}".format(configured_node_names))
284-
self._set_nodes_running(configured_node_names)
280+
if provisioned_node_names:
281+
self.logger.info("Nodes provisioned: {}".format(provisioned_node_names))
282+
self._set_nodes_running(provisioned_node_names)
285283
if missing_nodes:
286-
# Missing is valid state of nodes in configuring.
287-
self.logger.info("Nodes missing when configuring: {}".format(missing_nodes))
284+
# Missing is valid state of nodes in provisioning.
285+
self.logger.info("Nodes missing when provisioning: {}".format(missing_nodes))
288286

289287
def _check_runaway_and_idle_compute_nodes(self):
290288
# type: () -> ()
291-
(provision_timeout_list, heartbeat_timeout_list, running_list) = self._check_timeout()
289+
(provision_timeout_list, running_list) = self._check_timeout()
292290
if provision_timeout_list:
293291
self.logger.info("Get provision_timeout_list:{}".format(str(provision_timeout_list)))
294292
self._set_nodes_draining(host.hostname for host in provision_timeout_list)
295-
if heartbeat_timeout_list:
296-
self.logger.info("Get heartbeat_timeout_list:{}".format(str(heartbeat_timeout_list)))
297-
self._set_nodes_draining(host.hostname for host in heartbeat_timeout_list)
298293
if running_list:
299294
running_node_names = [host.hostname for host in running_list]
300295
node_status_list = self._hpc_client.get_node_status_exact(running_node_names)
@@ -343,7 +338,7 @@ def _check_node_idle_timeout(self, node_names, now=None):
343338

344339
def _start_configure_cluster_timer(self):
345340
# type: () -> ()
346-
self._configure_compute_nodes_state_machine()
341+
self._provision_compute_nodes_state_machine()
347342
self._check_runaway_and_idle_compute_nodes()
348343
self._drain_and_stop_nodes()
349344
timer = threading.Timer(self.CHECK_CONFIGURING_NODES_INTERVAL, self._start_configure_cluster_timer)
@@ -356,7 +351,7 @@ def start(self):
356351
def _check_deploy_failure(self, set_nodes):
357352
# type: (List[Tuple[str, int]]) -> ()
358353
for node, old_state in set_nodes:
359-
if old_state == HpcState.Provisioning or old_state == HpcState.Configuring:
354+
if old_state == HpcState.Provisioning:
360355
self.logger.error(
361356
"Node {} failed to deploy. Previous state: {}".format(node, HpcState.Names[old_state]))
362357

@@ -383,19 +378,15 @@ def _set_nodes_running(self, node_names):
383378
# type: (Iterable[str]) -> ()
384379
self._set_node_state(node_names, HpcState.Running, "Running")
385380

386-
def _set_nodes_configuring(self, node_names):
387-
# type: (Iterable[str]) -> ()
388-
self._set_node_state(node_names, HpcState.Configuring, "Configuring")
389-
390381
def _set_node_state(self, node_names, node_state, state_name):
391382
# type: (Iterable[str], int, str) -> [(str, int)]
392383
set_nodes = []
393384
for node_name in node_names:
394385
u_hostname = node_name.upper()
395-
if u_hostname in self._heart_beat_table:
396-
if self._heart_beat_table[u_hostname].state != node_state:
397-
old_state = self._heart_beat_table[u_hostname].state
398-
self._heart_beat_table[u_hostname] = self._heart_beat_table[u_hostname]._replace(state=node_state)
386+
if u_hostname in self._slave_info_table:
387+
if self._slave_info_table[u_hostname].state != node_state:
388+
old_state = self._slave_info_table[u_hostname].state
389+
self._slave_info_table[u_hostname] = self._slave_info_table[u_hostname]._replace(state=node_state)
399390
set_nodes.append((u_hostname, old_state))
400391
self.logger.info("Host {} set to {} from {}.".format(
401392
u_hostname, state_name, HpcState.Names[old_state]))
@@ -505,4 +496,4 @@ def _check_node_in_specified_group(self, node_status):
505496

506497
SlaveInfo = NamedTuple("SlaveInfo",
507498
[("hostname", str), ("fqdn", str), ("agent_id", str), ("task_id", str), ("cpus", float),
508-
("last_heartbeat", datetime), ("state", int)])
499+
("last_seen", datetime), ("state", int)])

0 commit comments

Comments
 (0)
Please sign in to comment.