Skip to content

Commit 7a29df7

Browse files
Fix graceful shutdown on linux (#58)
1 parent 0511776 commit 7a29df7

File tree

5 files changed: +127 -119 lines changed

ton-http-api/pyTON/manager.py

Lines changed: 24 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -2,11 +2,12 @@
22
import time
33
import traceback
44
import random
5-
import aioprocessing as ap
5+
import queue
66

77
from collections import defaultdict
88
from collections.abc import Mapping
99
from copy import deepcopy
10+
from concurrent.futures import ThreadPoolExecutor
1011

1112
from pyTON.worker import TonlibWorker
1213
from pyTON.models import TonlibWorkerMsgType, TonlibClientResult, ConsensusBlock
@@ -38,6 +39,8 @@ def __init__(self,
3839
# cache setup
3940
self.setup_cache()
4041

42+
self.threadpool_executor = ThreadPoolExecutor(max_workers=len(self.tonlib_settings.liteserver_config['liteservers']) * 2)
43+
4144
# workers spawn
4245
self.loop = loop or asyncio.get_running_loop()
4346
for ls_index in range(len(self.tonlib_settings.liteserver_config['liteservers'])):
@@ -60,6 +63,8 @@ async def shutdown(self):
6063
for i in self.workers:
6164
await self.worker_control(i, enabled=False)
6265

66+
self.threadpool_executor.shutdown()
67+
6368
def setup_cache(self):
6469
self.raw_get_transactions = self.cache_manager.cached(expire=5)(self.raw_get_transactions)
6570
self.get_transactions = self.cache_manager.cached(expire=15, check_error=False)(self.get_transactions)
@@ -84,14 +89,12 @@ def spawn_worker(self, ls_index, force_restart=False):
8489
return
8590
try:
8691
worker_info['reader'].cancel()
92+
worker_info['worker'].exit_event.set()
93+
worker_info['worker'].output_queue.cancel_join_thread()
94+
worker_info['worker'].input_queue.cancel_join_thread()
95+
worker_info['worker'].output_queue.close()
96+
worker_info['worker'].input_queue.close()
8797
worker_info['worker'].join(timeout=3)
88-
if worker_info['worker'].is_alive():
89-
worker_info['worker'].terminate()
90-
worker_info['worker'].join()
91-
self.workers[ls_index]['worker'].output_queue.close()
92-
self.workers[ls_index]['worker'].output_queue.join_thread()
93-
self.workers[ls_index]['worker'].input_queue.close()
94-
self.workers[ls_index]['worker'].input_queue.join_thread()
9598
except Exception as ee:
9699
logger.error('Failed to delete existing process: {exc}', exc=ee)
97100
# running new worker
@@ -112,15 +115,17 @@ def spawn_worker(self, ls_index, force_restart=False):
112115

113116
async def worker_control(self, ls_index, enabled):
114117
if enabled == False:
115-
self.workers[ls_index]['worker'].terminate()
116-
self.workers[ls_index]['worker'].join()
117118
self.workers[ls_index]['reader'].cancel()
118-
await self.workers[ls_index]['reader']
119+
self.workers[ls_index]['worker'].exit_event.set()
119120

121+
self.workers[ls_index]['worker'].output_queue.cancel_join_thread()
122+
self.workers[ls_index]['worker'].input_queue.cancel_join_thread()
120123
self.workers[ls_index]['worker'].output_queue.close()
121-
self.workers[ls_index]['worker'].output_queue.join_thread()
122124
self.workers[ls_index]['worker'].input_queue.close()
123-
self.workers[ls_index]['worker'].input_queue.join_thread()
125+
126+
self.workers[ls_index]['worker'].join()
127+
128+
await self.workers[ls_index]['reader']
124129

125130
self.workers[ls_index]['is_enabled'] = enabled
126131

@@ -152,7 +157,10 @@ async def read_results(self, ls_index):
152157
worker = self.workers[ls_index]['worker']
153158
while True:
154159
try:
155-
msg_type, msg_content = await worker.output_queue.coro_get()
160+
try:
161+
msg_type, msg_content = await self.loop.run_in_executor(self.threadpool_executor, worker.output_queue.get, True, 1)
162+
except queue.Empty:
163+
continue
156164
if msg_type == TonlibWorkerMsgType.TASK_RESULT:
157165
task_id = msg_content.task_id
158166
result = msg_content.result
@@ -173,9 +181,6 @@ async def read_results(self, ls_index):
173181

174182
if msg_type == TonlibWorkerMsgType.ARCHIVAL_UPDATE:
175183
worker.is_archival = msg_content
176-
177-
if msg_type == TonlibWorkerMsgType.DEAD_REPORT:
178-
self.spawn_worker(ls_index, force_restart=True)
179184
except asyncio.CancelledError:
180185
logger.info("Task read_results was cancelled")
181186
return
@@ -269,7 +274,7 @@ async def dispatch_request_to_worker(self, method, ls_index, *args, **kwargs):
269274

270275
logger.info("Sending request method: {method}, task_id: {task_id}, ls_index: {ls_index}",
271276
method=method, task_id=task_id, ls_index=ls_index)
272-
await self.workers[ls_index]['worker'].input_queue.coro_put((task_id, timeout, method, args, kwargs))
277+
await self.loop.run_in_executor(self.threadpool_executor, self.workers[ls_index]['worker'].input_queue.put, (task_id, timeout, method, args, kwargs))
273278

274279
try:
275280
self.futures[task_id] = self.loop.create_future()
@@ -332,7 +337,7 @@ async def raw_send_message(self, serialized_boc):
332337
for ls_index in ls_index_list:
333338
task_id = "{}:{}".format(time.time(), random.random())
334339
timeout = time.time() + self.tonlib_settings.request_timeout
335-
await self.workers[ls_index]['worker'].input_queue.coro_put((task_id, timeout, 'raw_send_message', [serialized_boc], {}))
340+
await self.loop.run_in_executor(self.threadpool_executor, self.workers[ls_index]['worker'].input_queue.put, (task_id, timeout, 'raw_send_message', [serialized_boc], {}))
336341

337342
self.futures[task_id] = self.loop.create_future()
338343
task_ids.append(task_id)

ton-http-api/pyTON/models.py

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -17,11 +17,9 @@ class TonlibClientResult:
1717

1818

1919
class TonlibWorkerMsgType(Enum):
20-
DEAD_REPORT = 0
21-
TASK_RESULT = 1
22-
LAST_BLOCK_UPDATE = 2
23-
ARCHIVAL_UPDATE = 3
24-
20+
TASK_RESULT = 0
21+
LAST_BLOCK_UPDATE = 1
22+
ARCHIVAL_UPDATE = 2
2523

2624

2725
@dataclass

ton-http-api/pyTON/worker.py

Lines changed: 100 additions & 93 deletions
Original file line number | Diff line number | Diff line change
@@ -2,13 +2,14 @@
22
import traceback
33
import random
44
import time
5-
import aioprocessing as ap
5+
import queue
66
import multiprocessing as mp
77

88
from pyTON.settings import TonlibSettings
99
from pyTON.models import TonlibWorkerMsgType, TonlibClientResult
1010
from pytonlib import TonlibClient
1111
from datetime import datetime
12+
from concurrent.futures import ThreadPoolExecutor
1213

1314
from enum import Enum
1415
from dataclasses import dataclass
@@ -21,14 +22,13 @@ class TonlibWorker(mp.Process):
2122
def __init__(self,
2223
ls_index: int,
2324
tonlib_settings: TonlibSettings,
24-
input_queue: Optional[ap.AioQueue]=None,
25-
output_queue: Optional[ap.AioQueue]=None):
25+
input_queue: Optional[mp.Queue]=None,
26+
output_queue: Optional[mp.Queue]=None):
2627
super(TonlibWorker, self).__init__(daemon=True)
2728

28-
self.input_queue = input_queue or ap.AioQueue()
29-
self.output_queue = output_queue or ap.AioQueue()
30-
self.input_queue.cancel_join_thread()
31-
self.output_queue.cancel_join_thread()
29+
self.input_queue = input_queue or mp.Queue()
30+
self.output_queue = output_queue or mp.Queue()
31+
self.exit_event = mp.Event()
3232

3333
self.ls_index = ls_index
3434
self.tonlib_settings = tonlib_settings
@@ -39,11 +39,14 @@ def __init__(self,
3939
self.loop = None
4040
self.tasks = {}
4141
self.tonlib = None
42+
self.threadpool_executor = None
4243

4344
self.timeout_count = 0
4445
self.is_dead = False
4546

4647
def run(self):
48+
self.threadpool_executor = ThreadPoolExecutor(max_workers=6)
49+
4750
policy = asyncio.get_event_loop_policy()
4851
policy.set_event_loop(policy.new_event_loop())
4952
self.loop = asyncio.new_event_loop()
@@ -61,7 +64,27 @@ def run(self):
6164
self.tasks['report_last_block'] = self.loop.create_task(self.report_last_block())
6265
self.tasks['report_archival'] = self.loop.create_task(self.report_archival())
6366
self.tasks['main_loop'] = self.loop.create_task(self.main_loop())
64-
self.loop.run_until_complete(self.idle_loop())
67+
self.tasks['idle_loop'] = self.loop.create_task(self.idle_loop())
68+
69+
finished, unfinished = self.loop.run_until_complete(asyncio.wait([
70+
self.tasks['report_last_block'], self.tasks['report_archival'], self.tasks['main_loop'], self.tasks['idle_loop']],
71+
return_when=asyncio.FIRST_COMPLETED))
72+
73+
self.exit_event.set()
74+
75+
for to_cancel in unfinished:
76+
to_cancel.cancel()
77+
try:
78+
self.loop.run_until_complete(to_cancel)
79+
except:
80+
pass
81+
82+
self.threadpool_executor.shutdown()
83+
84+
self.output_queue.cancel_join_thread()
85+
self.input_queue.cancel_join_thread()
86+
self.output_queue.close()
87+
self.input_queue.close()
6588

6689
@property
6790
def info(self):
@@ -73,97 +96,81 @@ def info(self):
7396
'number': self.ls_index,
7497
}
7598

76-
async def report_dead(self):
77-
if not self.is_dead:
78-
self.is_dead = True
79-
80-
format_exc = traceback.format_exc()
81-
logger.error('Dead report: {format_exc}', format_exc=format_exc)
82-
await self.output_queue.coro_put((TonlibWorkerMsgType.DEAD_REPORT, format_exc))
83-
8499
async def report_last_block(self):
85-
try:
86-
while not self.is_dead:
87-
last_block = -1
88-
try:
89-
masterchain_info = await self.tonlib.get_masterchain_info()
90-
last_block = masterchain_info["last"]["seqno"]
91-
self.timeout_count = 0
92-
except asyncio.CancelledError:
93-
logger.warning('Client #{ls_index:03d} report_last_block timeout', ls_index=self.ls_index)
94-
self.timeout_count += 1
95-
except Exception as e:
96-
logger.error("Client #{ls_index:03d} report_last_block exception: {exc}", ls_index=self.ls_index, exc=e)
97-
self.timeout_count += 1
98-
99-
if self.timeout_count >= 10:
100-
raise RuntimeError(f'Client #{self.ls_index:03d} got {self.timeout_count} timeouts in report_last_block')
101-
102-
self.last_block = last_block
103-
await self.output_queue.coro_put((TonlibWorkerMsgType.LAST_BLOCK_UPDATE, self.last_block))
104-
await asyncio.sleep(1)
105-
except:
106-
await self.report_dead()
100+
while not self.exit_event.is_set():
101+
last_block = -1
102+
try:
103+
masterchain_info = await self.tonlib.get_masterchain_info()
104+
last_block = masterchain_info["last"]["seqno"]
105+
self.timeout_count = 0
106+
except asyncio.CancelledError:
107+
logger.warning('Client #{ls_index:03d} report_last_block timeout', ls_index=self.ls_index)
108+
self.timeout_count += 1
109+
except Exception as e:
110+
logger.error("Client #{ls_index:03d} report_last_block exception: {exc}", ls_index=self.ls_index, exc=e)
111+
self.timeout_count += 1
112+
113+
if self.timeout_count >= 10:
114+
raise RuntimeError(f'Client #{self.ls_index:03d} got {self.timeout_count} timeouts in report_last_block')
115+
116+
self.last_block = last_block
117+
await self.loop.run_in_executor(self.threadpool_executor, self.output_queue.put, (TonlibWorkerMsgType.LAST_BLOCK_UPDATE, self.last_block))
118+
await asyncio.sleep(1)
107119

108120
async def report_archival(self):
109-
try:
110-
while not self.is_dead:
111-
is_archival = False
112-
try:
113-
block_transactions = await self.tonlib.get_block_transactions(-1, -9223372036854775808, random.randint(2, 4096), count=10)
114-
is_archival = block_transactions.get("@type", "") == "blocks.transactions"
115-
except asyncio.CancelledError:
116-
logger.warning('Client #{ls_index:03d} report_archival timeout', ls_index=self.ls_index)
117-
except Exception as e:
118-
logger.error("Client #{ls_index:03d} report_archival exception: {exc}", ls_index=self.ls_index, exc=e)
119-
self.is_archival = is_archival
120-
await self.output_queue.coro_put((TonlibWorkerMsgType.ARCHIVAL_UPDATE, self.is_archival))
121-
await asyncio.sleep(600)
122-
except:
123-
await self.report_dead()
121+
while not self.exit_event.is_set():
122+
is_archival = False
123+
try:
124+
block_transactions = await self.tonlib.get_block_transactions(-1, -9223372036854775808, random.randint(2, 4096), count=10)
125+
is_archival = block_transactions.get("@type", "") == "blocks.transactions"
126+
except asyncio.CancelledError:
127+
logger.warning('Client #{ls_index:03d} report_archival timeout', ls_index=self.ls_index)
128+
except Exception as e:
129+
logger.error("Client #{ls_index:03d} report_archival exception: {exc}", ls_index=self.ls_index, exc=e)
130+
self.is_archival = is_archival
131+
await self.loop.run_in_executor(self.threadpool_executor, self.output_queue.put, (TonlibWorkerMsgType.ARCHIVAL_UPDATE, self.is_archival))
132+
await asyncio.sleep(600)
124133

125134
async def main_loop(self):
126-
try:
127-
while not self.is_dead:
135+
while not self.exit_event.is_set():
136+
try:
137+
task_id, timeout, method, args, kwargs = await self.loop.run_in_executor(self.threadpool_executor, self.input_queue.get, True, 1)
138+
except queue.Empty:
139+
continue
140+
141+
result = None
142+
exception = None
143+
144+
start_time = datetime.now()
145+
if time.time() < timeout:
128146
try:
129-
task_id, timeout, method, args, kwargs = await self.input_queue.coro_get(timeout=3)
130-
except:
131-
continue
132-
133-
result = None
134-
exception = None
135-
136-
start_time = datetime.now()
137-
if time.time() < timeout:
138-
try:
139-
result = await self.tonlib.__getattribute__(method)(*args, **kwargs)
140-
except asyncio.CancelledError:
141-
exception = Exception("Liteserver timeout")
142-
logger.warning("Client #{ls_index:03d} did not get response from liteserver before timeout", ls_index=self.ls_index)
143-
except Exception as e:
144-
exception = e
145-
logger.warning("Client #{ls_index:03d} raised exception while executing task. Method: {method}, args: {args}, kwargs: {kwargs}, exception: {exc}",
146-
ls_index=self.ls_index, method=method, args=args, kwargs=kwargs, exc=e)
147-
else:
148-
logger.debug("Client #{ls_index:03d} got result {method}", ls_index=self.ls_index, method=method)
147+
result = await self.tonlib.__getattribute__(method)(*args, **kwargs)
148+
except asyncio.CancelledError:
149+
exception = Exception("Liteserver timeout")
150+
logger.warning("Client #{ls_index:03d} did not get response from liteserver before timeout", ls_index=self.ls_index)
151+
except Exception as e:
152+
exception = e
153+
logger.warning("Client #{ls_index:03d} raised exception while executing task. Method: {method}, args: {args}, kwargs: {kwargs}, exception: {exc}",
154+
ls_index=self.ls_index, method=method, args=args, kwargs=kwargs, exc=e)
149155
else:
150-
exception = asyncio.TimeoutError()
151-
logger.warning("Client #{ls_index:03d} received task after timeout", ls_index=self.ls_index)
152-
end_time = datetime.now()
153-
elapsed_time = (end_time - start_time).total_seconds()
154-
155-
# result
156-
tonlib_task_result = TonlibClientResult(task_id,
157-
method,
158-
elapsed_time=elapsed_time,
159-
params=[args, kwargs],
160-
result=result,
161-
exception=exception,
162-
liteserver_info=self.info)
163-
await self.output_queue.coro_put((TonlibWorkerMsgType.TASK_RESULT, tonlib_task_result))
164-
except:
165-
await self.report_dead()
166-
156+
logger.debug("Client #{ls_index:03d} got result {method}", ls_index=self.ls_index, method=method)
157+
else:
158+
exception = asyncio.TimeoutError()
159+
logger.warning("Client #{ls_index:03d} received task after timeout", ls_index=self.ls_index)
160+
end_time = datetime.now()
161+
elapsed_time = (end_time - start_time).total_seconds()
162+
163+
# result
164+
tonlib_task_result = TonlibClientResult(task_id,
165+
method,
166+
elapsed_time=elapsed_time,
167+
params=[args, kwargs],
168+
result=result,
169+
exception=exception,
170+
liteserver_info=self.info)
171+
await self.loop.run_in_executor(self.threadpool_executor, self.output_queue.put, (TonlibWorkerMsgType.TASK_RESULT, tonlib_task_result))
172+
167173
async def idle_loop(self):
168-
while not self.is_dead:
174+
while not self.exit_event.is_set():
169175
await asyncio.sleep(0.5)
176+
raise Exception("exit_event set")

ton-http-api/requirements.txt

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
1-
aioprocessing>=2.0.0
21
aioredis>=2.0.1
32
loguru>=0.6.0
43
fastapi>=0.78.0

ton-http-api/setup.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@
1616
version=version,
1717
packages=find_packages('.', exclude=['tests']),
1818
install_requires=[
19-
'aioprocessing>=2.0.0',
2019
'aioredis>=2.0.1',
2120
'loguru>=0.6.0',
2221
'fastapi>=0.78.0',

0 commit comments

Comments
 (0)