Skip to content

Yield once to VM per quantum, another thread prio crate, some timings, use condvar instead of crossbeam; no improvements #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:

- name: Install ALSA and Jack dependencies
run: |
sudo apt-get update && sudo apt-get install -y libasound2-dev libjack-jackd2-dev
sudo apt-get update && sudo apt-get install -y libasound2-dev libjack-jackd2-dev libdbus-1-dev

- name: Check out repository
uses: actions/checkout@v4
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ version = "0.21.0"
crate-type = ["cdylib"]

[dependencies]
audio_thread_priority = "0.32.0"
crossbeam-channel = "0.5.12"
napi = { version="2.15", features=["napi9", "tokio_rt"] }
napi-derive = { version="2.15" }
Expand Down
3 changes: 3 additions & 0 deletions examples/audio-worklet.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ const whiteNoise = new AudioWorkletNode(audioContext, 'white-noise');
whiteNoise.connect(audioContext.destination);

if (TEST_ONLINE) {
var maxPeakLoad = 0.;
audioContext.renderCapacity.addEventListener('update', e => {
const { timestamp, averageLoad, peakLoad, underrunRatio } = e;
console.log('AudioRenderCapacityEvent:', { timestamp, averageLoad, peakLoad, underrunRatio });
maxPeakLoad = Math.max(maxPeakLoad, peakLoad);
});
audioContext.renderCapacity.start({ updateInterval: 1. });

await sleep(8);
console.log('maxPeakLoad', maxPeakLoad);
await audioContext.close();
} else {
const buffer = await audioContext.startRendering();
Expand Down
127 changes: 82 additions & 45 deletions src/audio_worklet_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ use std::cell::Cell;
use std::collections::HashMap;
use std::option::Option;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex, OnceLock, RwLock};
use std::sync::{Arc, Condvar, Mutex, OnceLock, RwLock};

use std::time::Instant;

/// Unique ID generator for AudioWorkletProcessors
static INCREMENTING_ID: AtomicU32 = AtomicU32::new(0);

/// Command issued from render thread to the Worker
#[derive(Debug)]
enum WorkletCommand {
Drop(u32),
Process(ProcessorArguments),
}

