Skip to content

Commit ddf8f8e

Browse files
committed
example
1 parent a761391 commit ddf8f8e

File tree

3 files changed

+282
-5
lines changed

3 files changed

+282
-5
lines changed

examples/rpc.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,15 @@ async def main():
8585

8686

8787
def register_receiver_methods(greeters_room: rtc.Room, math_genius_room: rtc.Room):
88-
@greeters_room.local_participant.register_rpc_method("arrival")
88+
@greeters_room.register_rpc_method("arrival")
8989
async def arrival_method(
9090
data: RpcInvocationData,
9191
):
9292
print(f'[Greeter] Oh {data.caller_identity} arrived and said "{data.payload}"')
9393
await asyncio.sleep(2)
9494
return "Welcome and have a wonderful day!"
9595

96-
@math_genius_room.local_participant.register_rpc_method("square-root")
96+
@math_genius_room.register_rpc_method("square-root")
9797
async def square_root_method(
9898
data: RpcInvocationData,
9999
):
@@ -110,7 +110,7 @@ async def square_root_method(
110110
print(f"[Math Genius] Aha! It's {result}")
111111
return json.dumps({"result": result})
112112

113-
@math_genius_room.local_participant.register_rpc_method("divide")
113+
@math_genius_room.register_rpc_method("divide")
114114
async def divide_method(
115115
data: RpcInvocationData,
116116
):
@@ -122,7 +122,7 @@ async def divide_method(
122122
result = dividend / divisor
123123
return json.dumps({"result": result})
124124

125-
@math_genius_room.local_participant.register_rpc_method("long-calculation")
125+
@math_genius_room.register_rpc_method("long-calculation")
126126
async def long_calculation_method(
127127
data: RpcInvocationData,
128128
):

examples/rpc_deprecated.py

+277
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
from livekit import rtc, api
2+
import os
3+
import json
4+
import asyncio
5+
from dotenv import load_dotenv
6+
from livekit.rtc.rpc import RpcInvocationData
7+
8+
load_dotenv(dotenv_path=".env.local", override=False)
9+
LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY")
10+
LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET")
11+
LIVEKIT_URL = os.getenv("LIVEKIT_URL")
12+
if not LIVEKIT_API_KEY or not LIVEKIT_API_SECRET or not LIVEKIT_URL:
13+
raise ValueError("Missing required environment variables. Please check your .env.local file.")
14+
15+
16+
async def main():
17+
rooms = [] # Keep track of all rooms for cleanup
18+
try:
19+
room_name = f"rpc-test-{os.urandom(4).hex()}"
20+
print(f"Connecting participants to room: {room_name}")
21+
22+
callers_room, greeters_room, math_genius_room = await asyncio.gather(
23+
connect_participant("caller", room_name),
24+
connect_participant("greeter", room_name),
25+
connect_participant("math-genius", room_name),
26+
)
27+
rooms = [callers_room, greeters_room, math_genius_room]
28+
29+
register_receiver_methods(greeters_room, math_genius_room)
30+
31+
try:
32+
print("\n\nRunning greeting example...")
33+
await asyncio.gather(perform_greeting(callers_room))
34+
except Exception as error:
35+
print("Error:", error)
36+
37+
try:
38+
print("\n\nRunning error handling example...")
39+
await perform_divide(callers_room)
40+
except Exception as error:
41+
print("Error:", error)
42+
43+
try:
44+
print("\n\nRunning math example...")
45+
await perform_square_root(callers_room)
46+
await asyncio.sleep(2)
47+
await perform_quantum_hypergeometric_series(callers_room)
48+
except Exception as error:
49+
print("Error:", error)
50+
51+
try:
52+
print("\n\nRunning long calculation with timeout...")
53+
await asyncio.create_task(perform_long_calculation(callers_room))
54+
except Exception as error:
55+
print("Error:", error)
56+
57+
try:
58+
print("\n\nRunning long calculation with disconnect...")
59+
# Start the long calculation
60+
long_calc_task = asyncio.create_task(perform_long_calculation(callers_room))
61+
# Wait a bit then disconnect the math genius
62+
await asyncio.sleep(5)
63+
print("\nDisconnecting math genius early...")
64+
await math_genius_room.disconnect()
65+
# Wait for the calculation to fail
66+
await long_calc_task
67+
except Exception as error:
68+
print("Error:", error)
69+
70+
print("\n\nParticipants done, disconnecting remaining participants...")
71+
await callers_room.disconnect()
72+
await greeters_room.disconnect()
73+
74+
print("Participants disconnected. Example completed.")
75+
76+
except KeyboardInterrupt:
77+
print("\nReceived interrupt signal, cleaning up...")
78+
except Exception as e:
79+
print(f"Unexpected error: {e}")
80+
finally:
81+
# Clean up all rooms
82+
print("Disconnecting all participants...")
83+
await asyncio.gather(*(room.disconnect() for room in rooms), return_exceptions=True)
84+
print("Cleanup complete")
85+
86+
87+
def register_receiver_methods(greeters_room: rtc.Room, math_genius_room: rtc.Room):
88+
@greeters_room.local_participant.register_rpc_method("arrival")
89+
async def arrival_method(
90+
data: RpcInvocationData,
91+
):
92+
print(f'[Greeter] Oh {data.caller_identity} arrived and said "{data.payload}"')
93+
await asyncio.sleep(2)
94+
return "Welcome and have a wonderful day!"
95+
96+
@math_genius_room.local_participant.register_rpc_method("square-root")
97+
async def square_root_method(
98+
data: RpcInvocationData,
99+
):
100+
json_data = json.loads(data.payload)
101+
number = json_data["number"]
102+
print(
103+
f"[Math Genius] I guess {data.caller_identity} wants the square root of {number}. I've only got {data.response_timeout} seconds to respond but I think I can pull it off."
104+
)
105+
106+
print("[Math Genius] *doing math*…")
107+
await asyncio.sleep(2)
108+
109+
result = number**0.5
110+
print(f"[Math Genius] Aha! It's {result}")
111+
return json.dumps({"result": result})
112+
113+
@math_genius_room.local_participant.register_rpc_method("divide")
114+
async def divide_method(
115+
data: RpcInvocationData,
116+
):
117+
json_data = json.loads(data.payload)
118+
dividend = json_data["dividend"]
119+
divisor = json_data["divisor"]
120+
print(f"[Math Genius] {data.caller_identity} wants to divide {dividend} by {divisor}.")
121+
122+
result = dividend / divisor
123+
return json.dumps({"result": result})
124+
125+
@math_genius_room.local_participant.register_rpc_method("long-calculation")
126+
async def long_calculation_method(
127+
data: RpcInvocationData,
128+
):
129+
print(f"[Math Genius] Starting a very long calculation for {data.caller_identity}")
130+
print(
131+
f"[Math Genius] This will take 30 seconds even though you're only giving me {data.response_timeout} seconds"
132+
)
133+
await asyncio.sleep(30)
134+
return json.dumps({"result": "Calculation complete!"})
135+
136+
137+
async def perform_greeting(room: rtc.Room):
138+
print("[Caller] Letting the greeter know that I've arrived")
139+
try:
140+
response = await room.local_participant.perform_rpc(
141+
destination_identity="greeter", method="arrival", payload="Hello"
142+
)
143+
print(f'[Caller] That\'s nice, the greeter said: "{response}"')
144+
except Exception as error:
145+
print(f"[Caller] RPC call failed: {error}")
146+
raise
147+
148+
149+
async def perform_square_root(room: rtc.Room):
150+
print("[Caller] What's the square root of 16?")
151+
try:
152+
response = await room.local_participant.perform_rpc(
153+
destination_identity="math-genius",
154+
method="square-root",
155+
payload=json.dumps({"number": 16}),
156+
)
157+
parsed_response = json.loads(response)
158+
print(f"[Caller] Nice, the answer was {parsed_response['result']}")
159+
except Exception as error:
160+
print(f"[Caller] RPC call failed: {error}")
161+
raise
162+
163+
164+
async def perform_quantum_hypergeometric_series(room: rtc.Room):
165+
print("[Caller] What's the quantum hypergeometric series of 42?")
166+
try:
167+
response = await room.local_participant.perform_rpc(
168+
destination_identity="math-genius",
169+
method="quantum-hypergeometric-series",
170+
payload=json.dumps({"number": 42}),
171+
)
172+
parsed_response = json.loads(response)
173+
print(f"[Caller] genius says {parsed_response['result']}!")
174+
except rtc.RpcError as error:
175+
if error.code == rtc.RpcError.ErrorCode.UNSUPPORTED_METHOD:
176+
print("[Caller] Aww looks like the genius doesn't know that one.")
177+
return
178+
print("[Caller] Unexpected error:", error)
179+
raise
180+
except Exception as error:
181+
print("[Caller] Unexpected error:", error)
182+
raise
183+
184+
185+
async def perform_divide(room: rtc.Room):
186+
print("[Caller] Let's divide 10 by 0.")
187+
try:
188+
response = await room.local_participant.perform_rpc(
189+
destination_identity="math-genius",
190+
method="divide",
191+
payload=json.dumps({"dividend": 10, "divisor": 0}),
192+
)
193+
parsed_response = json.loads(response)
194+
print(f"[Caller] The result is {parsed_response['result']}")
195+
except rtc.RpcError as error:
196+
if error.code == rtc.RpcError.ErrorCode.APPLICATION_ERROR:
197+
print("[Caller] Aww something went wrong with that one, lets try something else.")
198+
else:
199+
print(f"[Caller] RPC call failed with unexpected RpcError: {error}")
200+
except Exception as error:
201+
print(f"[Caller] RPC call failed with unexpected error: {error}")
202+
203+
204+
async def perform_long_calculation(room: rtc.Room):
205+
print("[Caller] Giving the math genius 10s to complete a long calculation")
206+
try:
207+
response = await room.local_participant.perform_rpc(
208+
destination_identity="math-genius",
209+
method="long-calculation",
210+
payload=json.dumps({}),
211+
response_timeout=10,
212+
)
213+
parsed_response = json.loads(response)
214+
print(f"[Caller] Result: {parsed_response['result']}")
215+
except rtc.RpcError as error:
216+
if error.code == rtc.RpcError.ErrorCode.RESPONSE_TIMEOUT:
217+
print("[Caller] Math genius took too long to respond")
218+
elif error.code == rtc.RpcError.ErrorCode.RECIPIENT_DISCONNECTED:
219+
print("[Caller] Math genius disconnected before response was received")
220+
else:
221+
print(f"[Caller] Unexpected RPC error: {error}")
222+
except Exception as error:
223+
print(f"[Caller] Unexpected error: {error}")
224+
225+
226+
def create_token(identity: str, room_name: str):
227+
token = (
228+
api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
229+
.with_identity(identity)
230+
.with_grants(
231+
api.VideoGrants(
232+
room=room_name,
233+
room_join=True,
234+
can_publish=True,
235+
can_subscribe=True,
236+
)
237+
)
238+
)
239+
return token.to_jwt()
240+
241+
242+
async def connect_participant(identity: str, room_name: str) -> rtc.Room:
243+
room = rtc.Room()
244+
token = create_token(identity, room_name)
245+
246+
def on_disconnected(reason: str):
247+
print(f"[{identity}] Disconnected from room: {reason}")
248+
249+
room.on("disconnected", on_disconnected)
250+
251+
await room.connect(LIVEKIT_URL, token)
252+
253+
async def wait_for_participants():
254+
if room.remote_participants:
255+
return
256+
participant_connected = asyncio.Event()
257+
258+
def _on_participant_connected(participant: rtc.RemoteParticipant):
259+
room.off("participant_connected", _on_participant_connected)
260+
participant_connected.set()
261+
262+
room.on("participant_connected", _on_participant_connected)
263+
await participant_connected.wait()
264+
265+
try:
266+
await asyncio.wait_for(wait_for_participants(), timeout=5.0)
267+
except asyncio.TimeoutError:
268+
raise TimeoutError("Timed out waiting for participants")
269+
270+
return room
271+
272+
273+
if __name__ == "__main__":
274+
try:
275+
asyncio.run(main())
276+
except KeyboardInterrupt:
277+
print("\nProgram terminated by user")

livekit-rtc/livekit/rtc/room.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -952,7 +952,7 @@ async def run_handler():
952952

953953
req = proto_ffi.FfiRequest(
954954
rpc_method_invocation_response=RpcMethodInvocationResponseRequest(
955-
local_participant_handle=self._ffi_handle.handle,
955+
local_participant_handle=self._local_participant._ffi_handle.handle,
956956
invocation_id=invocation_id,
957957
error=response_error._to_proto() if response_error else None,
958958
payload=response_payload,

0 commit comments

Comments
 (0)