Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ffi/rodbus-ffi/src/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<'a> RegisterValueIterator<'a> {
}

pub(crate) unsafe fn bit_value_iterator_next(
it: *mut crate::BitValueIterator,
it: *mut crate::BitValueIterator<'_>,
) -> Option<&crate::ffi::BitValue> {
match it.as_mut() {
Some(it) => match it.inner.next() {
Expand All @@ -46,7 +46,7 @@ pub(crate) unsafe fn bit_value_iterator_next(
}

pub(crate) unsafe fn register_value_iterator_next(
it: *mut crate::RegisterValueIterator,
it: *mut crate::RegisterValueIterator<'_>,
) -> Option<&crate::ffi::RegisterValue> {
match it.as_mut() {
Some(it) => match it.inner.next() {
Expand Down
32 changes: 30 additions & 2 deletions rodbus/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub use crate::client::channel::*;
pub use crate::client::listener::*;
pub use crate::client::requests::write_multiple::WriteMultiple;
pub use crate::retry::*;
use crate::ClientOptions;

#[cfg(feature = "ffi")]
pub use ffi_channel::*;
Expand Down Expand Up @@ -105,13 +106,40 @@ pub fn spawn_tcp_client_task(
retry: Box<dyn RetryStrategy>,
decode: DecodeLevel,
listener: Option<Box<dyn Listener<ClientState>>>,
) -> Channel {
let options = ClientOptions::default()
.decode(decode)
.max_queued_requests(max_queued_requests);
crate::tcp::client::spawn_tcp_channel(
host,
retry,
listener.unwrap_or_else(|| NullListener::create()),
options,
)
}

/// Spawns a channel task onto the runtime that maintains a TCP connection and processes
/// requests. The task completes when the returned channel handle is dropped.
///
/// The channel uses the provided [`RetryStrategy`] to pause between failed connection attempts
///
/// * `host` - Address/port of the remote server. Can be a IP address or name on which to perform DNS resolution.
/// * `retry` - A boxed trait object that controls when the connection is retried on failure
/// * `listener` - Optional callback to monitor the TCP connection state
/// * `client_options` - A builder that contains various client options.
///
/// `WARNING`: This function must be called from with the context of the Tokio runtime or it will panic.
pub fn spawn_tcp_client_task_2(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The function name spawn_tcp_client_task_2 is not idiomatic. A more descriptive name like spawn_tcp_client_task_with_options would better reflect its purpose. Consider also deprecating the older spawn_tcp_client_task function to guide users towards the new, more flexible API.

Suggested change
pub fn spawn_tcp_client_task_2(
pub fn spawn_tcp_client_task_with_options(

host: HostAddr,
retry: Box<dyn RetryStrategy>,
listener: Option<Box<dyn Listener<ClientState>>>,
client_options: ClientOptions,
) -> Channel {
crate::tcp::client::spawn_tcp_channel(
host,
max_queued_requests,
retry,
decode,
listener.unwrap_or_else(|| NullListener::create()),
client_options,
)
}

Expand Down
124 changes: 66 additions & 58 deletions rodbus/src/tcp/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::error::Error;

use tracing::Instrument;

use crate::client::{Channel, ClientState, HostAddr, Listener};
Expand All @@ -9,38 +11,37 @@ use crate::client::task::{ClientLoop, SessionError, StateChange};
use crate::common::frame::{FrameWriter, FramedReader};
use crate::error::Shutdown;
use crate::retry::RetryStrategy;
use crate::{ClientOptions, ConnectionLoggingStrategy};

use tokio::net::TcpStream;

pub(crate) fn spawn_tcp_channel(
host: HostAddr,
max_queued_requests: usize,
connect_retry: Box<dyn RetryStrategy>,
decode: DecodeLevel,
listener: Box<dyn Listener<ClientState>>,
client_options: ClientOptions,
) -> Channel {
let (handle, task) =
create_tcp_channel(host, max_queued_requests, connect_retry, decode, listener);
let (handle, task) = create_tcp_channel(host, connect_retry, listener, client_options);
tokio::spawn(task);
handle
}

pub(crate) fn create_tcp_channel(
host: HostAddr,
max_queued_requests: usize,
connect_retry: Box<dyn RetryStrategy>,
decode: DecodeLevel,
listener: Box<dyn Listener<ClientState>>,
options: ClientOptions,
) -> (Channel, impl std::future::Future<Output = ()>) {
let (tx, rx) = tokio::sync::mpsc::channel(max_queued_requests);
let (tx, rx) = tokio::sync::mpsc::channel(options.max_queued_requests);
let task = async move {
TcpChannelTask::new(
host.clone(),
rx.into(),
TcpTaskConnectionHandler::Tcp,
connect_retry,
decode,
options.decode,
listener,
options.connection_logging_strategy,
)
.run()
.instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host))
Expand Down Expand Up @@ -75,6 +76,7 @@ pub(crate) struct TcpChannelTask {
connection_handler: TcpTaskConnectionHandler,
client_loop: ClientLoop,
listener: Box<dyn Listener<ClientState>>,
_logging_strategy: ConnectionLoggingStrategy,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The field _logging_strategy is added to TcpChannelTask and initialized, but it's never used. The _ prefix indicates this is intentional to avoid compiler warnings, but it's unclear why an unused field is being added.

If this is for future use, please add a // TODO comment explaining its purpose. If it's not intended to be used, it should be removed. If the intention is to control logging, it should be used to conditionally enable/disable logging statements, for example in failed_tcp_connection and failed_tcp_stream_connection.

}

impl TcpChannelTask {
Expand All @@ -85,13 +87,15 @@ impl TcpChannelTask {
connect_retry: Box<dyn RetryStrategy>,
decode: DecodeLevel,
listener: Box<dyn Listener<ClientState>>,
_logging_strategy: ConnectionLoggingStrategy,
) -> Self {
Self {
host,
connect_retry,
connection_handler,
client_loop: ClientLoop::new(rx, FrameWriter::tcp(), FramedReader::tcp(), decode),
listener,
_logging_strategy,
}
}

Expand Down Expand Up @@ -133,20 +137,7 @@ impl TcpChannelTask {
async fn try_connect_and_run(&mut self) -> Result<(), StateChange> {
self.listener.update(ClientState::Connecting).get().await;
match self.connect().await? {
Err(err) => {
let delay = self.connect_retry.after_failed_connect();
tracing::warn!(
"failed to connect to {}: {} - waiting {} ms before next attempt",
self.host,
err,
delay.as_millis()
);
self.listener
.update(ClientState::WaitAfterFailedConnect(delay))
.get()
.await;
self.client_loop.fail_requests_for(delay).await
}
Err(err) => self.failed_tcp_stream_connection(err).await,
Ok(socket) => {
if let Ok(addr) = socket.peer_addr() {
tracing::info!("connected to: {}", addr);
Expand All @@ -155,44 +146,61 @@ impl TcpChannelTask {
tracing::warn!("unable to enable TCP_NODELAY: {}", err);
}
match self.connection_handler.handle(socket, &self.host).await {
Err(err) => {
let delay = self.connect_retry.after_failed_connect();
tracing::warn!(
"{} - waiting {} ms before next attempt",
err,
delay.as_millis()
);
self.listener
.update(ClientState::WaitAfterFailedConnect(delay))
.get()
.await;
self.client_loop.fail_requests_for(delay).await
}
Ok(mut phys) => {
self.listener.update(ClientState::Connected).get().await;
// reset the retry strategy now that we have a successful connection
// we do this here so that the reset happens after a TLS handshake
self.connect_retry.reset();
// run the physical layer independent processing loop
match self.client_loop.run(&mut phys).await {
// the mpsc was closed, end the task
SessionError::Shutdown => Err(StateChange::Shutdown),
// re-establish the connection
SessionError::Disabled
| SessionError::IoError(_)
| SessionError::BadFrame => {
let delay = self.connect_retry.after_disconnect();
tracing::warn!("waiting {:?} to reconnect", delay);
self.listener
.update(ClientState::WaitAfterDisconnect(delay))
.get()
.await;
self.client_loop.fail_requests_for(delay).await
}
}
}
Err(err) => self.failed_tcp_connection(err).await,
Ok(phys) => self.connected(phys).await,
}
}
}
}

async fn connected(&mut self, mut phys: PhysLayer) -> Result<(), StateChange> {
self.listener.update(ClientState::Connected).get().await;
// reset the retry strategy now that we have a successful connection
// we do this here so that the reset happens after a TLS handshake
self.connect_retry.reset();
// run the physical layer independent processing loop
match self.client_loop.run(&mut phys).await {
// the mpsc was closed, end the task
SessionError::Shutdown => Err(StateChange::Shutdown),
// re-establish the connection
SessionError::Disabled | SessionError::IoError(_) | SessionError::BadFrame => {
let delay = self.connect_retry.after_disconnect();
tracing::warn!("waiting {:?} to reconnect", delay);
self.listener
.update(ClientState::WaitAfterDisconnect(delay))
.get()
.await;
self.client_loop.fail_requests_for(delay).await
}
}
}

async fn failed_tcp_connection(&mut self, err: String) -> Result<(), StateChange> {
let delay = self.connect_retry.after_failed_connect();
tracing::warn!(
"{} - waiting {} ms before next attempt",
err,
delay.as_millis()
);
self.listener
.update(ClientState::WaitAfterFailedConnect(delay))
.get()
.await;
self.client_loop.fail_requests_for(delay).await
}

async fn failed_tcp_stream_connection<T: Error>(&mut self, err: T) -> Result<(), StateChange> {
let delay = self.connect_retry.after_failed_connect();
tracing::warn!(
"failed to connect to {}: {} - waiting {} ms before next attempt",
self.host,
err,
delay.as_millis()
);
self.listener
.update(ClientState::WaitAfterFailedConnect(delay))
.get()
.await;
self.client_loop.fail_requests_for(delay).await
}
}
3 changes: 2 additions & 1 deletion rodbus/src/tcp/tls/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::common::phys::PhysLayer;
use crate::tcp::client::{TcpChannelTask, TcpTaskConnectionHandler};
use crate::tcp::tls::{CertificateMode, MinTlsVersion, TlsError};

use crate::DecodeLevel;
use crate::{ConnectionLoggingStrategy, DecodeLevel};

/// TLS configuration
pub struct TlsClientConfig {
Expand Down Expand Up @@ -60,6 +60,7 @@ pub(crate) fn create_tls_channel(
connect_retry,
decode,
listener,
ConnectionLoggingStrategy::All,
)
Comment on lines 60 to 64

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The refactoring to use ClientOptions has been applied to the plain TCP client, but not consistently to the TLS client. In create_tls_channel, ConnectionLoggingStrategy::All is hardcoded, and other options like max_queued_requests and decode are still passed as separate arguments.

For API consistency, the TLS client creation functions (spawn_tls_client_task in client/mod.rs and create_tls_channel here) should be updated to accept ClientOptions. This would allow users to configure all client options for TLS connections, just as they can for plain TCP.

.run()
.instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host))
Expand Down
62 changes: 61 additions & 1 deletion rodbus/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::decode::AppDecodeLevel;
use crate::error::{AduParseError, InvalidRange};
use crate::DecodeLevel;

use scursor::ReadCursor;

Expand Down Expand Up @@ -375,6 +376,65 @@ impl Default for UnitId {
}
}

/// How verbose to make the logging for connects & disconnects
#[derive(Clone, Copy)]
pub enum ConnectionLoggingStrategy {
/// Log it all
All,
/// Only log State changes
StateDriven,
}

impl Default for ConnectionLoggingStrategy {
fn default() -> Self {
Self::All
}
}

/// A ClientOptions builder
#[derive(Copy, Clone)]
pub struct ClientOptions {
pub(crate) connection_logging_strategy: ConnectionLoggingStrategy,
pub(crate) max_queued_requests: usize,
pub(crate) decode: DecodeLevel,
}

impl ClientOptions {
/// Builder function for the connection_logging_strategy field.
pub fn connection_logging(
self,
connection_logging_strategy: ConnectionLoggingStrategy,
) -> Self {
Self {
connection_logging_strategy,
..self
}
}

/// builder function for the max queued requests field
pub fn max_queued_requests(self, max_queued_requests: usize) -> Self {
Self {
max_queued_requests,
..self
}
}

/// builder function for the decode level field
pub fn decode(self, decode: DecodeLevel) -> Self {
Self { decode, ..self }
}
}

impl Default for ClientOptions {
fn default() -> Self {
Self {
connection_logging_strategy: ConnectionLoggingStrategy::default(),
max_queued_requests: 16,
decode: DecodeLevel::default(),
}
}
}

#[cfg(test)]
mod tests {
use crate::error::*;
Expand All @@ -383,7 +443,7 @@ mod tests {

#[test]
fn address_start_max_count_of_one_is_allowed() {
AddressRange::try_from(std::u16::MAX, 1).unwrap();
AddressRange::try_from(u16::MAX, 1).unwrap();
}

#[test]
Expand Down