8
8
import signal
9
9
import sys
10
10
import uuid
11
+ import base64
12
+ import codecs
11
13
import restserver
12
14
import restclient
13
15
import logging_aux
14
16
15
17
from mesoshttp .client import MesosClient
18
+ from restclient import AutoScaleRestClient
16
19
20
+ from mesoshttp .offers import Offer
17
21
18
22
class Test (object ):
19
23
class MesosFramework (threading .Thread ):
@@ -34,13 +38,24 @@ def __init__(self):
34
38
"hpcframework" , "hpcframework.log" )
35
39
# signal.signal(signal.SIGINT, signal.SIG_IGN)
36
40
logging .getLogger ('mesoshttp' ).setLevel (logging .DEBUG )
41
+
42
+ self .hpc_client = AutoScaleRestClient ()
43
+ self .core_provisioning = 0.0
44
+ with open ("setupscript.ps1" ) as scriptfile :
45
+ hpc_setup_ps1 = scriptfile .read ()
46
+ self .logger .info ("Loaded HPC setup script:/n{}" .format (hpc_setup_ps1 ))
47
+ hpc_setup_ps1_utf16 = hpc_setup_ps1 .encode ('utf-16' )
48
+ hpc_setup_ps1_utf16_nobom = hpc_setup_ps1_utf16 [2 :] if hpc_setup_ps1_utf16 [0 :2 ] == codecs .BOM_UTF16 else hpc_setup_ps1_utf16
49
+ self .hpc_setup_ps1_b64 = base64 .b64encode (hpc_setup_ps1_utf16_nobom )
50
+
51
+
37
52
self .driver = None # type: MesosClient.SchedulerDriver
38
- self .client = MesosClient (mesos_urls = ['http://172.16.1.4:5050' ])
53
+ self .mesos_client = MesosClient (mesos_urls = ['http://172.16.1.4:5050' ])
39
54
# self.client = MesosClient(mesos_urls=['zk://127.0.0.1:2181/mesos'])
40
- self .client .on (MesosClient .SUBSCRIBED , self .subscribed )
41
- self .client .on (MesosClient .OFFERS , self .offer_received )
42
- self .client .on (MesosClient .UPDATE , self .status_update )
43
- self .th = Test .MesosFramework (self .client )
55
+ self .mesos_client .on (MesosClient .SUBSCRIBED , self .subscribed )
56
+ self .mesos_client .on (MesosClient .OFFERS , self .offer_received )
57
+ self .mesos_client .on (MesosClient .UPDATE , self .status_update )
58
+ self .th = Test .MesosFramework (self .mesos_client )
44
59
self .th .start ()
45
60
while True and self .th .isAlive ():
46
61
try :
@@ -53,7 +68,7 @@ def shutdown(self):
53
68
print ('Stop requested by user, stopping framework....' )
54
69
self .logger .warn ('Stop requested by user, stopping framework....' )
55
70
self .driver .tearDown ()
56
- self .client .stop = True
71
+ self .mesos_client .stop = True
57
72
self .stop = True
58
73
59
74
def subscribed (self , driver ):
@@ -67,25 +82,30 @@ def status_update(self, update):
67
82
68
83
def offer_received (self , offers ):
69
84
# self.logger.info('OFFER: %s' % (str(offers)))
85
+ grow_decision = self .hpc_client .get_grow_decision ()
86
+
87
+ if (grow_decision .cores_to_grow - self .core_provisioning > 0 ):
88
+ for offer in offers : # type: Offer
89
+ self .logger .info ("offer_received: {}" .format (
90
+ (str (offer .get_offer ()))))
91
+ mesos_offer = offer .get_offer ()
92
+ if 'attributes' in mesos_offer :
93
+ attributes = mesos_offer ['attributes' ]
94
+ if self .get_text (attributes , 'os' ) != 'windows_server' :
95
+ offer .decline ()
96
+ else :
97
+ cores = self .get_scalar (attributes , 'cores' )
98
+ cpus = self .get_scalar (mesos_offer ['resources' ], 'cpus' )
70
99
71
- for offer in offers :
72
- self .logger .info ("offer_received: {}" .format (
73
- (str (offer .get_offer ()))))
74
- mesos_offer = offer .get_offer ()
75
- if 'attributes' in mesos_offer :
76
- attributes = mesos_offer ['attributes' ]
77
- if self .get_text (attributes , 'os' ) != 'windows_server' :
78
- offer .decline ()
100
+ if cores == cpus :
101
+ self .accept_offer (offer )
102
+ else :
103
+ offer .decline ()
79
104
else :
80
- cores = self .get_scalar (attributes , 'cores' )
81
- cpus = self .get_scalar (mesos_offer ['resources' ], 'cpus' )
82
-
83
- if cores == cpus :
84
- self .accept_offer (offer )
85
- else :
86
- offer .decline ()
87
- else :
88
- offer .decline ()
105
+ offer .decline ()
106
+ else :
107
+ for offer in offers :
108
+ offer .decline ()
89
109
90
110
def accept_offer (self , offer ):
91
111
self .logger .info ("Offer %s meets hpc's requiremnt" %
@@ -130,9 +150,9 @@ def run_job(self, mesos_offer):
130
150
'scalar' : {'value' : self .get_scalar (offer ['resources' ], 'mem' )}
131
151
}
132
152
],
133
- 'command' : {'value' : 'powershell sleep 3000' }
153
+ 'command' : {'value' : 'powershell -EncodedCommand ' + self . hpc_setup_ps1_b64 }
134
154
}
135
-
155
+ self . logger . debug ( "Sending command:/n{}" . format ( task [ 'command' ][ 'value' ]))
136
156
mesos_offer .accept ([task ])
137
157
138
158
0 commit comments