diff --git a/.rustfmt.toml b/.rustfmt.toml index 3a26366..36c419b 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -1 +1 @@ -edition = "2021" +edition = "2021" \ No newline at end of file diff --git a/examples/mapt-event-time-filter/src/main.rs b/examples/mapt-event-time-filter/src/main.rs index 1b76145..2033719 100644 --- a/examples/mapt-event-time-filter/src/main.rs +++ b/examples/mapt-event-time-filter/src/main.rs @@ -1,7 +1,8 @@ +use std::error::Error; + use filter_impl::filter_event_time; use numaflow::sourcetransform; use numaflow::sourcetransform::{Message, SourceTransformRequest}; -use std::error::Error; #[tokio::main] async fn main() -> Result<(), Box> { @@ -41,9 +42,10 @@ mod filter_impl { #[cfg(test)] mod tests { - use crate::filter_impl::filter_event_time; use chrono::{TimeZone, Utc}; use numaflow::sourcetransform::SourceTransformRequest; + + use crate::filter_impl::filter_event_time; /// Tests that events from 2022 are tagged as within the year 2022. #[test] fn test_filter_event_time_should_return_within_year_2022() { diff --git a/examples/sideinput/src/main.rs b/examples/sideinput/src/main.rs index 39af6dd..65b9ec2 100644 --- a/examples/sideinput/src/main.rs +++ b/examples/sideinput/src/main.rs @@ -1,7 +1,7 @@ -use numaflow::sideinput::{self, SideInputer}; use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; +use numaflow::sideinput::{self, SideInputer}; use tonic::async_trait; struct SideInputHandler { diff --git a/examples/simple-source/src/main.rs b/examples/simple-source/src/main.rs index dd842c8..bfb1d49 100644 --- a/examples/simple-source/src/main.rs +++ b/examples/simple-source/src/main.rs @@ -7,12 +7,12 @@ async fn main() -> Result<(), Box> { } pub(crate) mod simple_source { - use chrono::Utc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{collections::HashSet, sync::RwLock}; - use tokio::sync::mpsc::Sender; + use chrono::Utc; use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer}; + use tokio::sync::mpsc::Sender; /// SimpleSource is a data generator which generates monotonically increasing offsets and data. It is a shared state which is protected using Locks /// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK diff --git a/numaflow/src/batchmap.rs b/numaflow/src/batchmap.rs index f602037..7642df4 100644 --- a/numaflow/src/batchmap.rs +++ b/numaflow/src/batchmap.rs @@ -3,12 +3,6 @@ use std::fs; use std::path::PathBuf; use std::sync::Arc; -use crate::error::Error; -use crate::error::ErrorKind::{InternalError, UserDefinedError}; -use crate::servers::map as proto; -use crate::servers::map::map_server::Map; -use crate::servers::map::{MapRequest, MapResponse, ReadyResponse}; -use crate::shared::{self, shutdown_signal, ContainerType}; use chrono::{DateTime, Utc}; use tokio::sync::mpsc::channel; use tokio::sync::{mpsc, oneshot}; @@ -18,6 +12,13 @@ use tokio_util::sync::CancellationToken; use tonic::{Request, Response, Status, Streaming}; use tracing::{debug, info}; +use crate::error::Error; +use crate::error::ErrorKind::{InternalError, UserDefinedError}; +use crate::servers::map as proto; +use crate::servers::map::map_server::Map; +use crate::servers::map::{MapRequest, MapResponse, ReadyResponse}; +use crate::shared::{self, shutdown_signal, ContainerType}; + const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info"; diff --git a/numaflow/src/mapstream.rs b/numaflow/src/mapstream.rs index 45e1f2e..5d11eca 100644 --- a/numaflow/src/mapstream.rs +++ b/numaflow/src/mapstream.rs @@ -1,13 +1,10 @@ -use crate::servers::map as proto; -use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::fs; use std::path::PathBuf; -use tokio::sync::mpsc::Sender; - -use crate::error::{Error, ErrorKind}; -use crate::shared::{self, shutdown_signal, ContainerType}; use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; @@ -15,6 +12,10 @@ use tokio_util::sync::CancellationToken; use tonic::{async_trait, Request, Response, Status, Streaming}; use tracing::{error, info}; +use crate::error::{Error, ErrorKind}; +use crate::servers::map as proto; +use crate::shared::{self, shutdown_signal, ContainerType}; + const DEFAULT_CHANNEL_SIZE: usize = 1000; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/mapstream.sock"; @@ -559,9 +560,8 @@ impl Drop for Server { #[cfg(test)] mod tests { - use super::*; - use crate::servers::map::map_client::MapClient; use std::{error::Error, time::Duration}; + use tempfile::TempDir; use tokio::net::UnixStream; use tokio::sync::{mpsc, oneshot}; @@ -569,6 +569,9 @@ mod tests { use tonic::transport::Uri; use tower::service_fn; + use super::*; + use crate::servers::map::map_client::MapClient; + #[tokio::test] async fn map_stream_single_response() -> Result<(), Box> { struct Cat; diff --git a/numaflow/src/reduce.rs b/numaflow/src/reduce.rs index 6f58972..9c1b82d 100644 --- a/numaflow/src/reduce.rs +++ b/numaflow/src/reduce.rs @@ -255,7 +255,6 @@ impl Message { /// use numaflow::reduce::Message; /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); /// ``` - pub fn tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self diff --git a/numaflow/src/sideinput.rs b/numaflow/src/sideinput.rs index 9980653..fd5cc24 100644 --- a/numaflow/src/sideinput.rs +++ b/numaflow/src/sideinput.rs @@ -81,7 +81,6 @@ pub trait SideInputer { /// ``` /// /// The `retrieve_sideinput` method is implemented to return an `Option>`. In this example, the method returns a message containing the current time if the counter is odd, and `None` if the counter is even. - async fn retrieve_sideinput(&self) -> Option>; } diff --git a/numaflow/src/sink.rs b/numaflow/src/sink.rs index 1171256..0034e24 100644 --- a/numaflow/src/sink.rs +++ b/numaflow/src/sink.rs @@ -524,17 +524,18 @@ impl Drop for Server { mod tests { use std::{error::Error, time::Duration}; - use crate::servers::sink::TransmissionStatus; - use crate::sink; - use crate::sink::sink_pb::sink_client::SinkClient; - use crate::sink::sink_pb::sink_request::Request; - use crate::sink::sink_pb::Handshake; use tempfile::TempDir; use tokio::net::UnixStream; use tokio::sync::oneshot; use tonic::transport::Uri; use tower::service_fn; + use crate::servers::sink::TransmissionStatus; + use crate::sink; + use crate::sink::sink_pb::sink_client::SinkClient; + use crate::sink::sink_pb::sink_request::Request; + use crate::sink::sink_pb::Handshake; + #[tokio::test] async fn sink_server() -> Result<(), Box> { struct Logger; diff --git a/numaflow/src/sourcetransform.rs b/numaflow/src/sourcetransform.rs index 1895312..2e6c6a4 100644 --- a/numaflow/src/sourcetransform.rs +++ b/numaflow/src/sourcetransform.rs @@ -167,7 +167,6 @@ impl Message { /// let now = Utc::now(); /// let message = Message::new(vec![1, 2, 3], now).tags(vec!["tag1".to_string(), "tag2".to_string()]); /// ``` - pub fn tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self