diff --git a/apps/l2cap_bridge.py b/apps/l2cap_bridge.py index 17623e4c..83379a0a 100644 --- a/apps/l2cap_bridge.py +++ b/apps/l2cap_bridge.py @@ -105,7 +105,7 @@ def connection_lost(self, exc): asyncio.create_task(self.pipe.l2cap_channel.disconnect()) def data_received(self, data): - print(f'<<< Received on TCP: {len(data)}') + print(color(f'<<< [TCP DATA]: {len(data)} bytes', 'blue')) self.pipe.l2cap_channel.write(data) try: @@ -123,6 +123,7 @@ def data_received(self, data): await self.l2cap_channel.disconnect() def on_l2cap_close(self): + print(color('*** L2CAP channel closed', 'red')) self.l2cap_channel = None if self.tcp_transport is not None: self.tcp_transport.close() diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 024a1a27..bd168dca 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -138,6 +138,7 @@ dependencies = [ "clap 4.4.1", "directories", "env_logger", + "futures", "hex", "itertools", "lazy_static", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index c12709f7..6c38c822 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -15,7 +15,7 @@ rust-version = "1.70.0" [dependencies] pyo3 = { version = "0.18.3", features = ["macros"] } pyo3-asyncio = { version = "0.18.0", features = ["tokio-runtime"] } -tokio = { version = "1.28.2" } +tokio = { version = "1.28.2", features = ["macros", "signal"] } nom = "7.1.3" strum = "0.25.0" strum_macros = "0.25.0" @@ -28,11 +28,12 @@ thiserror = "1.0.41" anyhow = { version = "1.0.71", optional = true } clap = { version = "4.3.3", features = ["derive"], optional = true } directories = { version = "5.0.1", optional = true } +env_logger = { version = "0.10.0", optional = true } +futures = { version = "0.3.28", optional = true } +log = { version = "0.4.19", optional = true } owo-colors = { version = "3.5.0", optional = true } reqwest = { version = "0.11.20", features = ["blocking"], optional = true } rusb = { version = "0.9.2", optional = true } -log = { version = "0.4.19", optional = true } -env_logger = { version = "0.10.0", optional = true } [dev-dependencies] tokio = { version = "1.28.2", features = ["full"] } @@ -72,5 +73,5 @@ anyhow = ["pyo3/anyhow"] pyo3-asyncio-attributes = ["pyo3-asyncio/attributes"] bumble-codegen = ["dep:anyhow"] # separate feature for CLI so that dependencies don't spend time building these -bumble-tools = ["dep:clap", "anyhow", "dep:anyhow", "dep:directories", "pyo3-asyncio-attributes", "dep:owo-colors", "dep:reqwest", "dep:rusb", "dep:log", "dep:env_logger"] -default = [] \ No newline at end of file +bumble-tools = ["dep:clap", "anyhow", "dep:anyhow", "dep:directories", "pyo3-asyncio-attributes", "dep:owo-colors", "dep:reqwest", "dep:rusb", "dep:log", "dep:env_logger", "dep:futures"] +default = [] diff --git a/rust/src/cli/l2cap/client_bridge.rs b/rust/src/cli/l2cap/client_bridge.rs new file mode 100644 index 00000000..37606fc5 --- /dev/null +++ b/rust/src/cli/l2cap/client_bridge.rs @@ -0,0 +1,191 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// L2CAP CoC client bridge: connects to a BLE device, then waits for an inbound +/// TCP connection on a specified port number. When a TCP client connects, an +/// L2CAP CoC channel connection to the BLE device is established, and the data +/// is bridged in both directions, with flow control. +/// When the TCP connection is closed by the client, the L2CAP CoC channel is +/// disconnected, but the connection to the BLE device remains, ready for a new +/// TCP client to connect. +/// When the L2CAP CoC channel is closed, the TCP connection is closed as well. +use crate::cli::l2cap::{ + proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals, + BridgeData, +}; +use bumble::wrapper::{ + device::{Connection, Device}, + hci::HciConstant, +}; +use futures::executor::block_on; +use owo_colors::OwoColorize; +use pyo3::{PyResult, Python}; +use std::{net::SocketAddr, sync::Arc}; +use tokio::{ + join, + net::{TcpListener, TcpStream}, + sync::{mpsc, Mutex}, +}; + +pub struct Args { + pub psm: u16, + pub max_credits: Option, + pub mtu: Option, + pub mps: Option, + pub bluetooth_address: String, + pub tcp_host: String, + pub tcp_port: u16, +} + +pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> { + println!( + "{}", + format!("### Connecting to {}...", args.bluetooth_address).yellow() + ); + let mut ble_connection = device.connect(&args.bluetooth_address).await?; + ble_connection.on_disconnection(|_py, reason| { + let disconnection_info = match HciConstant::error_name(reason) { + Ok(info_string) => info_string, + Err(py_err) => format!("failed to get disconnection error name ({})", py_err), + }; + println!( + "{} {}", + "@@@ Bluetooth disconnection: ".red(), + disconnection_info, + ); + Ok(()) + })?; + + // Start the TCP server. + let listener = TcpListener::bind(format!("{}:{}", args.tcp_host, args.tcp_port)) + .await + .expect("failed to bind tcp to address"); + println!( + "{}", + format!( + "### Listening for TCP connections on port {}", + args.tcp_port + ) + .magenta() + ); + + let psm = args.psm; + let max_credits = args.max_credits; + let mtu = args.mtu; + let mps = args.mps; + let ble_connection = Arc::new(Mutex::new(ble_connection)); + // Ensure Python event loop is available to l2cap `disconnect` + let _ = run_future_with_current_task_locals(async move { + while let Ok((tcp_stream, addr)) = listener.accept().await { + let ble_connection = ble_connection.clone(); + let _ = run_future_with_current_task_locals(proxy_data_between_tcp_and_l2cap( + ble_connection, + tcp_stream, + addr, + psm, + max_credits, + mtu, + mps, + )); + } + Ok(()) + }); + Ok(()) +} + +async fn proxy_data_between_tcp_and_l2cap( + ble_connection: Arc>, + tcp_stream: TcpStream, + addr: SocketAddr, + psm: u16, + max_credits: Option, + mtu: Option, + mps: Option, +) -> PyResult<()> { + println!("{}", format!("<<< TCP connection from {}", addr).magenta()); + println!( + "{}", + format!(">>> Opening L2CAP channel on PSM = {}", psm).yellow() + ); + + let mut l2cap_channel = match ble_connection + .lock() + .await + .open_l2cap_channel(psm, max_credits, mtu, mps) + .await + { + Ok(channel) => channel, + Err(e) => { + println!("{}", format!("!!! Connection failed: {e}").red()); + // TCP stream will get dropped after returning, automatically shutting it down. + return Err(e); + } + }; + let channel_info = l2cap_channel + .debug_string() + .unwrap_or_else(|e| format!("failed to get l2cap channel info ({e})")); + + println!("{}{}", "*** L2CAP channel: ".cyan(), channel_info); + + let (l2cap_to_tcp_tx, l2cap_to_tcp_rx) = mpsc::channel::(10); + + // Set l2cap callback (`set_sink`) for when data is received. + let l2cap_to_tcp_tx_clone = l2cap_to_tcp_tx.clone(); + l2cap_channel + .set_sink(move |_py, sdu| { + block_on(l2cap_to_tcp_tx_clone.send(BridgeData::Data(sdu.into()))) + .expect("failed to channel data to tcp"); + Ok(()) + }) + .expect("failed to set sink for l2cap connection"); + + // Set l2cap callback for when the channel is closed. + l2cap_channel + .on_close(move |_py| { + println!("{}", "*** L2CAP channel closed".red()); + block_on(l2cap_to_tcp_tx.send(BridgeData::CloseSignal)) + .expect("failed to channel close signal to tcp"); + Ok(()) + }) + .expect("failed to set on_close callback for l2cap channel"); + + let l2cap_channel = Arc::new(Mutex::new(Some(l2cap_channel))); + let (tcp_reader, tcp_writer) = tcp_stream.into_split(); + + // Do tcp stuff when something happens on the l2cap channel. + let handle_l2cap_data_future = + proxy_l2cap_rx_to_tcp_tx(l2cap_to_tcp_rx, tcp_writer, l2cap_channel.clone()); + + // Do l2cap stuff when something happens on tcp. + let handle_tcp_data_future = proxy_tcp_rx_to_l2cap_tx(tcp_reader, l2cap_channel.clone(), true); + + let (handle_l2cap_result, handle_tcp_result) = + join!(handle_l2cap_data_future, handle_tcp_data_future); + + if let Err(e) = handle_l2cap_result { + println!("!!! Error: {e}"); + } + + if let Err(e) = handle_tcp_result { + println!("!!! Error: {e}"); + } + + Python::with_gil(|_| { + // Must hold GIL at least once while/after dropping for Python heap object to ensure + // de-allocation. + drop(l2cap_channel); + }); + + Ok(()) +} diff --git a/rust/src/cli/l2cap/mod.rs b/rust/src/cli/l2cap/mod.rs new file mode 100644 index 00000000..31097edb --- /dev/null +++ b/rust/src/cli/l2cap/mod.rs @@ -0,0 +1,190 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Rust version of the Python `l2cap_bridge.py` found under the `apps` folder. + +use crate::L2cap; +use anyhow::anyhow; +use bumble::wrapper::{device::Device, l2cap::LeConnectionOrientedChannel, transport::Transport}; +use owo_colors::{colors::css::Orange, OwoColorize}; +use pyo3::{PyObject, PyResult, Python}; +use std::{future::Future, path::PathBuf, sync::Arc}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::tcp::{OwnedReadHalf, OwnedWriteHalf}, + sync::{mpsc::Receiver, Mutex}, +}; + +mod client_bridge; +mod server_bridge; + +pub(crate) async fn run( + command: L2cap, + device_config: PathBuf, + transport: String, + psm: u16, + max_credits: Option, + mtu: Option, + mps: Option, +) -> PyResult<()> { + println!("<<< connecting to HCI..."); + let transport = Transport::open(transport).await?; + println!("<<< connected"); + + let mut device = + Device::from_config_file_with_hci(&device_config, transport.source()?, transport.sink()?)?; + + device.power_on().await?; + + match command { + L2cap::Server { tcp_host, tcp_port } => { + let args = server_bridge::Args { + psm, + max_credits, + mtu, + mps, + tcp_host, + tcp_port, + }; + + server_bridge::start(&args, &mut device).await? + } + L2cap::Client { + bluetooth_address, + tcp_host, + tcp_port, + } => { + let args = client_bridge::Args { + psm, + max_credits, + mtu, + mps, + bluetooth_address, + tcp_host, + tcp_port, + }; + + client_bridge::start(&args, &mut device).await? + } + }; + + // wait until user kills the process + tokio::signal::ctrl_c().await?; + + Ok(()) +} + +/// Used for channeling data from Python callbacks to a Rust consumer. +enum BridgeData { + Data(Vec), + CloseSignal, +} + +async fn proxy_l2cap_rx_to_tcp_tx( + mut l2cap_data_receiver: Receiver, + mut tcp_writer: OwnedWriteHalf, + l2cap_channel: Arc>>, +) -> anyhow::Result<()> { + while let Some(bridge_data) = l2cap_data_receiver.recv().await { + match bridge_data { + BridgeData::Data(sdu) => { + println!("{}", format!("<<< [L2CAP SDU]: {} bytes", sdu.len()).cyan()); + tcp_writer + .write_all(sdu.as_ref()) + .await + .map_err(|_| anyhow!("Failed to write to tcp stream"))?; + tcp_writer + .flush() + .await + .map_err(|_| anyhow!("Failed to flush tcp stream"))?; + } + BridgeData::CloseSignal => { + l2cap_channel.lock().await.take(); + tcp_writer + .shutdown() + .await + .map_err(|_| anyhow!("Failed to shut down write half of tcp stream"))?; + return Ok(()); + } + } + } + Ok(()) +} + +async fn proxy_tcp_rx_to_l2cap_tx( + mut tcp_reader: OwnedReadHalf, + l2cap_channel: Arc>>, + drain_l2cap_after_write: bool, +) -> PyResult<()> { + let mut buf = [0; 4096]; + loop { + match tcp_reader.read(&mut buf).await { + Ok(len) => { + if len == 0 { + println!("{}", "!!! End of stream".fg::()); + + if let Some(mut channel) = l2cap_channel.lock().await.take() { + channel.disconnect().await.map_err(|e| { + eprintln!("Failed to call disconnect on l2cap channel: {e}"); + e + })?; + } + return Ok(()); + } + + println!("{}", format!("<<< [TCP DATA]: {len} bytes").blue()); + match l2cap_channel.lock().await.as_mut() { + None => { + println!("{}", "!!! L2CAP channel not connected, dropping".red()); + return Ok(()); + } + Some(channel) => { + channel.write(&buf[..len])?; + if drain_l2cap_after_write { + channel.drain().await?; + } + } + } + } + Err(e) => { + println!("{}", format!("!!! TCP connection lost: {}", e).red()); + if let Some(mut channel) = l2cap_channel.lock().await.take() { + let _ = channel.disconnect().await.map_err(|e| { + eprintln!("Failed to call disconnect on l2cap channel: {e}"); + }); + } + return Err(e.into()); + } + } + } +} + +/// Copies the current thread's TaskLocals into a Python "awaitable" and encapsulates it in a Rust +/// future, running it as a Python Task. +/// `TaskLocals` stores the current event loop, and allows the user to copy the current Python +/// context if necessary. In this case, the python event loop is used when calling `disconnect` on +/// an l2cap connection, or else the call will fail. +pub fn run_future_with_current_task_locals( + fut: F, +) -> PyResult> + Send> +where + F: Future> + Send + 'static, +{ + Python::with_gil(|py| { + let locals = pyo3_asyncio::tokio::get_current_locals(py)?; + let future = pyo3_asyncio::tokio::scope(locals.clone(), fut); + pyo3_asyncio::tokio::future_into_py_with_locals(py, locals, future) + .and_then(pyo3_asyncio::tokio::into_future) + }) +} diff --git a/rust/src/cli/l2cap/server_bridge.rs b/rust/src/cli/l2cap/server_bridge.rs new file mode 100644 index 00000000..3a32db92 --- /dev/null +++ b/rust/src/cli/l2cap/server_bridge.rs @@ -0,0 +1,205 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// L2CAP CoC server bridge: waits for a peer to connect an L2CAP CoC channel +/// on a specified PSM. When the connection is made, the bridge connects a TCP +/// socket to a remote host and bridges the data in both directions, with flow +/// control. +/// When the L2CAP CoC channel is closed, the bridge disconnects the TCP socket +/// and waits for a new L2CAP CoC channel to be connected. +/// When the TCP connection is closed by the TCP server, the L2CAP connection is closed as well. +use crate::cli::l2cap::{ + proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals, + BridgeData, +}; +use bumble::wrapper::{device::Device, hci::HciConstant, l2cap::LeConnectionOrientedChannel}; +use futures::executor::block_on; +use owo_colors::OwoColorize; +use pyo3::{PyResult, Python}; +use std::{sync::Arc, time::Duration}; +use tokio::{ + join, + net::TcpStream, + select, + sync::{mpsc, Mutex}, +}; + +pub struct Args { + pub psm: u16, + pub max_credits: Option, + pub mtu: Option, + pub mps: Option, + pub tcp_host: String, + pub tcp_port: u16, +} + +pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> { + let host = args.tcp_host.clone(); + let port = args.tcp_port; + device.register_l2cap_channel_server( + args.psm, + move |_py, l2cap_channel| { + let channel_info = l2cap_channel + .debug_string() + .unwrap_or_else(|e| format!("failed to get l2cap channel info ({e})")); + println!("{} {channel_info}", "*** L2CAP channel:".cyan()); + + let host = host.clone(); + // Ensure Python event loop is available to l2cap `disconnect` + let _ = run_future_with_current_task_locals(proxy_data_between_l2cap_and_tcp( + l2cap_channel, + host, + port, + )); + Ok(()) + }, + args.max_credits, + args.mtu, + args.mps, + )?; + + println!( + "{}", + format!("### Listening for CoC connection on PSM {}", args.psm).yellow() + ); + + device.on_connection(|_py, mut connection| { + let connection_info = connection + .debug_string() + .unwrap_or_else(|e| format!("failed to get connection info ({e})")); + println!( + "{} {}", + "@@@ Bluetooth connection: ".green(), + connection_info, + ); + connection.on_disconnection(|_py, reason| { + let disconnection_info = match HciConstant::error_name(reason) { + Ok(info_string) => info_string, + Err(py_err) => format!("failed to get disconnection error name ({})", py_err), + }; + println!( + "{} {}", + "@@@ Bluetooth disconnection: ".red(), + disconnection_info, + ); + Ok(()) + })?; + Ok(()) + })?; + + device.start_advertising(false).await?; + + Ok(()) +} + +async fn proxy_data_between_l2cap_and_tcp( + mut l2cap_channel: LeConnectionOrientedChannel, + tcp_host: String, + tcp_port: u16, +) -> PyResult<()> { + let (l2cap_to_tcp_tx, mut l2cap_to_tcp_rx) = mpsc::channel::(10); + + // Set callback (`set_sink`) for when l2cap data is received. + let l2cap_to_tcp_tx_clone = l2cap_to_tcp_tx.clone(); + l2cap_channel + .set_sink(move |_py, sdu| { + block_on(l2cap_to_tcp_tx_clone.send(BridgeData::Data(sdu.into()))) + .expect("failed to channel data to tcp"); + Ok(()) + }) + .expect("failed to set sink for l2cap connection"); + + // Set l2cap callback for when the channel is closed. + l2cap_channel + .on_close(move |_py| { + println!("{}", "*** L2CAP channel closed".red()); + block_on(l2cap_to_tcp_tx.send(BridgeData::CloseSignal)) + .expect("failed to channel close signal to tcp"); + Ok(()) + }) + .expect("failed to set on_close callback for l2cap channel"); + + println!( + "{}", + format!("### Connecting to TCP {tcp_host}:{tcp_port}...").yellow() + ); + + let l2cap_channel = Arc::new(Mutex::new(Some(l2cap_channel))); + let tcp_stream = match TcpStream::connect(format!("{tcp_host}:{tcp_port}")).await { + Ok(stream) => { + println!("{}", "### Connected".green()); + Some(stream) + } + Err(err) => { + println!("{}", format!("!!! Connection failed: {err}").red()); + if let Some(mut channel) = l2cap_channel.lock().await.take() { + // Bumble might enter an invalid state if disconnection request is received from + // l2cap client before receiving a disconnection response from the same client, + // blocking this async call from returning. + // See: https://github.com/google/bumble/issues/257 + select! { + res = channel.disconnect() => { + let _ = res.map_err(|e| eprintln!("Failed to call disconnect on l2cap channel: {e}")); + }, + _ = tokio::time::sleep(Duration::from_secs(1)) => eprintln!("Timed out while calling disconnect on l2cap channel."), + } + } + None + } + }; + + match tcp_stream { + None => { + while let Some(bridge_data) = l2cap_to_tcp_rx.recv().await { + match bridge_data { + BridgeData::Data(sdu) => { + println!("{}", format!("<<< [L2CAP SDU]: {} bytes", sdu.len()).cyan()); + println!("{}", "!!! TCP socket not open, dropping".red()) + } + BridgeData::CloseSignal => break, + } + } + } + Some(tcp_stream) => { + let (tcp_reader, tcp_writer) = tcp_stream.into_split(); + + // Do tcp stuff when something happens on the l2cap channel. + let handle_l2cap_data_future = + proxy_l2cap_rx_to_tcp_tx(l2cap_to_tcp_rx, tcp_writer, l2cap_channel.clone()); + + // Do l2cap stuff when something happens on tcp. + let handle_tcp_data_future = + proxy_tcp_rx_to_l2cap_tx(tcp_reader, l2cap_channel.clone(), false); + + let (handle_l2cap_result, handle_tcp_result) = + join!(handle_l2cap_data_future, handle_tcp_data_future); + + if let Err(e) = handle_l2cap_result { + println!("!!! Error: {e}"); + } + + if let Err(e) = handle_tcp_result { + println!("!!! Error: {e}"); + } + } + }; + + Python::with_gil(|_| { + // Must hold GIL at least once while/after dropping for Python heap object to ensure + // de-allocation. + drop(l2cap_channel); + }); + + Ok(()) +} diff --git a/rust/src/cli/mod.rs b/rust/src/cli/mod.rs index 2648e125..e58f88c7 100644 --- a/rust/src/cli/mod.rs +++ b/rust/src/cli/mod.rs @@ -15,3 +15,5 @@ pub(crate) mod firmware; pub(crate) mod usb; + +pub(crate) mod l2cap; diff --git a/rust/src/main.rs b/rust/src/main.rs index f8401e93..c21f4c85 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -49,6 +49,26 @@ async fn main() -> PyResult<()> { Realtek::Parse { firmware_path } => cli::firmware::rtk::parse(&firmware_path)?, }, }, + Subcommand::L2cap { + subcommand, + device_config, + transport, + psm, + l2cap_coc_max_credits, + l2cap_coc_mtu, + l2cap_coc_mps, + } => { + cli::l2cap::run( + subcommand, + device_config, + transport, + psm, + l2cap_coc_max_credits, + l2cap_coc_mtu, + l2cap_coc_mps, + ) + .await? + } Subcommand::Usb { subcommand } => match subcommand { Usb::Probe(probe) => cli::usb::probe(probe.verbose)?, }, @@ -70,6 +90,46 @@ enum Subcommand { #[clap(subcommand)] subcommand: Firmware, }, + /// L2cap client/server operations + L2cap { + #[command(subcommand)] + subcommand: L2cap, + + /// Device configuration file. + /// + /// See, for instance, `examples/device1.json` in the Python project. + #[arg(long)] + device_config: path::PathBuf, + /// Bumble transport spec. + /// + /// + #[arg(long)] + transport: String, + + /// PSM for L2CAP Connection-oriented Channel. + /// + /// Must be in the range [0, 65535]. + #[arg(long)] + psm: u16, + + /// Maximum L2CAP CoC Credits. When not specified, lets Bumble set the default. + /// + /// Must be in the range [1, 65535]. + #[arg(long, value_parser = clap::value_parser!(u16).range(1..))] + l2cap_coc_max_credits: Option, + + /// L2CAP CoC MTU. When not specified, lets Bumble set the default. + /// + /// Must be in the range [23, 65535]. + #[arg(long, value_parser = clap::value_parser!(u16).range(23..))] + l2cap_coc_mtu: Option, + + /// L2CAP CoC MPS. When not specified, lets Bumble set the default. + /// + /// Must be in the range [23, 65535]. + #[arg(long, value_parser = clap::value_parser!(u16).range(23..))] + l2cap_coc_mps: Option, + }, /// USB operations Usb { #[clap(subcommand)] @@ -165,6 +225,38 @@ impl fmt::Display for Source { } } +#[derive(clap::Subcommand, Debug, Clone)] +enum L2cap { + /// Starts an L2CAP server + Server { + /// TCP host that the l2cap server will connect to. + /// Data is bridged like so: + /// TCP server <-> (TCP client / **L2CAP server**) <-> (L2CAP client / TCP server) <-> TCP client + #[arg(long, default_value = "localhost")] + tcp_host: String, + /// TCP port that the server will connect to. + /// + /// Must be in the range [1, 65535]. + #[arg(long, default_value_t = 9544)] + tcp_port: u16, + }, + /// Starts an L2CAP client + Client { + /// L2cap server address that this l2cap client will connect to. + bluetooth_address: String, + /// TCP host that the l2cap client will bind to and listen for incoming TCP connections. + /// Data is bridged like so: + /// TCP client <-> (TCP server / **L2CAP client**) <-> (L2CAP server / TCP client) <-> TCP server + #[arg(long, default_value = "localhost")] + tcp_host: String, + /// TCP port that the client will connect to. + /// + /// Must be in the range [1, 65535]. + #[arg(long, default_value_t = 9543)] + tcp_port: u16, + }, +} + #[derive(clap::Subcommand, Debug, Clone)] enum Usb { /// Probe the USB bus for Bluetooth devices diff --git a/rust/src/wrapper/device.rs b/rust/src/wrapper/device.rs index 11770a31..be5e4fa0 100644 --- a/rust/src/wrapper/device.rs +++ b/rust/src/wrapper/device.rs @@ -19,16 +19,17 @@ use crate::{ wrapper::{ core::AdvertisingData, gatt_client::{ProfileServiceProxy, ServiceProxy}, - hci::Address, + hci::{Address, HciErrorCode}, host::Host, + l2cap::LeConnectionOrientedChannel, transport::{Sink, Source}, - ClosureCallback, PyObjectExt, + ClosureCallback, PyDictExt, PyObjectExt, }, }; use pyo3::{ intern, types::{PyDict, PyModule}, - PyObject, PyResult, Python, ToPyObject, + IntoPy, PyObject, PyResult, Python, ToPyObject, }; use pyo3_asyncio::tokio::into_future; use std::path; @@ -87,6 +88,22 @@ impl Device { .map(Connection) } + /// Register a callback to be called for each incoming connection. + pub fn on_connection( + &mut self, + callback: impl Fn(Python, Connection) -> PyResult<()> + Send + 'static, + ) -> PyResult<()> { + let boxed = ClosureCallback::new(move |py, args, _kwargs| { + callback(py, Connection(args.get_item(0)?.into())) + }); + + Python::with_gil(|py| { + self.0 + .call_method1(py, intern!(py, "add_listener"), ("connection", boxed)) + }) + .map(|_| ()) + } + /// Start scanning pub async fn start_scanning(&self, filter_duplicates: bool) -> PyResult<()> { Python::with_gil(|py| { @@ -161,11 +178,109 @@ impl Device { .await .map(|_| ()) } + + /// Registers an L2CAP connection oriented channel server. When a client connects to the server, + /// the `server` callback is passed a handle to the established channel. When optional arguments + /// are not specified, the Python module specifies the defaults. + pub fn register_l2cap_channel_server( + &mut self, + psm: u16, + server: impl Fn(Python, LeConnectionOrientedChannel) -> PyResult<()> + Send + 'static, + max_credits: Option, + mtu: Option, + mps: Option, + ) -> PyResult<()> { + Python::with_gil(|py| { + let boxed = ClosureCallback::new(move |py, args, _kwargs| { + server( + py, + LeConnectionOrientedChannel::from(args.get_item(0)?.into()), + ) + }); + + let kwargs = PyDict::new(py); + kwargs.set_item("psm", psm)?; + kwargs.set_item("server", boxed.into_py(py))?; + kwargs.set_opt_item("max_credits", max_credits)?; + kwargs.set_opt_item("mtu", mtu)?; + kwargs.set_opt_item("mps", mps)?; + self.0.call_method( + py, + intern!(py, "register_l2cap_channel_server"), + (), + Some(kwargs), + ) + })?; + Ok(()) + } } /// A connection to a remote device. pub struct Connection(PyObject); +impl Connection { + /// Open an L2CAP channel using this connection. When optional arguments are not specified, the + /// Python module specifies the defaults. + pub async fn open_l2cap_channel( + &mut self, + psm: u16, + max_credits: Option, + mtu: Option, + mps: Option, + ) -> PyResult { + Python::with_gil(|py| { + let kwargs = PyDict::new(py); + kwargs.set_item("psm", psm)?; + kwargs.set_opt_item("max_credits", max_credits)?; + kwargs.set_opt_item("mtu", mtu)?; + kwargs.set_opt_item("mps", mps)?; + self.0 + .call_method(py, intern!(py, "open_l2cap_channel"), (), Some(kwargs)) + .and_then(|coroutine| pyo3_asyncio::tokio::into_future(coroutine.as_ref(py))) + })? + .await + .map(LeConnectionOrientedChannel::from) + } + + /// Disconnect from device with provided reason. When optional arguments are not specified, the + /// Python module specifies the defaults. + pub async fn disconnect(&mut self, reason: Option) -> PyResult<()> { + Python::with_gil(|py| { + let kwargs = PyDict::new(py); + kwargs.set_opt_item("reason", reason)?; + self.0 + .call_method(py, intern!(py, "disconnect"), (), Some(kwargs)) + .and_then(|coroutine| pyo3_asyncio::tokio::into_future(coroutine.as_ref(py))) + })? + .await + .map(|_| ()) + } + + /// Register a callback to be called on disconnection. + pub fn on_disconnection( + &mut self, + callback: impl Fn(Python, HciErrorCode) -> PyResult<()> + Send + 'static, + ) -> PyResult<()> { + let boxed = ClosureCallback::new(move |py, args, _kwargs| { + callback(py, args.get_item(0)?.extract()?) + }); + + Python::with_gil(|py| { + self.0 + .call_method1(py, intern!(py, "add_listener"), ("disconnection", boxed)) + }) + .map(|_| ()) + } + + /// Returns some information about the connection as a [String]. + pub fn debug_string(&self) -> PyResult { + Python::with_gil(|py| { + let str_obj = self.0.call_method0(py, intern!(py, "__str__"))?; + str_obj.gil_ref(py).extract() + }) + } +} + /// The other end of a connection pub struct Peer(PyObject); diff --git a/rust/src/wrapper/hci.rs b/rust/src/wrapper/hci.rs index 48f7dc11..41dcbf34 100644 --- a/rust/src/wrapper/hci.rs +++ b/rust/src/wrapper/hci.rs @@ -15,7 +15,40 @@ //! HCI use itertools::Itertools as _; -use pyo3::{exceptions::PyException, intern, types::PyModule, PyErr, PyObject, PyResult, Python}; +use pyo3::{ + exceptions::PyException, intern, types::PyModule, FromPyObject, PyAny, PyErr, PyObject, + PyResult, Python, ToPyObject, +}; + +/// HCI error code. +pub struct HciErrorCode(u8); + +impl<'source> FromPyObject<'source> for HciErrorCode { + fn extract(ob: &'source PyAny) -> PyResult { + Ok(HciErrorCode(ob.extract()?)) + } +} + +impl ToPyObject for HciErrorCode { + fn to_object(&self, py: Python<'_>) -> PyObject { + self.0.to_object(py) + } +} + +/// Provides helpers for interacting with HCI +pub struct HciConstant; + +impl HciConstant { + /// Human-readable error name + pub fn error_name(status: HciErrorCode) -> PyResult { + Python::with_gil(|py| { + PyModule::import(py, intern!(py, "bumble.hci"))? + .getattr(intern!(py, "HCI_Constant"))? + .call_method1(intern!(py, "error_name"), (status.0,))? + .extract() + }) + } +} /// A Bluetooth address pub struct Address(pub(crate) PyObject); diff --git a/rust/src/wrapper/l2cap.rs b/rust/src/wrapper/l2cap.rs new file mode 100644 index 00000000..5e0752e1 --- /dev/null +++ b/rust/src/wrapper/l2cap.rs @@ -0,0 +1,92 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! L2CAP + +use crate::wrapper::{ClosureCallback, PyObjectExt}; +use pyo3::{intern, PyObject, PyResult, Python}; + +/// L2CAP connection-oriented channel +pub struct LeConnectionOrientedChannel(PyObject); + +impl LeConnectionOrientedChannel { + /// Create a LeConnectionOrientedChannel that wraps the provided obj. + pub(crate) fn from(obj: PyObject) -> Self { + Self(obj) + } + + /// Queues data to be automatically sent across this channel. + pub fn write(&mut self, data: &[u8]) -> PyResult<()> { + Python::with_gil(|py| self.0.call_method1(py, intern!(py, "write"), (data,))).map(|_| ()) + } + + /// Wait for queued data to be sent on this channel. + pub async fn drain(&mut self) -> PyResult<()> { + Python::with_gil(|py| { + self.0 + .call_method0(py, intern!(py, "drain")) + .and_then(|coroutine| pyo3_asyncio::tokio::into_future(coroutine.as_ref(py))) + })? + .await + .map(|_| ()) + } + + /// Register a callback to be called when the channel is closed. + pub fn on_close( + &mut self, + callback: impl Fn(Python) -> PyResult<()> + Send + 'static, + ) -> PyResult<()> { + let boxed = ClosureCallback::new(move |py, _args, _kwargs| callback(py)); + + Python::with_gil(|py| { + self.0 + .call_method1(py, intern!(py, "add_listener"), ("close", boxed)) + }) + .map(|_| ()) + } + + /// Register a callback to be called when the channel receives data. + pub fn set_sink( + &mut self, + callback: impl Fn(Python, &[u8]) -> PyResult<()> + Send + 'static, + ) -> PyResult<()> { + let boxed = ClosureCallback::new(move |py, args, _kwargs| { + callback(py, args.get_item(0)?.extract()?) + }); + Python::with_gil(|py| self.0.setattr(py, intern!(py, "sink"), boxed)).map(|_| ()) + } + + /// Disconnect the l2cap channel. + /// Must be called from a thread with a Python event loop, which should be true on + /// `tokio::main` and `async_std::main`. + /// + /// For more info, see https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars. + pub async fn disconnect(&mut self) -> PyResult<()> { + Python::with_gil(|py| { + self.0 + .call_method0(py, intern!(py, "disconnect")) + .and_then(|coroutine| pyo3_asyncio::tokio::into_future(coroutine.as_ref(py))) + })? + .await + .map(|_| ()) + } + + /// Returns some information about the channel as a [String]. + pub fn debug_string(&self) -> PyResult { + Python::with_gil(|py| { + let str_obj = self.0.call_method0(py, intern!(py, "__str__"))?; + str_obj.gil_ref(py).extract() + }) + } +} diff --git a/rust/src/wrapper/mod.rs b/rust/src/wrapper/mod.rs index cb0730b1..94ac15a6 100644 --- a/rust/src/wrapper/mod.rs +++ b/rust/src/wrapper/mod.rs @@ -31,11 +31,11 @@ pub use pyo3_asyncio; pub mod assigned_numbers; pub mod core; pub mod device; - pub mod drivers; pub mod gatt_client; pub mod hci; pub mod host; +pub mod l2cap; pub mod logging; pub mod profile; pub mod transport; @@ -71,6 +71,21 @@ impl PyObjectExt for PyObject { } } +/// Convenience extensions to [PyDict] +pub trait PyDictExt { + /// Set item in dict only if value is Some, otherwise do nothing. + fn set_opt_item(&self, key: K, value: Option) -> PyResult<()>; +} + +impl PyDictExt for PyDict { + fn set_opt_item(&self, key: K, value: Option) -> PyResult<()> { + if let Some(value) = value { + self.set_item(key, value)? + } + Ok(()) + } +} + /// Wrapper to make Rust closures ([Fn] implementations) callable from Python. /// /// The Python callable form returns a Python `None`.