Skip to content

Commit 37233cb

Browse files
committed
Init commit
0 parents  commit 37233cb

File tree

1 file changed

+138
-0
lines changed

1 file changed

+138
-0
lines changed

hpcframework.py

+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import json
2+
import datetime
3+
import time
4+
import os
5+
import sys
6+
import threading
7+
import logging
8+
import signal
9+
import sys
10+
import uuid
11+
12+
from mesoshttp.client import MesosClient
13+
14+
class Test(object):
15+
16+
17+
class MesosFramework(threading.Thread):
18+
19+
def __init__(self, client):
20+
threading.Thread.__init__(self)
21+
self.client = client
22+
self.stop = False
23+
24+
def run(self):
25+
try:
26+
self.client.register()
27+
except KeyboardInterrupt:
28+
print('Stop requested by user, stopping framework....')
29+
30+
31+
def __init__(self):
32+
logging.basicConfig()
33+
self.logger = logging.getLogger(__name__)
34+
#signal.signal(signal.SIGINT, signal.SIG_IGN)
35+
logging.getLogger('mesoshttp').setLevel(logging.DEBUG)
36+
self.driver = None
37+
self.client = MesosClient(mesos_urls=['http://172.16.1.4:5050'])
38+
#self.client = MesosClient(mesos_urls=['zk://127.0.0.1:2181/mesos'])
39+
self.client.on(MesosClient.SUBSCRIBED, self.subscribed)
40+
self.client.on(MesosClient.OFFERS, self.offer_received)
41+
self.client.on(MesosClient.UPDATE, self.status_update)
42+
self.th = Test.MesosFramework(self.client)
43+
self.th.start()
44+
while True and self.th.isAlive():
45+
try:
46+
self.th.join(1)
47+
except KeyboardInterrupt:
48+
self.shutdown()
49+
break
50+
51+
52+
def shutdown(self):
53+
print('Stop requested by user, stopping framework....')
54+
self.logger.warn('Stop requested by user, stopping framework....')
55+
self.driver.tearDown()
56+
self.client.stop = True
57+
self.stop = True
58+
59+
60+
def subscribed(self, driver):
61+
self.logger.warn('SUBSCRIBED')
62+
self.driver = driver
63+
64+
def status_update(self, update):
65+
# if update['status']['state'] == 'TASK_RUNNING':
66+
# self.driver.kill(update['status']['agent_id']['value'], update['status']['task_id']['value'])
67+
pass
68+
69+
def offer_received(self, offers):
70+
# self.logger.warn('OFFER: %s' % (str(offers)))
71+
72+
for offer in offers:
73+
print(str(offer.get_offer()))
74+
mesos_offer = offer.get_offer()
75+
if 'attributes' in mesos_offer:
76+
attributes = mesos_offer['attributes']
77+
if self.get_text(attributes, 'os') != 'windows_server':
78+
offer.decline()
79+
else:
80+
cores = self.get_scalar(attributes, 'cores')
81+
cpus = self.get_scalar(mesos_offer['resources'], 'cpus')
82+
83+
if cores == cpus:
84+
self.accept_offer(offer)
85+
else:
86+
offer.decline()
87+
else:
88+
offer.decline()
89+
90+
91+
def accept_offer(self, offer):
92+
print("Offer %s meets hpc's requiremnt" % offer.get_offer()['id']['value'])
93+
self.run_job(offer)
94+
95+
# i = 0
96+
# for offer in offers:
97+
# if i == 0:
98+
# self.run_job(offer)
99+
# else:
100+
# offer.decline()
101+
# i+=1
102+
def get_scalar(self, dict, name):
103+
for i in dict:
104+
if i['name'] == name:
105+
return i['scalar']['value']
106+
return 0.0
107+
108+
def get_text(self, dict, name):
109+
for i in dict:
110+
if i['name'] == name:
111+
return i['text']['value']
112+
return ""
113+
114+
def run_job(self, mesos_offer):
115+
offer = mesos_offer.get_offer()
116+
print(str(offer))
117+
task = {
118+
'name': 'sample test',
119+
'task_id': {'value': uuid.uuid4().hex},
120+
'agent_id': {'value': offer['agent_id']['value']},
121+
'resources': [
122+
{
123+
'name': 'cpus',
124+
'type': 'SCALAR',
125+
'scalar': {'value': 3.9}
126+
},
127+
{
128+
'name': 'mem',
129+
'type': 'SCALAR',
130+
'scalar': {'value': 1000}
131+
}
132+
],
133+
'command': {'value': 'powershell c:\\HPCPack2016\\5.1.6086.0\\setupscript.ps1'}
134+
}
135+
136+
mesos_offer.accept([task])
137+
138+
test_mesos = Test()

0 commit comments

Comments
 (0)