|
2 | 2 | import codecs
|
3 | 3 | import logging
|
4 | 4 | import threading
|
| 5 | +import time |
5 | 6 | import uuid
|
6 | 7 |
|
7 | 8 | from mesoshttp.client import MesosClient
|
8 | 9 | from mesoshttp.offers import Offer
|
| 10 | +from typing import List |
9 | 11 |
|
10 | 12 | import hpc_cluster_manager
|
11 | 13 | import logging_aux
|
@@ -101,6 +103,8 @@ def status_update(self, update):
|
101 | 103 | self.logger.info("Update received:\n{}".format(str(update)))
|
102 | 104 |
|
103 | 105 | def offer_received(self, offers):
|
| 106 | + handled_offer = [] # type: List[Offer] |
| 107 | + |
104 | 108 | try:
|
105 | 109 | # self.logger.info('OFFER: %s' % (str(offers)))
|
106 | 110 | if self.node_group == "":
|
@@ -142,13 +146,31 @@ def offer_received(self, offers):
|
142 | 146 | if take_offer:
|
143 | 147 | cores_to_grow -= cpus
|
144 | 148 | self.accept_offer(offer)
|
| 149 | + handled_offer.append(offer) |
145 | 150 | else:
|
146 | 151 | self.decline_offer(offer)
|
| 152 | + handled_offer.append(offer) |
147 | 153 |
|
148 | 154 | except (KeyboardInterrupt, SystemExit):
|
149 | 155 | raise
|
150 | 156 | except Exception as ex:
|
151 | 157 | self.logger.exception(ex)
|
| 158 | + finally: |
| 159 | + # We have to either accept or decline an offer |
| 160 | + # TODO: Retry in a separate thread |
| 161 | + while True: |
| 162 | + try: |
| 163 | + if not offers: |
| 164 | + return |
| 165 | + else: |
| 166 | + to_decline = [offer for offer in offers if offer not in handled_offer] |
| 167 | + for offer in to_decline: |
| 168 | + self.decline_offer(offer) |
| 169 | + handled_offer.append(offer) |
| 170 | + return |
| 171 | + except Exception as ex: |
| 172 | + self.logger.exception(ex) |
| 173 | + time.sleep(10) |
152 | 174 |
|
153 | 175 | def decline_offer(self, offer):
|
154 | 176 | self.logger.info("Decline offer %s" % offer.get_offer()['id']['value'])
|
|
0 commit comments