Skip to content

Commit

Permalink
Merge pull request #262 from whitevegagabriel/l2cap
Browse files Browse the repository at this point in the history
Port l2cap_bridge sample to Rust
  • Loading branch information
barbibulle authored Sep 7, 2023
2 parents fd4d1bc + 9732eb8 commit 9303f4f
Show file tree
Hide file tree
Showing 12 changed files with 949 additions and 11 deletions.
3 changes: 2 additions & 1 deletion apps/l2cap_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def connection_lost(self, exc):
asyncio.create_task(self.pipe.l2cap_channel.disconnect())

def data_received(self, data):
print(f'<<< Received on TCP: {len(data)}')
print(color(f'<<< [TCP DATA]: {len(data)} bytes', 'blue'))
self.pipe.l2cap_channel.write(data)

try:
Expand All @@ -123,6 +123,7 @@ def data_received(self, data):
await self.l2cap_channel.disconnect()

def on_l2cap_close(self):
print(color('*** L2CAP channel closed', 'red'))
self.l2cap_channel = None
if self.tcp_transport is not None:
self.tcp_transport.close()
Expand Down
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ rust-version = "1.70.0"
[dependencies]
pyo3 = { version = "0.18.3", features = ["macros"] }
pyo3-asyncio = { version = "0.18.0", features = ["tokio-runtime"] }
tokio = { version = "1.28.2" }
tokio = { version = "1.28.2", features = ["macros", "signal"] }
nom = "7.1.3"
strum = "0.25.0"
strum_macros = "0.25.0"
Expand All @@ -28,11 +28,12 @@ thiserror = "1.0.41"
anyhow = { version = "1.0.71", optional = true }
clap = { version = "4.3.3", features = ["derive"], optional = true }
directories = { version = "5.0.1", optional = true }
env_logger = { version = "0.10.0", optional = true }
futures = { version = "0.3.28", optional = true }
log = { version = "0.4.19", optional = true }
owo-colors = { version = "3.5.0", optional = true }
reqwest = { version = "0.11.20", features = ["blocking"], optional = true }
rusb = { version = "0.9.2", optional = true }
log = { version = "0.4.19", optional = true }
env_logger = { version = "0.10.0", optional = true }

[dev-dependencies]
tokio = { version = "1.28.2", features = ["full"] }
Expand Down Expand Up @@ -72,5 +73,5 @@ anyhow = ["pyo3/anyhow"]
pyo3-asyncio-attributes = ["pyo3-asyncio/attributes"]
bumble-codegen = ["dep:anyhow"]
# separate feature for CLI so that dependencies don't spend time building these
bumble-tools = ["dep:clap", "anyhow", "dep:anyhow", "dep:directories", "pyo3-asyncio-attributes", "dep:owo-colors", "dep:reqwest", "dep:rusb", "dep:log", "dep:env_logger"]
default = []
bumble-tools = ["dep:clap", "anyhow", "dep:anyhow", "dep:directories", "pyo3-asyncio-attributes", "dep:owo-colors", "dep:reqwest", "dep:rusb", "dep:log", "dep:env_logger", "dep:futures"]
default = []
191 changes: 191 additions & 0 deletions rust/src/cli/l2cap/client_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// L2CAP CoC client bridge: connects to a BLE device, then waits for an inbound
/// TCP connection on a specified port number. When a TCP client connects, an
/// L2CAP CoC channel connection to the BLE device is established, and the data
/// is bridged in both directions, with flow control.
/// When the TCP connection is closed by the client, the L2CAP CoC channel is
/// disconnected, but the connection to the BLE device remains, ready for a new
/// TCP client to connect.
/// When the L2CAP CoC channel is closed, the TCP connection is closed as well.
use crate::cli::l2cap::{
proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals,
BridgeData,
};
use bumble::wrapper::{
device::{Connection, Device},
hci::HciConstant,
};
use futures::executor::block_on;
use owo_colors::OwoColorize;
use pyo3::{PyResult, Python};
use std::{net::SocketAddr, sync::Arc};
use tokio::{
join,
net::{TcpListener, TcpStream},
sync::{mpsc, Mutex},
};

pub struct Args {
pub psm: u16,
pub max_credits: Option<u16>,
pub mtu: Option<u16>,
pub mps: Option<u16>,
pub bluetooth_address: String,
pub tcp_host: String,
pub tcp_port: u16,
}

pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> {
println!(
"{}",
format!("### Connecting to {}...", args.bluetooth_address).yellow()
);
let mut ble_connection = device.connect(&args.bluetooth_address).await?;
ble_connection.on_disconnection(|_py, reason| {
let disconnection_info = match HciConstant::error_name(reason) {
Ok(info_string) => info_string,
Err(py_err) => format!("failed to get disconnection error name ({})", py_err),
};
println!(
"{} {}",
"@@@ Bluetooth disconnection: ".red(),
disconnection_info,
);
Ok(())
})?;

// Start the TCP server.
let listener = TcpListener::bind(format!("{}:{}", args.tcp_host, args.tcp_port))
.await
.expect("failed to bind tcp to address");
println!(
"{}",
format!(
"### Listening for TCP connections on port {}",
args.tcp_port
)
.magenta()
);

let psm = args.psm;
let max_credits = args.max_credits;
let mtu = args.mtu;
let mps = args.mps;
let ble_connection = Arc::new(Mutex::new(ble_connection));
// Ensure Python event loop is available to l2cap `disconnect`
let _ = run_future_with_current_task_locals(async move {
while let Ok((tcp_stream, addr)) = listener.accept().await {
let ble_connection = ble_connection.clone();
let _ = run_future_with_current_task_locals(proxy_data_between_tcp_and_l2cap(
ble_connection,
tcp_stream,
addr,
psm,
max_credits,
mtu,
mps,
));
}
Ok(())
});
Ok(())
}

async fn proxy_data_between_tcp_and_l2cap(
ble_connection: Arc<Mutex<Connection>>,
tcp_stream: TcpStream,
addr: SocketAddr,
psm: u16,
max_credits: Option<u16>,
mtu: Option<u16>,
mps: Option<u16>,
) -> PyResult<()> {
println!("{}", format!("<<< TCP connection from {}", addr).magenta());
println!(
"{}",
format!(">>> Opening L2CAP channel on PSM = {}", psm).yellow()
);

let mut l2cap_channel = match ble_connection
.lock()
.await
.open_l2cap_channel(psm, max_credits, mtu, mps)
.await
{
Ok(channel) => channel,
Err(e) => {
println!("{}", format!("!!! Connection failed: {e}").red());
// TCP stream will get dropped after returning, automatically shutting it down.
return Err(e);
}
};
let channel_info = l2cap_channel
.debug_string()
.unwrap_or_else(|e| format!("failed to get l2cap channel info ({e})"));

println!("{}{}", "*** L2CAP channel: ".cyan(), channel_info);

let (l2cap_to_tcp_tx, l2cap_to_tcp_rx) = mpsc::channel::<BridgeData>(10);

// Set l2cap callback (`set_sink`) for when data is received.
let l2cap_to_tcp_tx_clone = l2cap_to_tcp_tx.clone();
l2cap_channel
.set_sink(move |_py, sdu| {
block_on(l2cap_to_tcp_tx_clone.send(BridgeData::Data(sdu.into())))
.expect("failed to channel data to tcp");
Ok(())
})
.expect("failed to set sink for l2cap connection");

// Set l2cap callback for when the channel is closed.
l2cap_channel
.on_close(move |_py| {
println!("{}", "*** L2CAP channel closed".red());
block_on(l2cap_to_tcp_tx.send(BridgeData::CloseSignal))
.expect("failed to channel close signal to tcp");
Ok(())
})
.expect("failed to set on_close callback for l2cap channel");

let l2cap_channel = Arc::new(Mutex::new(Some(l2cap_channel)));
let (tcp_reader, tcp_writer) = tcp_stream.into_split();

// Do tcp stuff when something happens on the l2cap channel.
let handle_l2cap_data_future =
proxy_l2cap_rx_to_tcp_tx(l2cap_to_tcp_rx, tcp_writer, l2cap_channel.clone());

// Do l2cap stuff when something happens on tcp.
let handle_tcp_data_future = proxy_tcp_rx_to_l2cap_tx(tcp_reader, l2cap_channel.clone(), true);

let (handle_l2cap_result, handle_tcp_result) =
join!(handle_l2cap_data_future, handle_tcp_data_future);

if let Err(e) = handle_l2cap_result {
println!("!!! Error: {e}");
}

if let Err(e) = handle_tcp_result {
println!("!!! Error: {e}");
}

Python::with_gil(|_| {
// Must hold GIL at least once while/after dropping for Python heap object to ensure
// de-allocation.
drop(l2cap_channel);
});

Ok(())
}
Loading

0 comments on commit 9303f4f

Please sign in to comment.