Add initial support for frame processor usage directly on tracks#671
Conversation
🦋 Changeset detectedLatest commit: e4a769c The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
| "outDir": "dist", | ||
| "declarationDir": "dist" | ||
| "declarationDir": "dist", | ||
| "lib": ["es2015", "es2021.weakref"] |
There was a problem hiding this comment.
Is this an ok addition to be making here? Or does there need to be a WeakRef polyfill which falls back to some sort of a no-op class wrapper?
(in case it's useful, WeakRef was introduced in node v16.4: mdn link)
|
Going to wait for some of the conversation on the python version of this to shake out before continuing to push this forward. |
Cases like this:
⎯⎯⎯⎯⎯ Uncaught Exception ⎯⎯⎯⎯⎯
Error: trying to drop an invalid handle
⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
Serialized Error: { code: 'GenericFailure' }
This error originated in "src/audio_stream_room_lifecycle.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running.
…oth in node and bun test runners
3727db2 to
255f712
Compare
| // These tests fabricate Tracks/publications without going through the real FFI. | ||
| // The native FfiHandle (a NAPI class) registers a Rust-side drop on the JS | ||
| // wrapper that fires at GC and throws "trying to drop an invalid handle" for any | ||
| // unallocated id — an uncaught exception that surfaces intermittently on CI. | ||
| // There is no JS-level hook to disarm that native finalizer, and module-mocking | ||
| // the bindings isn't portable (the suite also runs under `bun test`, whose `vi` | ||
| // shim lacks vi.mock/vi.importActual). So we never construct a real handle: | ||
| // Tracks are built via Object.create (bypassing the constructor — the Track | ||
| // class is written to support this), and the one production path that does build | ||
| // a real handle (publishTrack -> LocalTrackPublication) is pinned to keep its | ||
| // wrapper alive so the finalizer never runs. | ||
|
|
||
| // eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
| const inertFfiHandle = () => ({ handle: BigInt(0), dispose() {} }) as any; | ||
|
|
||
| // Keeps real FfiHandle wrappers reachable for the lifetime of the test process | ||
| // so their native finalizers never fire. See the note above. | ||
| const ffiHandleKeepAlive: Array<unknown> = []; | ||
|
|
||
| /** | ||
| * Stub the FfiClient singleton's request/waitFor round-trip via plain property | ||
| * assignment. Runner-agnostic — avoids vi.spyOn, which Bun's `vi` shim doesn't | ||
| * fully implement. Returns a restore function to call in `finally`. | ||
| */ | ||
| // eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
| function stubFfiRoundTrip(waitForResult: any): () => void { | ||
| // eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
| const client = FfiClient.instance as any; | ||
| const origRequest = client.request; | ||
| const origWaitFor = client.waitFor; | ||
| client.request = () => ({ asyncId: BigInt(1) }); | ||
| client.waitFor = async () => waitForResult; | ||
| return () => { | ||
| client.request = origRequest; | ||
| client.waitFor = origWaitFor; | ||
| }; | ||
| } |
There was a problem hiding this comment.
Note to reviewers: I'm not convinced this whole testing scheme is a good idea...
The reason I ended up doing this - I want to create mock ffi class objects in memory I can use to test all this frame processor lifecycle stuff. Unfortunately, it's not really possible to disentangle into some pure functions or less coupled pieces so this to me was the least bad option I could think of.
Other thing I tried - doing some mocking with vite, but unfortunately, these tests are run on both vitest and bun and bun doesn't have support for as much of the fancy mocking features.
Maybe there's a better way to do this where there's support in the sdk itself for an "inert" ffi handle rather than working around it in the tests? Or maybe there's some better patterns I've missed that could be a better alternative?
| // Lazily-created bound listener for the Room's `tokenRefreshed` event. Stored | ||
| // so the same function reference is used for both on() and off(). Defined via | ||
| // a getter + plain method (rather than an instance arrow field) so the class | ||
| // can be constructed through Object.create in tests — bypassing the FFI handle | ||
| // — while keeping a stable listener identity. | ||
| private boundOnRoomTokenRefreshed?: () => void; | ||
|
|
||
| private get roomTokenRefreshedListener(): () => void { | ||
| if (!this.boundOnRoomTokenRefreshed) { | ||
| this.boundOnRoomTokenRefreshed = () => this.onRoomTokenRefreshed(); | ||
| } | ||
| return this.boundOnRoomTokenRefreshed; | ||
| } |
There was a problem hiding this comment.
This is also more fallout from the above ffi client mocking stuff, I really am not a fan... I haven't been able to find a better solution though.
There was a problem hiding this comment.
thanks for the explanation on why this is necessary
| ffi_handle: FfiHandle; | ||
|
|
||
| private roomRef: WeakRef<Room> | null = null; | ||
| private audioStreams: Set<WeakRef<AudioStreamSource>> = new Set(); |
There was a problem hiding this comment.
Interesting thing I learned while building this - WeakSet<T> is not iterable by design. So in this case Set<WeakRef<T>> is actually required given the class needs to iterate through the set to propagate events to all downstream AudioStreamSources.
| "outDir": "dist", | ||
| "declarationDir": "dist" | ||
| "declarationDir": "dist", | ||
| "lib": ["es2015", "es2021.weakref"] |
| // Lazily-created bound listener for the Room's `tokenRefreshed` event. Stored | ||
| // so the same function reference is used for both on() and off(). Defined via | ||
| // a getter + plain method (rather than an instance arrow field) so the class | ||
| // can be constructed through Object.create in tests — bypassing the FFI handle | ||
| // — while keeping a stable listener identity. | ||
| private boundOnRoomTokenRefreshed?: () => void; | ||
|
|
||
| private get roomTokenRefreshedListener(): () => void { | ||
| if (!this.boundOnRoomTokenRefreshed) { | ||
| this.boundOnRoomTokenRefreshed = () => this.onRoomTokenRefreshed(); | ||
| } | ||
| return this.boundOnRoomTokenRefreshed; | ||
| } |
There was a problem hiding this comment.
thanks for the explanation on why this is necessary
| const publication = this.localParticipant.trackPublications.get(ev.value.publicationSid!); | ||
| const track = publication?.track; | ||
| if (track) { | ||
| track.setRoom(null); | ||
| } | ||
| this.localParticipant.trackPublications.delete(ev.value.publicationSid!); | ||
| // Emit while `publication.track` is still set, preserving the pre-existing | ||
| // payload for callbacks. The handler is synchronous, so nulling the track | ||
| // right after still completes before any other turn can observe it. | ||
| this.emit(RoomEvent.LocalTrackUnpublished, publication!, this.localParticipant!); | ||
| // Mirror trackUnsubscribed: drop the publication's track reference. This | ||
| // also makes unpublishTrack's own setRoom(null) a no-op when it loses the | ||
| // race (its `pub.track` guard short-circuits), avoiding a redundant clear. | ||
| if (track && publication) { | ||
| publication.track = undefined; | ||
| } |
There was a problem hiding this comment.
🚩 localTrackUnpublished emits undefined publication when unpublishTrack wins the race
When unpublishTrack wins the race against the localTrackUnpublished room event, it deletes the publication from trackPublications and sets pub.track = undefined. When the room event handler subsequently runs at packages/livekit-rtc/src/room.ts:582, trackPublications.get(ev.value.publicationSid!) returns undefined, and the emit at line 591 passes undefined! (via the non-null assertion) as the publication to event listeners. This is a pre-existing pattern (the emit with publication! was there before this PR), and the new code doesn't worsen it — the track guard at line 584 prevents calling setRoom(null) on undefined. But downstream listeners receiving an undefined publication could be surprising.
Was this helpful? React with 👍 or 👎 to provide feedback.
…o pass both in node and bun test runners" This reverts commit 255f712. The bun test-runner workaround (Object.create-based track fabrication, inert FfiHandle stubs, keep-alive pinning, and replacing vi.mock/vi.spyOn with manual FFI stubbing) is no longer needed. CI now runs vitest under the bun runtime via `bun --bun run test` (see the CI change), which forces bun to execute vitest (overriding its node shebang) so test bodies run on bun while vitest still provides full module mocking. This restores the clean vi.mock-based tests and the original arrow-field Track listener. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Adds src/bun_runtime.test.ts, armed via EXPECT_BUN_RUNTIME=1 in the bun CI step. It runs through the same `bun --bun run test` vitest path as the real suite and asserts process.versions.bun is set, so it fails if the runtime ever silently falls back to node (e.g. the --bun flag stops overriding vitest's node shebang). Skipped everywhere the env var is unset (node CI job, local dev).
77a1671 to
e4a769c
Compare
|
Did some testing of this locally and in a few different happy path scenarios this all seemed to work, that plus the non happy path extensive test coverage makes me fairly confident in this change. |
Updates the node sdk so that
FrameProcessor-based noise cancellation providers can be used directly onAudioStream, without having to go through the agent's RoomIO to be able to initialize itself with credentials.For example, with this change, something like the below becomes possible:
The way this works -
Tracks now keep track of which room they are part of (holding aWeakRefvalue). When the room a track is in changes, it computes new frame processor options and sends these to anyAudioStreams which are associated with the track.The
noiseCancellationLeaveOpenparameter allows the agents sdk to construct anAudioStreamwith a frame processor which remains open across the whole session, and won't be auto-closed when the track is closed.Todo