Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Dec 19, 2024
1 parent 037a667 commit 00db37e
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
edition = "2021"
edition = "2021"
6 changes: 4 additions & 2 deletions examples/mapt-event-time-filter/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::error::Error;

use filter_impl::filter_event_time;
use numaflow::sourcetransform;
use numaflow::sourcetransform::{Message, SourceTransformRequest};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
Expand Down Expand Up @@ -41,9 +42,10 @@ mod filter_impl {

#[cfg(test)]
mod tests {
use crate::filter_impl::filter_event_time;
use chrono::{TimeZone, Utc};
use numaflow::sourcetransform::SourceTransformRequest;

use crate::filter_impl::filter_event_time;
/// Tests that events from 2022 are tagged as within the year 2022.
#[test]
fn test_filter_event_time_should_return_within_year_2022() {
Expand Down
2 changes: 1 addition & 1 deletion examples/sideinput/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use numaflow::sideinput::{self, SideInputer};
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

use numaflow::sideinput::{self, SideInputer};
use tonic::async_trait;

struct SideInputHandler {
Expand Down
4 changes: 2 additions & 2 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

pub(crate) mod simple_source {
use chrono::Utc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{collections::HashSet, sync::RwLock};
use tokio::sync::mpsc::Sender;

use chrono::Utc;
use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
use tokio::sync::mpsc::Sender;

/// SimpleSource is a data generator which generates monotonically increasing offsets and data. It is a shared state which is protected using Locks
/// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK
Expand Down
13 changes: 7 additions & 6 deletions numaflow/src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@ use std::fs;
use std::path::PathBuf;
use std::sync::Arc;

use crate::error::Error;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::map as proto;
use crate::servers::map::map_server::Map;
use crate::servers::map::{MapRequest, MapResponse, ReadyResponse};
use crate::shared::{self, shutdown_signal, ContainerType};
use chrono::{DateTime, Utc};
use tokio::sync::mpsc::channel;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -18,6 +12,13 @@ use tokio_util::sync::CancellationToken;
use tonic::{Request, Response, Status, Streaming};
use tracing::{debug, info};

use crate::error::Error;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::map as proto;
use crate::servers::map::map_server::Map;
use crate::servers::map::{MapRequest, MapResponse, ReadyResponse};
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";
Expand Down
19 changes: 11 additions & 8 deletions numaflow/src/mapstream.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
use crate::servers::map as proto;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use tokio::sync::mpsc::Sender;

use crate::error::{Error, ErrorKind};
use crate::shared::{self, shutdown_signal, ContainerType};
use std::sync::Arc;

use chrono::{DateTime, Utc};
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::{error, info};

use crate::error::{Error, ErrorKind};
use crate::servers::map as proto;
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_CHANNEL_SIZE: usize = 1000;
const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/mapstream.sock";
Expand Down Expand Up @@ -559,16 +560,18 @@ impl<C> Drop for Server<C> {

#[cfg(test)]
mod tests {
use super::*;
use crate::servers::map::map_client::MapClient;
use std::{error::Error, time::Duration};

use tempfile::TempDir;
use tokio::net::UnixStream;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Uri;
use tower::service_fn;

use super::*;
use crate::servers::map::map_client::MapClient;

#[tokio::test]
async fn map_stream_single_response() -> Result<(), Box<dyn Error>> {
struct Cat;
Expand Down
1 change: 0 additions & 1 deletion numaflow/src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ impl Message {
/// use numaflow::reduce::Message;
/// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// ```
pub fn tags(mut self, tags: Vec<String>) -> Self {
self.tags = Some(tags);
self
Expand Down
1 change: 0 additions & 1 deletion numaflow/src/sideinput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ pub trait SideInputer {
/// ```
///
/// The `retrieve_sideinput` method is implemented to return an `Option<Vec<u8>>`. In this example, the method returns a message containing the current time if the counter is odd, and `None` if the counter is even.
async fn retrieve_sideinput(&self) -> Option<Vec<u8>>;
}

Expand Down
11 changes: 6 additions & 5 deletions numaflow/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,17 +524,18 @@ impl<C> Drop for Server<C> {
mod tests {
use std::{error::Error, time::Duration};

use crate::servers::sink::TransmissionStatus;
use crate::sink;
use crate::sink::sink_pb::sink_client::SinkClient;
use crate::sink::sink_pb::sink_request::Request;
use crate::sink::sink_pb::Handshake;
use tempfile::TempDir;
use tokio::net::UnixStream;
use tokio::sync::oneshot;
use tonic::transport::Uri;
use tower::service_fn;

use crate::servers::sink::TransmissionStatus;
use crate::sink;
use crate::sink::sink_pb::sink_client::SinkClient;
use crate::sink::sink_pb::sink_request::Request;
use crate::sink::sink_pb::Handshake;

#[tokio::test]
async fn sink_server() -> Result<(), Box<dyn Error>> {
struct Logger;
Expand Down
1 change: 0 additions & 1 deletion numaflow/src/sourcetransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ impl Message {
/// let now = Utc::now();
/// let message = Message::new(vec![1, 2, 3], now).tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// ```
pub fn tags(mut self, tags: Vec<String>) -> Self {
self.tags = Some(tags);
self
Expand Down

0 comments on commit 00db37e

Please sign in to comment.