Skip to content

Commit

Permalink
chore: infinite retry while connecting to grpc server (#2432)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Feb 27, 2025
1 parent 4f31aad commit 3fb5b5d
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 8 deletions.
4 changes: 2 additions & 2 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions rust/monovertex/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use backoff::retry::Retry;
use backoff::strategy::fixed;
use tonic::transport::Channel;
use tonic::Request;
use tracing::warn;

pub mod proto {
tonic::include_proto!("sink.v1");
}

const RECONNECT_INTERVAL: u64 = 1000;
const MAX_RECONNECT_ATTEMPTS: usize = 5;
const MAX_RECONNECT_ATTEMPTS: usize = usize::MAX;
const SINK_SOCKET: &str = "/var/run/numaflow/sink.sock";
const FB_SINK_SOCKET: &str = "/var/run/numaflow/fb-sink.sock";

Expand Down Expand Up @@ -61,7 +62,15 @@ impl SinkClient {

let channel = Retry::retry(
interval,
|| async { connect_with_uds(config.socket_path.clone().into()).await },
|| async {
match connect_with_uds(config.socket_path.clone().into()).await {
Ok(channel) => Ok(channel),
Err(e) => {
warn!(?e, "Failed to connect to sink UDS socket");
Err(e)
}
}
},
|_: &Error| true,
)
.await?;
Expand Down
13 changes: 11 additions & 2 deletions rust/monovertex/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ use base64::Engine;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use tonic::Request;
use tracing::warn;

pub mod proto {
tonic::include_proto!("source.v1");
}
const RECONNECT_INTERVAL: u64 = 1000;
const MAX_RECONNECT_ATTEMPTS: usize = 5;
const MAX_RECONNECT_ATTEMPTS: usize = usize::MAX;
const SOURCE_SOCKET: &str = "/var/run/numaflow/source.sock";
const SOURCE_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcer-server-info";

Expand Down Expand Up @@ -48,7 +49,15 @@ impl SourceClient {

let channel = Retry::retry(
interval,
|| async { connect_with_uds(config.socket_path.clone().into()).await },
|| async {
match connect_with_uds(config.socket_path.clone().into()).await {
Ok(channel) => Ok(channel),
Err(e) => {
warn!(?e, "Failed to connect to source UDS socket");
Err(e)
}
}
},
|_: &Error| true,
)
.await?;
Expand Down
13 changes: 11 additions & 2 deletions rust/monovertex/src/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use backoff::retry::Retry;
use backoff::strategy::fixed;
use tonic::transport::Channel;
use tonic::Request;
use tracing::warn;

pub mod proto {
tonic::include_proto!("sourcetransformer.v1");
}

const DROP: &str = "U+005C__DROP__";
const RECONNECT_INTERVAL: u64 = 1000;
const MAX_RECONNECT_ATTEMPTS: usize = 5;
const MAX_RECONNECT_ATTEMPTS: usize = usize::MAX;
const TRANSFORMER_SOCKET: &str = "/var/run/numaflow/sourcetransform.sock";
const TRANSFORMER_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcetransformer-server-info";

Expand Down Expand Up @@ -48,7 +49,15 @@ impl TransformerClient {

let channel = Retry::retry(
interval,
|| async { connect_with_uds(config.socket_path.clone().into()).await },
|| async {
match connect_with_uds(config.socket_path.clone().into()).await {
Ok(channel) => Ok(channel),
Err(e) => {
warn!(?e, "Failed to connect to transformer UDS socket");
Err(e)
}
}
},
|_: &Error| true,
)
.await?;
Expand Down

0 comments on commit 3fb5b5d

Please sign in to comment.