Skip to content

Commit ad1df8f

Browse files
authored
Add stream_id and sender_identity to stream_text params (#362)
1 parent a766014 commit ad1df8f

File tree

3 files changed

+23
-13
lines changed

3 files changed

+23
-13
lines changed

livekit-rtc/livekit/rtc/_utils.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from collections import deque
1818
import ctypes
1919
import random
20-
from typing import Callable, Generic, List, TypeVar
20+
from typing import Callable, Generator, Generic, List, TypeVar
2121

2222
logger = logging.getLogger("livekit")
2323

@@ -133,12 +133,13 @@ def generate_random_base62(length=12):
133133

134134

135135
# adapted from https://stackoverflow.com/a/6043797
136-
def split_utf8(s: str, n: int):
136+
def split_utf8(s: str, n: int) -> Generator[bytes, None, None]:
137137
"""Split UTF-8 s into chunks of maximum length n."""
138-
while len(s) > n:
138+
encoded = s.encode()
139+
while len(encoded) > n:
139140
k = n
140-
while (ord(s[k]) & 0xC0) == 0x80:
141+
while (encoded[k] & 0xC0) == 0x80:
141142
k -= 1
142-
yield s[:k]
143-
s = s[k:]
144-
yield s
143+
yield encoded[:k]
144+
encoded = encoded[k:]
145+
yield encoded

livekit-rtc/livekit/rtc/data_stream.py

+11-6
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
from ._utils import split_utf8
2828
from typing import TYPE_CHECKING
2929

30+
3031
if TYPE_CHECKING:
3132
from .participant import LocalParticipant
3233

33-
3434
STREAM_CHUNK_SIZE = 15_000
3535

3636

@@ -65,7 +65,6 @@ def __init__(
6565
attachments=list(header.text_header.attached_stream_ids),
6666
)
6767
self._queue: asyncio.Queue[proto_DataStream.Chunk | None] = asyncio.Queue()
68-
self._chunks: Dict[int, proto_DataStream.Chunk] = {}
6968

7069
async def _on_chunk_update(self, chunk: proto_DataStream.Chunk):
7170
await self._queue.put(chunk)
@@ -146,6 +145,7 @@ def __init__(
146145
total_size: int | None = None,
147146
mime_type: str = "",
148147
destination_identities: Optional[List[str]] = None,
148+
sender_identity: str | None = None,
149149
):
150150
self._local_participant = local_participant
151151
if stream_id is None:
@@ -161,14 +161,15 @@ def __init__(
161161
)
162162
self._next_chunk_index: int = 0
163163
self._destination_identities = destination_identities
164+
self._sender_identity = sender_identity or self._local_participant.identity
164165

165166
async def _send_header(self):
166167
req = proto_ffi.FfiRequest(
167168
send_stream_header=proto_room.SendStreamHeaderRequest(
168169
header=self._header,
169170
local_participant_handle=self._local_participant._ffi_handle.handle,
170171
destination_identities=self._destination_identities,
171-
sender_identity=self._local_participant.identity,
172+
sender_identity=self._sender_identity,
172173
)
173174
)
174175

@@ -230,10 +231,12 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer):
230231
if cb.send_stream_chunk.error:
231232
raise ConnectionError(cb.send_stream_trailer.error)
232233

233-
async def aclose(self):
234+
async def aclose(
235+
self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None
236+
):
234237
await self._send_trailer(
235238
trailer=proto_DataStream.Trailer(
236-
stream_id=self._header.stream_id, reason=""
239+
stream_id=self._header.stream_id, reason=reason, attributes=attributes
237240
)
238241
)
239242

@@ -249,6 +252,7 @@ def __init__(
249252
total_size: int | None = None,
250253
reply_to_id: str | None = None,
251254
destination_identities: Optional[List[str]] = None,
255+
sender_identity: str | None = None,
252256
) -> None:
253257
super().__init__(
254258
local_participant,
@@ -258,6 +262,7 @@ def __init__(
258262
total_size,
259263
mime_type="text/plain",
260264
destination_identities=destination_identities,
265+
sender_identity=sender_identity,
261266
)
262267
self._header.text_header.operation_type = proto_DataStream.OperationType.CREATE
263268
if reply_to_id:
@@ -276,7 +281,7 @@ def __init__(
276281
async def write(self, text: str):
277282
async with self._write_lock:
278283
for chunk in split_utf8(text, STREAM_CHUNK_SIZE):
279-
content = chunk.encode()
284+
content = chunk
280285
chunk_index = self._next_chunk_index
281286
self._next_chunk_index += 1
282287
chunk_msg = proto_DataStream.Chunk(

livekit-rtc/livekit/rtc/participant.py

+4
Original file line numberDiff line numberDiff line change
@@ -585,8 +585,10 @@ async def stream_text(
585585
destination_identities: Optional[List[str]] = None,
586586
topic: str = "",
587587
attributes: Optional[Dict[str, str]] = None,
588+
stream_id: str | None = None,
588589
reply_to_id: str | None = None,
589590
total_size: int | None = None,
591+
sender_identity: str | None = None,
590592
) -> TextStreamWriter:
591593
"""
592594
Returns a TextStreamWriter that allows to write individual chunks of text to a text stream.
@@ -599,6 +601,8 @@ async def stream_text(
599601
reply_to_id=reply_to_id,
600602
destination_identities=destination_identities,
601603
total_size=total_size,
604+
stream_id=stream_id,
605+
sender_identity=sender_identity,
602606
)
603607

604608
await writer._send_header()

0 commit comments

Comments
 (0)