Skip to content

Commit 8ebd2ad

Browse files
committed
add async interface
1 parent 1c97c6d commit 8ebd2ad

File tree

2 files changed

+62
-0
lines changed

2 files changed

+62
-0
lines changed

pylsp_jsonrpc/endpoint.py

+36
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import uuid
66
import sys
7+
import asyncio
78

89
from concurrent import futures
910
from .exceptions import (JsonRpcException, JsonRpcRequestCancelled,
@@ -12,6 +13,7 @@
1213
log = logging.getLogger(__name__)
1314
JSONRPC_VERSION = '2.0'
1415
CANCEL_METHOD = '$/cancelRequest'
16+
EXIT_METHOD = 'exit'
1517

1618

1719
class Endpoint:
@@ -35,9 +37,25 @@ def __init__(self, dispatcher, consumer, id_generator=lambda: str(uuid.uuid4()),
3537
self._client_request_futures = {}
3638
self._server_request_futures = {}
3739
self._executor_service = futures.ThreadPoolExecutor(max_workers=max_workers)
40+
self._cancelledRequests = set()
41+
self._messageQueue = None
42+
self._consume_task = None
43+
44+
def init_async(self):
45+
self._messageQueue = asyncio.Queue()
46+
self._consume_task = asyncio.create_task(self.consume_task())
47+
48+
async def consume_task(self):
49+
loop = asyncio.get_running_loop()
50+
while loop.is_running():
51+
message = await self._messageQueue.get()
52+
await asyncio.to_thread(self.consume, message)
53+
self._messageQueue.task_done()
3854

3955
def shutdown(self):
4056
self._executor_service.shutdown()
57+
if self._consume_task is not None:
58+
self._consume_task.cancel()
4159

4260
def notify(self, method, params=None):
4361
"""Send a JSON RPC notification to the client.
@@ -94,6 +112,21 @@ def callback(future):
94112
future.set_exception(JsonRpcRequestCancelled())
95113
return callback
96114

115+
async def consume_async(self, message):
116+
"""Consume a JSON RPC message from the client and put it into a queue.
117+
118+
Args:
119+
message (dict): The JSON RPC message sent by the client
120+
"""
121+
if message['method'] == CANCEL_METHOD:
122+
self._cancelledRequests.add(message.get('params')['id'])
123+
124+
# The exit message needs to be handled directly since the stream cannot be closed asynchronously
125+
if message['method'] == EXIT_METHOD:
126+
self.consume(message)
127+
else:
128+
await self._messageQueue.put(message)
129+
97130
def consume(self, message):
98131
"""Consume a JSON RPC message from the client.
99132
@@ -182,6 +215,9 @@ def _handle_request(self, msg_id, method, params):
182215
except KeyError as e:
183216
raise JsonRpcMethodNotFound.of(method) from e
184217

218+
if msg_id in self._cancelledRequests:
219+
raise JsonRpcRequestCancelled()
220+
185221
handler_result = handler(params)
186222

187223
if callable(handler_result):

pylsp_jsonrpc/streams.py

+26
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import logging
55
import threading
6+
import asyncio
67

78
try:
89
import ujson as json
@@ -65,6 +66,31 @@ def _read_message(self):
6566
# Grab the body
6667
return self._rfile.read(content_length)
6768

69+
async def listen_async(self, message_consumer):
70+
"""Blocking call to listen for messages on the rfile.
71+
72+
Args:
73+
message_consumer (fn): function that is passed each message as it is read off the socket.
74+
"""
75+
76+
while not self._rfile.closed:
77+
try:
78+
request_str = await asyncio.to_thread(self._read_message)
79+
log.warning("got stdin")
80+
except ValueError:
81+
if self._rfile.closed:
82+
return
83+
log.exception("Failed to read from rfile")
84+
85+
if request_str is None:
86+
break
87+
88+
try:
89+
await message_consumer(json.loads(request_str.decode('utf-8')))
90+
except ValueError:
91+
log.exception("Failed to parse JSON message %s", request_str)
92+
continue
93+
6894
@staticmethod
6995
def _content_length(line):
7096
"""Extract the content length from an input line."""

0 commit comments

Comments
 (0)