1
1
import threading
2
2
from datetime import datetime , timedelta
3
3
4
+ import pytz
4
5
from typing import Iterable , Callable , NamedTuple , Set , Dict , List , Tuple
5
6
6
7
import logging_aux
@@ -25,7 +26,7 @@ def _check_node_health_unapproved(node_status):
25
26
26
27
def _find_missing_nodes(rq_nodes, res_nodes):
    # type: (List[str], List[str]) -> List[str]
    """Return the requested node names that are absent from the response.

    Both lists are passed through ``_upper_strings`` before they are
    compared, so the returned names are in that normalized form (presumably
    upper-case; the helper is defined elsewhere in this module -- confirm).
    The order and any duplicates of ``rq_nodes`` are preserved.

    :param rq_nodes: node names that were requested.
    :param res_nodes: node names that were actually reported back.
    :return: normalized names from ``rq_nodes`` not found in ``res_nodes``.
    """
    # Normalize the response names once instead of once per requested name,
    # and keep them in a set so each membership test is constant time.
    known_nodes = set(_upper_strings(res_nodes))
    return [name for name in _upper_strings(rq_nodes) if name not in known_nodes]
29
30
30
31
31
32
def _check_node_state (node_status , target_state ):
@@ -97,7 +98,7 @@ def subscribe_node_closed_callback(self, callback):
97
98
# type: (Callable[[list[str]], ()]) -> ()
98
99
self ._node_closed_callbacks .append (callback )
99
100
100
- def add_slaveinfo (self , fqdn , agent_id , task_id , cpus , last_heartbeat = datetime .utcnow ( )):
101
+ def add_slaveinfo (self , fqdn , agent_id , task_id , cpus , last_heartbeat = datetime .now ( pytz . utc )):
101
102
# type: (str, str, str, float, datetime) -> ()
102
103
u_fqdn = fqdn .upper ()
103
104
hostname = _get_hostname_from_fqdn (u_fqdn )
@@ -114,7 +115,7 @@ def add_slaveinfo(self, fqdn, agent_id, task_id, cpus, last_heartbeat=datetime.u
114
115
self ._heart_beat_table [hostname ] = slaveinfo
115
116
self .logger .info ("Heart beat entry added: {}" .format (str (slaveinfo )))
116
117
117
- def on_slave_heartbeat (self , hostname , now = datetime .utcnow ( )):
118
+ def on_slave_heartbeat (self , hostname , now = datetime .now ( pytz . utc )):
118
119
# type: (str, datetime) -> ()
119
120
u_hostname = hostname .upper ()
120
121
if u_hostname in self ._heart_beat_table :
@@ -123,7 +124,7 @@ def on_slave_heartbeat(self, hostname, now=datetime.utcnow()):
123
124
if self ._heart_beat_table [u_hostname ].state == HpcState .Provisioning :
124
125
with self ._table_lock :
125
126
if self ._heart_beat_table [u_hostname ].state == HpcState .Provisioning :
126
- self ._set_nodes_configuring (u_hostname )
127
+ self ._set_nodes_configuring ([ u_hostname ] )
127
128
else :
128
129
self .logger .error ("Host {} is not recognized. Heartbeat ignored." .format (u_hostname ))
129
130
self .logger .debug ("_table {} " .format (self ._heart_beat_table ))
@@ -165,7 +166,7 @@ def check_fqdn_collision(self, fqdn):
165
166
return True
166
167
return False
167
168
168
- def _check_timeout (self , now = datetime .utcnow ( )):
169
+ def _check_timeout (self , now = datetime .now ( pytz . utc )):
169
170
# type: (datetime) -> ([SlaveInfo], [SlaveInfo], [SlaveInfo])
170
171
# TODO: Check configuring timeout
171
172
provision_timeout_list = []
@@ -276,8 +277,8 @@ def _configure_compute_nodes_state_machine(self):
276
277
self .logger .info ("Nodes configured: {}" .format (configured_node_names ))
277
278
self ._set_nodes_running (configured_node_names )
278
279
if missing_nodes :
280
+ # Missing is valid state of nodes in configuring.
279
281
self .logger .info ("Nodes missing when configuring: {}" .format (missing_nodes ))
280
- self ._set_nodes_closed (missing_nodes )
281
282
282
283
def _check_runaway_and_idle_compute_nodes (self ):
283
284
# type: () -> ()
@@ -312,7 +313,7 @@ def _check_runaway_and_idle_compute_nodes(self):
312
313
self .logger .info ("Get idle_timeout_nodes:{}" .format (str (idle_timeout_nodes )))
313
314
self ._set_nodes_draining (idle_timeout_nodes )
314
315
315
- def _check_node_idle_timeout (self , node_names , now = datetime .utcnow ( )):
316
+ def _check_node_idle_timeout (self , node_names , now = datetime .now ( pytz . utc )):
316
317
# type: (Iterable[str], datetime) -> [str]
317
318
new_node_idle_check_table = {}
318
319
for u_node_name in _upper_strings (node_names ):
@@ -325,6 +326,7 @@ def _check_node_idle_timeout(self, node_names, now=datetime.utcnow()):
325
326
else :
326
327
new_node_idle_check_table [u_node_name ] = now
327
328
self ._node_idle_check_table = new_node_idle_check_table
329
+ self .logger .info ("_check_node_idle_timeout: now - " + str (now ))
328
330
self .logger .info ("_check_node_idle_timeout: " + str (self ._node_idle_check_table ))
329
331
return [name for name , value in self ._node_idle_check_table .iteritems () if
330
332
(now - value ) >= self ._node_idle_timedelta ]
0 commit comments