Skip to content

Commit c8ea572

Browse files
committed
add newline stream io
1 parent 77faf0b commit c8ea572

File tree

3 files changed

+35
-2
lines changed

3 files changed

+35
-2
lines changed

Diff for: lf_toolkit/rpc/ipc_server.py

+6
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44

55
import anyio
66

7+
from lf_toolkit.rpc.stream_io import StreamIO
8+
79
from .ipc_listener_base import IPCListener
810
from .rpc_handler import RpcHandler
11+
from .stream_io import NewlineStreamIO
912
from .stream_io import StreamServer
1013

1114

@@ -34,6 +37,9 @@ def __init__(
3437
self._listener = listener_factory(endpoint)
3538
super().__init__(handler)
3639

40+
def wrap_io(self, client: StreamIO) -> StreamIO:
41+
return NewlineStreamIO(client)
42+
3743
async def run(self):
3844
async with anyio.create_task_group() as task_group:
3945
async for client in self._listener.listen():

Diff for: lf_toolkit/rpc/stdio_server.py

+4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from anyio.streams.stapled import StapledByteStream
88

99
from .rpc_handler import RpcHandler
10+
from .stream_io import PrefixStreamIO
1011
from .stream_io import StreamIO
1112
from .stream_io import StreamServer
1213

@@ -38,5 +39,8 @@ def __init__(self, handler: Optional[RpcHandler] = None):
3839
super().__init__(handler)
3940
self._client = StdioClient()
4041

42+
def wrap_io(self, client: StreamIO) -> StreamIO:
43+
return PrefixStreamIO(client)
44+
4145
async def run(self):
4246
await self._handle_client(self._client)

Diff for: lf_toolkit/rpc/stream_io.py

+25-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,26 @@ async def write(self, data: bytes):
1717
pass
1818

1919

20+
class NewlineStreamIO:
21+
22+
base: StreamIO
23+
24+
def __init__(self, base: StreamIO):
25+
self.base = base
26+
27+
async def read(self, size: int) -> bytes:
28+
data = b""
29+
while len(data) < size:
30+
chunk = await self.base.read(1)
31+
if chunk == b"\n":
32+
break
33+
data += chunk
34+
return data
35+
36+
async def write(self, data: bytes):
37+
await self.base.write(data + b"\n")
38+
39+
2040
class PrefixStreamIO:
2141

2242
base: StreamIO
@@ -56,12 +76,15 @@ async def write(self, data: bytes):
5676

5777
class StreamServer(BaseServer):
5878

79+
def wrap_io(self, client: StreamIO) -> StreamIO:
80+
return client
81+
5982
async def _handle_client(self, client: StreamIO):
60-
io = PrefixStreamIO(client)
83+
io = self.wrap_io(client)
6184

6285
while True:
6386
try:
64-
data = await io.read(1024)
87+
data = await io.read(4096)
6588

6689
if not data:
6790
print("Received empty data")

0 commit comments

Comments
 (0)