/// Render thread to Worker processor arguments
#[derive(Debug)]
struct ProcessorArguments {
// processor unique ID
id: u32,
Expand All @@ -47,29 +51,51 @@ struct ProcessorArguments {

/// Message channel from render thread to Worker
struct ProcessCallChannel {
send: Sender<WorkletCommand>,
recv: Receiver<WorkletCommand>,
// queue of worklet commands
command_buffer: Mutex<Vec<WorkletCommand>>,
// Condition Variable to wait/notify on new worklet commands
cond_var: Condvar,
// mark that the worklet has been exited to prevent any further `process` call
exited: Arc<AtomicBool>,
exited: AtomicBool,
}

impl ProcessCallChannel {
fn push(&self, command: WorkletCommand) {
let mut buffer = self.command_buffer.lock().unwrap();
buffer.push(command);
self.cond_var.notify_one();
}

fn try_pop(&self) -> Option<WorkletCommand> {
let mut buffer = self.command_buffer.lock().unwrap();

if buffer.is_empty() {
return None;
}

Some(buffer.remove(0))
}
}

/// Global map of ID -> ProcessCallChannel
///
/// Every (Offline)AudioContext is assigned a new channel + ID. The ID is passed to the
/// AudioWorklet Worker and to every AudioNode in the context so they can grab the channel and use
/// message passing.
static GLOBAL_PROCESS_CALL_CHANNEL_MAP: RwLock<Vec<ProcessCallChannel>> = RwLock::new(vec![]);
static GLOBAL_PROCESS_CALL_CHANNEL_MAP: RwLock<Vec<Arc<ProcessCallChannel>>> = RwLock::new(vec![]);

/// Request a new channel + ID for a newly created (Offline)AudioContext
pub(crate) fn allocate_process_call_channel() -> usize {
// Only one process message can be sent at same time from a given context,
// but Drop messages could be send too, so let's take some room
let (send, recv) = crossbeam_channel::bounded(32);
let command_buffer = Mutex::new(Vec::with_capacity(32));

let channel = ProcessCallChannel {
send,
recv,
exited: Arc::new(AtomicBool::new(false)),
command_buffer,
cond_var: Condvar::new(),
exited: AtomicBool::new(false),
};
let channel = Arc::new(channel);

// We need a write-lock to initialize the channel
let mut write_lock = GLOBAL_PROCESS_CALL_CHANNEL_MAP.write().unwrap();
Expand All @@ -80,27 +106,9 @@ pub(crate) fn allocate_process_call_channel() -> usize {
}

/// Obtain the WorkletCommand sender for this context ID
fn process_call_sender(id: usize) -> Sender<WorkletCommand> {
fn process_call_channel(id: usize) -> Arc<ProcessCallChannel> {
// optimistically assume the channel exists and we can use a shared read-lock
GLOBAL_PROCESS_CALL_CHANNEL_MAP.read().unwrap()[id]
.send
.clone()
}

/// Obtain the WorkletCommand receiver for this context ID
fn process_call_receiver(id: usize) -> Receiver<WorkletCommand> {
// optimistically assume the channel exists and we can use a shared read-lock
GLOBAL_PROCESS_CALL_CHANNEL_MAP.read().unwrap()[id]
.recv
.clone()
}

/// Obtain the WorkletCommand exited flag for this context ID
fn process_call_exited(id: usize) -> Arc<AtomicBool> {
// optimistically assume the channel exists and we can use a shared read-lock
GLOBAL_PROCESS_CALL_CHANNEL_MAP.read().unwrap()[id]
.exited
.clone()
Arc::clone(&GLOBAL_PROCESS_CALL_CHANNEL_MAP.read().unwrap()[id])
}

/// Message channel inside the control thread to pass param descriptors of a given AudioWorkletNode
Expand Down Expand Up @@ -377,13 +385,27 @@ fn process_audio_worklet(env: &Env, processors: &JsObject, args: ProcessorArgume
Ok(())
}

static PREV_START: RwLock<Option<Instant>> = RwLock::new(None);

/// The entry point into Rust from the Worker
#[js_function(2)]
pub(crate) fn run_audio_worklet_global_scope(ctx: CallContext) -> Result<JsUndefined> {
let enter_start = Instant::now();
let mut lock = PREV_START.write().unwrap();
if let Some(prev) = *lock {
let micros = enter_start.duration_since(prev).as_micros();
if micros > 200 {
println!("return to Rust after {} micros", micros);
}
}

// Set thread priority to highest, if not done already
if !HAS_THREAD_PRIO.replace(true) {
// allowed to fail
let _ = thread_priority::set_current_thread_priority(thread_priority::ThreadPriority::Max);
let result = audio_thread_priority::promote_current_thread_to_real_time(
128, 44100, // TODO get sample rate
);
dbg!(&result);
result.ok(); // allowed to fail
}

// Obtain the unique worker ID
Expand All @@ -394,8 +416,15 @@ pub(crate) fn run_audio_worklet_global_scope(ctx: CallContext) -> Result<JsUndef
// Poll for incoming commands and yield back to the event loop if there are none.
// recv_timeout is not an option due to realtime safety, see discussion of
// https://github.com/ircam-ismm/node-web-audio-api/pull/124#pullrequestreview-2053515583
while let Ok(msg) = process_call_receiver(worklet_id).try_recv() {
match msg {
let mut prev = Instant::now();
while let Some(cmd) = process_call_channel(worklet_id).try_pop() {
let now = Instant::now();
let micros = now.duration_since(prev).as_micros();
if micros > 3000 {
println!("got command after {} micros", micros);
}

match cmd {
WorkletCommand::Drop(id) => {
let mut processors = ctx.get::<JsObject>(1)?;
processors.delete_named_property(&id.to_string()).unwrap();
Expand All @@ -404,8 +433,17 @@ pub(crate) fn run_audio_worklet_global_scope(ctx: CallContext) -> Result<JsUndef
process_audio_worklet(ctx.env, &processors, args)?;
}
}

let end = Instant::now();
let micros = end.duration_since(now).as_micros();
if micros > 200 {
println!("handled command after {} micros", micros);
}

prev = now;
}

*lock = Some(Instant::now());
ctx.env.get_undefined()
}

Expand All @@ -414,9 +452,11 @@ pub(crate) fn exit_audio_worklet_global_scope(ctx: CallContext) -> Result<JsUnde
// Obtain the unique worker ID
let worklet_id = ctx.get::<JsNumber>(0)?.get_uint32()? as usize;
// Flag message channel as exited to prevent any other render call
process_call_exited(worklet_id).store(true, Ordering::SeqCst);
process_call_channel(worklet_id)
.exited
.store(true, Ordering::SeqCst);
// Handle any pending message from audio thread
if let Ok(WorkletCommand::Process(args)) = process_call_receiver(worklet_id).try_recv() {
if let Some(WorkletCommand::Process(args)) = process_call_channel(worklet_id).try_pop() {
let _ = args.tail_time_sender.send(false);
}

Expand Down Expand Up @@ -614,8 +654,7 @@ fn constructor(ctx: CallContext) -> Result<JsUndefined> {
let id = INCREMENTING_ID.fetch_add(1, Ordering::Relaxed);
let processor_options = NapiAudioWorkletProcessor {
id,
send: process_call_sender(worklet_id),
exited: process_call_exited(worklet_id),
command_channel: process_call_channel(worklet_id),
tail_time_channel: crossbeam_channel::bounded(1),
param_values: Vec::with_capacity(32),
};
Expand Down Expand Up @@ -706,10 +745,8 @@ audio_node_impl!(NapiAudioWorkletNode);
struct NapiAudioWorkletProcessor {
/// Unique id to pair Napi Worklet and JS processor
id: u32,
/// Sender to the JS Worklet
send: Sender<WorkletCommand>,
/// Flag that marks the JS worklet as exited
exited: Arc<AtomicBool>,
/// Command channel to the JS Worklet
command_channel: Arc<ProcessCallChannel>,
/// tail_time result channel
tail_time_channel: (Sender<bool>, Receiver<bool>),
/// Reusable Vec for AudioParam values
Expand Down Expand Up @@ -739,7 +776,7 @@ impl AudioWorkletProcessor for NapiAudioWorkletProcessor {
scope: &'b AudioWorkletGlobalScope,
) -> bool {
// Early return if audio thread is still closing while worklet has been exited
if self.exited.load(Ordering::SeqCst) {
if self.command_channel.exited.load(Ordering::SeqCst) {
return false;
}

Expand Down Expand Up @@ -773,16 +810,16 @@ impl AudioWorkletProcessor for NapiAudioWorkletProcessor {
};

// send command to Worker
self.send.send(WorkletCommand::Process(item)).unwrap();
self.command_channel.push(WorkletCommand::Process(item));
// await result
self.tail_time_channel.1.recv().unwrap()
}
}

impl Drop for NapiAudioWorkletProcessor {
fn drop(&mut self) {
if !self.exited.load(Ordering::SeqCst) {
self.send.send(WorkletCommand::Drop(self.id)).unwrap();
if !self.command_channel.exited.load(Ordering::SeqCst) {
self.command_channel.push(WorkletCommand::Drop(self.id));
}
}
}
Loading