Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion asio-sys/src/bindings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,14 @@ static BUFFER_CALLBACK: Mutex<Vec<(CallbackId, BufferCallback)>> = Mutex::new(Ve
/// Indicates that ASIOOutputReady should be called
static CALL_OUTPUT_READY: AtomicBool = AtomicBool::new(false);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct MessageCallbackId(usize);

struct MessageCallback(Arc<dyn Fn(AsioMessageSelectors) + Send + Sync>);

/// A global registry for ASIO message callbacks.
static MESSAGE_CALLBACKS: Mutex<Vec<(MessageCallbackId, MessageCallback)>> = Mutex::new(Vec::new());

impl Asio {
/// Initialise the ASIO API.
pub fn new() -> Self {
Expand Down Expand Up @@ -743,6 +751,32 @@ impl Driver {
}
}
}

/// Adds a callback to the list of message listeners.
///
/// Returns an ID uniquely associated with the given callback so that it may be removed later.
pub fn add_message_callback<F>(&self, callback: F) -> MessageCallbackId
where
F: Fn(AsioMessageSelectors) + Send + Sync + 'static,
{
let mut mcb = MESSAGE_CALLBACKS.lock().unwrap();
let id = mcb
.last()
.map(|&(id, _)| {
MessageCallbackId(id.0.checked_add(1).expect("MessageCallbackId overflowed"))
})
.unwrap_or(MessageCallbackId(0));

let cb = MessageCallback(Arc::new(callback));
mcb.push((id, cb));
id
}

/// Remove the callback with the given ID.
pub fn remove_message_callback(&self, rem_id: MessageCallbackId) {
let mut mcb = MESSAGE_CALLBACKS.lock().unwrap();
mcb.retain(|&(id, _)| id != rem_id);
}
}

impl DriverState {
Expand Down Expand Up @@ -936,7 +970,17 @@ extern "C" fn asio_message(
// You cannot reset the driver right now, as this code is called from the driver. Reset
// the driver is done by completely destruct it. I.e. ASIOStop(), ASIODisposeBuffers(),
// Destruction. Afterwards you initialize the driver again.
// TODO: Handle this.

// Get the list of active message callbacks.
let callbacks: Vec<_> = {
let lock = MESSAGE_CALLBACKS.lock().unwrap();
lock.iter().map(|(_, cb)| cb.0.clone()).collect()
};
// Release lock and call them.
for cb in callbacks {
cb(AsioMessageSelectors::kAsioResetRequest);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ASIO SDK documentation states: "You cannot reset the driver right now, as this code is called from the driver." - we might want to document that somewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in docs for the new StreamError variant.

}

1
}

Expand Down
9 changes: 9 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ pub enum StreamError {
/// The device no longer exists. This can happen if the device is disconnected while the
/// program is running.
DeviceNotAvailable,
/// The stream configuration is no longer valid and must be rebuilt.
/// This occurs when the backend requires a full reset:
/// - ASIO: Driver reset request (sample rate/format change via control panel)
/// - JACK: Sample rate changed by server
/// - Other backends: Format/configuration invalidated
StreamInvalidated,
/// See the [`BackendSpecificError`] docs for more information about this error variant.
BackendSpecific { err: BackendSpecificError },
}
Expand All @@ -301,6 +307,9 @@ impl Display for StreamError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::BackendSpecific { err } => err.fmt(f),
Self::StreamInvalidated => {
f.write_str("The stream configuration is no longer valid and must be rebuilt.")
}
Self::DeviceNotAvailable => f.write_str(
"The requested device is no longer available. For example, it has been unplugged.",
),
Expand Down
31 changes: 29 additions & 2 deletions src/host/asio/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct Stream {
#[allow(dead_code)]
asio_streams: Arc<Mutex<sys::AsioStreams>>,
callback_id: sys::CallbackId,
message_callback_id: sys::MessageCallbackId,
}

// Compile-time assertion that Stream is Send and Sync
Expand All @@ -44,7 +45,7 @@ impl Device {
config: &StreamConfig,
sample_format: SampleFormat,
mut data_callback: D,
_error_callback: E,
error_callback: E,
_timeout: Option<Duration>,
) -> Result<Stream, BuildStreamError>
where
Expand All @@ -60,6 +61,9 @@ impl Device {
return Err(BuildStreamError::StreamConfigNotSupported);
}

// Register the message callback with the driver
let message_callback_id = self.add_message_callback(error_callback);

let num_channels = config.channels;
let buffer_size = self.get_or_create_input_stream(config, sample_format)?;
let cpal_num_samples = buffer_size * num_channels as usize;
Expand Down Expand Up @@ -260,6 +264,7 @@ impl Device {
driver,
asio_streams,
callback_id,
message_callback_id,
})
}

Expand All @@ -268,7 +273,7 @@ impl Device {
config: &StreamConfig,
sample_format: SampleFormat,
mut data_callback: D,
_error_callback: E,
error_callback: E,
_timeout: Option<Duration>,
) -> Result<Stream, BuildStreamError>
where
Expand All @@ -284,6 +289,9 @@ impl Device {
return Err(BuildStreamError::StreamConfigNotSupported);
}

// Register the message callback with the driver
let message_callback_id = self.add_message_callback(error_callback);

let num_channels = config.channels;
let buffer_size = self.get_or_create_output_stream(config, sample_format)?;
let cpal_num_samples = buffer_size * num_channels as usize;
Expand Down Expand Up @@ -534,6 +542,7 @@ impl Device {
driver,
asio_streams,
callback_id,
message_callback_id,
})
}

Expand Down Expand Up @@ -632,11 +641,29 @@ impl Device {
}
}
}

fn add_message_callback<E>(&self, error_callback: E) -> sys::MessageCallbackId
where
E: FnMut(StreamError) + Send + 'static,
{
let error_callback_shared = Arc::new(Mutex::new(error_callback));

self.driver.add_message_callback(move |msg| {
// Check specifically for ResetRequest
if let sys::AsioMessageSelectors::kAsioResetRequest = msg {
if let Ok(mut cb) = error_callback_shared.lock() {
cb(StreamError::StreamInvalidated);
}
}
})
}
}

impl Drop for Stream {
fn drop(&mut self) {
self.driver.remove_callback(self.callback_id);
self.driver
.remove_message_callback(self.message_callback_id);
}
}

Expand Down