Skip to content

Commit e6acde2

Browse files
committed
add async interface
1 parent 1c97c6d commit e6acde2

File tree

2 files changed

+134
-0
lines changed

2 files changed

+134
-0
lines changed

pylsp_jsonrpc/endpoint.py

+54
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
import logging
55
import uuid
66
import sys
7+
import asyncio
78

9+
from threading import Thread
810
from concurrent import futures
911
from .exceptions import (JsonRpcException, JsonRpcRequestCancelled,
1012
JsonRpcInternalError, JsonRpcMethodNotFound)
@@ -14,6 +16,34 @@
1416
CANCEL_METHOD = '$/cancelRequest'
1517

1618

19+
async def run_as_daemon(func, *args):
20+
future = futures.Future()
21+
future.set_running_or_notify_cancel()
22+
23+
# A bug in python 3.7 makes it a bad idea to set a BaseException
24+
# in a wrapped future (see except statement in asyncio.Task.__wakeup)
25+
# Instead, we'll wrap base exceptions into exceptions and unwrap them
26+
# on the other side of the call.
27+
class BaseExceptionWrapper(Exception):
28+
pass
29+
30+
def daemon():
31+
try:
32+
result = func(*args)
33+
except Exception as e:
34+
future.set_exception(e)
35+
except BaseException as e:
36+
future.set_exception(BaseExceptionWrapper(e))
37+
else:
38+
future.set_result(result)
39+
40+
Thread(target=daemon, daemon=True).start()
41+
try:
42+
return await asyncio.wrap_future(future)
43+
except BaseExceptionWrapper as exc:
44+
raise exc.args[0]
45+
46+
1747
class Endpoint:
1848

1949
def __init__(self, dispatcher, consumer, id_generator=lambda: str(uuid.uuid4()), max_workers=5):
@@ -35,6 +65,19 @@ def __init__(self, dispatcher, consumer, id_generator=lambda: str(uuid.uuid4()),
3565
self._client_request_futures = {}
3666
self._server_request_futures = {}
3767
self._executor_service = futures.ThreadPoolExecutor(max_workers=max_workers)
68+
self._cancelledRequests = set()
69+
70+
def init_async(self):
71+
log.warning("init async")
72+
self._messageQueue = asyncio.Queue()
73+
74+
async def consume_task(self):
75+
log.warning("starting task")
76+
while True:
77+
message = await self._messageQueue.get()
78+
await run_as_daemon(self.consume, message)
79+
log.warning("got message in task")
80+
self._messageQueue.task_done()
3881

3982
def shutdown(self):
4083
self._executor_service.shutdown()
@@ -94,7 +137,15 @@ def callback(future):
94137
future.set_exception(JsonRpcRequestCancelled())
95138
return callback
96139

140+
async def consume_async(self, message):
141+
log.warning("got message put in queue")
142+
if message['method'] == CANCEL_METHOD:
143+
self._cancelledRequests.add(message.get('params')['id'])
144+
await self._messageQueue.put(message)
145+
146+
97147
def consume(self, message):
148+
log.warning("consume message")
98149
"""Consume a JSON RPC message from the client.
99150
100151
Args:
@@ -182,6 +233,9 @@ def _handle_request(self, msg_id, method, params):
182233
except KeyError as e:
183234
raise JsonRpcMethodNotFound.of(method) from e
184235

236+
if msg_id in self._cancelledRequests:
237+
raise JsonRpcRequestCancelled()
238+
185239
handler_result = handler(params)
186240

187241
if callable(handler_result):

pylsp_jsonrpc/streams.py

+80
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33

44
import logging
55
import threading
6+
import asyncio
7+
import sys
8+
from threading import Thread
9+
from concurrent.futures import Future
610

711
try:
812
import ujson as json
@@ -12,6 +16,34 @@
1216
log = logging.getLogger(__name__)
1317

1418

19+
async def run_as_daemon(func, *args):
20+
future = Future()
21+
future.set_running_or_notify_cancel()
22+
23+
# A bug in python 3.7 makes it a bad idea to set a BaseException
24+
# in a wrapped future (see except statement in asyncio.Task.__wakeup)
25+
# Instead, we'll wrap base exceptions into exceptions and unwrap them
26+
# on the other side of the call.
27+
class BaseExceptionWrapper(Exception):
28+
pass
29+
30+
def daemon():
31+
try:
32+
result = func(*args)
33+
except Exception as e:
34+
future.set_exception(e)
35+
except BaseException as e:
36+
future.set_exception(BaseExceptionWrapper(e))
37+
else:
38+
future.set_result(result)
39+
40+
Thread(target=daemon, daemon=True).start()
41+
try:
42+
return await asyncio.wrap_future(future)
43+
except BaseExceptionWrapper as exc:
44+
raise exc.args[0]
45+
46+
1547
class JsonRpcStreamReader:
1648
def __init__(self, rfile):
1749
self._rfile = rfile
@@ -25,6 +57,7 @@ def listen(self, message_consumer):
2557
Args:
2658
message_consumer (fn): function that is passed each message as it is read off the socket.
2759
"""
60+
2861
while not self._rfile.closed:
2962
try:
3063
request_str = self._read_message()
@@ -65,6 +98,53 @@ def _read_message(self):
6598
# Grab the body
6699
return self._rfile.read(content_length)
67100

101+
async def listen_async(self, message_consumer):
102+
"""Blocking call to listen for messages on the rfile.
103+
104+
Args:
105+
message_consumer (fn): function that is passed each message as it is read off the socket.
106+
"""
107+
108+
while not self._rfile.closed:
109+
try:
110+
request_str = await self._read_message_async()
111+
except ValueError:
112+
if self._rfile.closed:
113+
return
114+
log.exception("Failed to read from rfile")
115+
116+
if request_str is None:
117+
break
118+
119+
try:
120+
await message_consumer(json.loads(request_str.decode('utf-8')))
121+
except ValueError:
122+
log.exception("Failed to parse JSON message %s", request_str)
123+
continue
124+
125+
async def _read_message_async(self):
126+
"""Reads the contents of a message.
127+
128+
Returns:
129+
body of message if parsable else None
130+
"""
131+
line = await run_as_daemon(self._rfile.readline)
132+
133+
if not line:
134+
return None
135+
136+
content_length = self._content_length(line)
137+
138+
# Blindly consume all header lines
139+
while line and line.strip():
140+
line = await run_as_daemon(self._rfile.readline)
141+
142+
if not line:
143+
return None
144+
145+
# Grab the body
146+
return await run_as_daemon(self._rfile.read, content_length)
147+
68148
@staticmethod
69149
def _content_length(line):
70150
"""Extract the content length from an input line."""

0 commit comments

Comments
 (0)