Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use std time instead of chrono time #112

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions examples/flatmap-stream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@ struct Cat;
#[tonic::async_trait]
impl mapstream::MapStreamer for Cat {
async fn map_stream(&self, input: mapstream::MapStreamRequest, tx: Sender<Message>) {
let payload_str = String::from_utf8(input.value).unwrap_or_default();
let splits: Vec<&str> = payload_str.split(',').collect();

for split in splits {
let message = Message::new(split.as_bytes().to_vec())
.with_keys(input.keys.clone())
for i in 0..2 {
let message = Message::new(input.value.clone())
.with_keys(vec![format!("key-{}", i)])
.with_tags(vec![]);
if tx.send(message).await.is_err() {
break;
Expand Down
1 change: 0 additions & 1 deletion examples/map-tickgen-serde/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ tonic = "0.12.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0.103", features = ["derive"] }
serde_json = "1.0.103"
chrono = "0.4.26"
numaflow = { path = "../../numaflow" }
8 changes: 3 additions & 5 deletions examples/map-tickgen-serde/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use chrono::{SecondsFormat, TimeZone, Utc};
use numaflow::map;
use numaflow::map::Message;
use serde::Serialize;
use std::time::UNIX_EPOCH;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Expand Down Expand Up @@ -35,13 +35,11 @@ impl map::Mapper for TickGen {
let Ok(payload) = serde_json::from_slice::<Payload>(&input.value) else {
return vec![];
};
let ts = Utc
.timestamp_nanos(payload.created_ts)
.to_rfc3339_opts(SecondsFormat::Nanos, true);
let ts = UNIX_EPOCH + std::time::Duration::from_nanos(payload.created_ts as u64);
let message = map::Message::new(
serde_json::to_vec(&ResultPayload {
value: payload.data.value,
time: ts,
time: format!("{:?}", ts),
})
.unwrap_or_default(),
)
Expand Down
1 change: 0 additions & 1 deletion examples/mapt-event-time-filter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ edition = "2021"
[dependencies]
numaflow = { path = "../../numaflow" }
tokio = "1.38.0"
chrono = "0.4.38"
tonic = "0.12.0"
26 changes: 14 additions & 12 deletions examples/mapt-event-time-filter/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::error::Error;

use filter_impl::filter_event_time;
use numaflow::sourcetransform;
use numaflow::sourcetransform::{Message, SourceTransformRequest};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
Expand All @@ -21,13 +20,15 @@ impl sourcetransform::SourceTransformer for EventTimeFilter {
}

mod filter_impl {
use chrono::{TimeZone, Utc};
use numaflow::sourcetransform::{Message, SourceTransformRequest};
use std::time::{Duration, UNIX_EPOCH};

/// Filters messages based on their event time.
/// Returns different types of messages depending on the event time comparison.
pub fn filter_event_time(input: SourceTransformRequest) -> Vec<Message> {
let jan_first_2022 = Utc.with_ymd_and_hms(2022, 1, 1, 0, 0, 0).unwrap();
let jan_first_2023 = Utc.with_ymd_and_hms(2023, 1, 1, 0, 0, 0).unwrap();
let jan_first_2022 = UNIX_EPOCH + Duration::new(1_640_995_200, 0); // 2022-01-01 00:00:00 UTC
let jan_first_2023 = UNIX_EPOCH + Duration::new(1_672_348_800, 0); // 2023-01-01 00:00:00 UTC

if input.eventtime < jan_first_2022 {
vec![Message::message_to_drop(input.eventtime)]
} else if input.eventtime < jan_first_2023 {
Expand All @@ -42,18 +43,19 @@ mod filter_impl {

#[cfg(test)]
mod tests {
use chrono::{TimeZone, Utc};
use numaflow::sourcetransform::SourceTransformRequest;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use crate::filter_impl::filter_event_time;

/// Tests that events from 2022 are tagged as within the year 2022.
#[test]
fn test_filter_event_time_should_return_within_year_2022() {
let time = Utc.with_ymd_and_hms(2022, 7, 2, 2, 0, 0).unwrap();
let time = UNIX_EPOCH + Duration::new(1_656_732_000, 0); // 2022-07-02 02:00:00 UTC
let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: Default::default(),
watermark: SystemTime::now(),
eventtime: time,
headers: Default::default(),
};
Expand All @@ -67,11 +69,11 @@ mod tests {
/// Tests that events from 2023 are tagged as after the year 2022.
#[test]
fn test_filter_event_time_should_return_after_year_2022() {
let time = Utc.with_ymd_and_hms(2023, 7, 2, 2, 0, 0).unwrap();
let time = UNIX_EPOCH + Duration::new(1_682_348_800, 0); // 2023-07-02 02:00:00 UTC
let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: Default::default(),
watermark: SystemTime::now(),
eventtime: time,
headers: Default::default(),
};
Expand All @@ -85,11 +87,11 @@ mod tests {
/// Tests that events before 2022 are dropped.
#[test]
fn test_filter_event_time_should_drop() {
let time = Utc.with_ymd_and_hms(2021, 7, 2, 2, 0, 0).unwrap();
let time = UNIX_EPOCH + Duration::new(1_594_732_000, 0); // 2021-07-02 02:00:00 UTC
let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: Default::default(),
watermark: SystemTime::now(),
eventtime: time,
headers: Default::default(),
};
Expand Down
2 changes: 0 additions & 2 deletions examples/simple-source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,3 @@ edition = "2021"
tonic = "0.12.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { path = "../../numaflow" }
chrono = "0.4.38"
uuid = "1.2.0"
13 changes: 8 additions & 5 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

pub(crate) mod simple_source {
use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashSet, sync::RwLock};

use chrono::Utc;
use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
use tokio::sync::mpsc::Sender;

/// SimpleSource is a data generator which generates monotonically increasing offsets and data. It is a shared state which is protected using Locks
Expand All @@ -38,10 +37,14 @@ pub(crate) mod simple_source {
return;
}

let event_time = Utc::now();
let event_time = SystemTime::now();
let mut message_offsets = Vec::with_capacity(request.count);
for i in 0..request.count {
let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i);
let offset = format!(
"{}-{}",
event_time.duration_since(UNIX_EPOCH).unwrap().as_nanos(),
i
);
let payload = self.counter.fetch_add(1, Ordering::Relaxed).to_string();
transmitter
.send(Message {
Expand Down
1 change: 0 additions & 1 deletion examples/source-transformer-now/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ edition = "2021"
tonic = "0.12.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { path = "../../numaflow" }
chrono = "0.4.30"
3 changes: 2 additions & 1 deletion examples/source-transformer-now/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use numaflow::sourcetransform;
use std::time::SystemTime;

/// A simple source transformer which assigns event time to the current time in utc.

Expand All @@ -16,7 +17,7 @@ impl sourcetransform::SourceTransformer for NowCat {
input: sourcetransform::SourceTransformRequest,
) -> Vec<sourcetransform::Message> {
vec![
sourcetransform::Message::new(input.value, chrono::offset::Utc::now())
sourcetransform::Message::new(input.value, SystemTime::now())
.with_keys(input.keys.clone()),
]
}
Expand Down
2 changes: 0 additions & 2 deletions numaflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "signal"] }
tokio-util = "0.7.12"
tokio-stream = { version = "0.1.16", features = ["net"] }
serde = { version = "1.0.210", features = ["derive"] }
chrono = "0.4.38"
serde_json = "1.0.128"
futures-util = "0.3.30"
tracing = "0.1.40"
uuid = { version = "1.10.0", features = ["v4"] }
thiserror = "1.0"
Expand Down
17 changes: 10 additions & 7 deletions numaflow/src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use std::time::SystemTime;
use tokio::sync::mpsc::channel;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -84,9 +83,9 @@ pub struct Datum {
/// The value in the (key, value) terminology of map/reduce paradigm.
pub value: Vec<u8>,
/// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this time.
pub watermark: DateTime<Utc>,
pub watermark: SystemTime,
/// Time of the element as seen at source or aligned after a reduce operation.
pub event_time: DateTime<Utc>,
pub event_time: SystemTime,
/// ID is the unique id of the message to be sent to the Batch Map.
pub id: String,
/// Headers for the message.
Expand All @@ -104,8 +103,12 @@ impl TryFrom<MapRequest> for Datum {
Ok(Self {
keys: request.keys,
value: request.value,
watermark: shared::utc_from_timestamp(request.watermark),
event_time: shared::utc_from_timestamp(request.event_time),
watermark: shared::prost_timestamp_to_system_time(
request.watermark.unwrap_or_default(),
),
event_time: shared::prost_timestamp_to_system_time(
request.event_time.unwrap_or_default(),
),
id: sr.id,
headers: request.headers,
})
Expand Down Expand Up @@ -354,7 +357,7 @@ where
};

// we are done with this batch because eot=true
if message.status.map_or(false, |status| status.eot) {
if message.status.is_some_and(|status| status.eot) {
debug!("Batch Ended, received an EOT message");
break;
}
Expand Down
11 changes: 5 additions & 6 deletions numaflow/src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use std::time::SystemTime;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -164,9 +163,9 @@ pub struct MapRequest {
/// The value in the (key, value) terminology of map/reduce paradigm.
pub value: Vec<u8>,
/// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this time.
pub watermark: DateTime<Utc>,
pub watermark: SystemTime,
/// Time of the element as seen at source or aligned after a reduce operation.
pub eventtime: DateTime<Utc>,
pub eventtime: SystemTime,
/// Headers for the message.
pub headers: HashMap<String, String>,
}
Expand All @@ -176,8 +175,8 @@ impl From<proto::map_request::Request> for MapRequest {
Self {
keys: value.keys,
value: value.value,
watermark: shared::utc_from_timestamp(value.watermark),
eventtime: shared::utc_from_timestamp(value.event_time),
watermark: shared::prost_timestamp_to_system_time(value.watermark.unwrap_or_default()),
eventtime: shared::prost_timestamp_to_system_time(value.event_time.unwrap_or_default()),
headers: value.headers,
}
}
Expand Down
20 changes: 9 additions & 11 deletions numaflow/src/mapstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use std::time::SystemTime;
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -40,7 +39,6 @@ pub trait MapStreamer {
/// use tokio::sync::mpsc;
/// use tonic::async_trait;
/// use std::collections::HashMap;
/// use chrono::{DateTime, Utc};
/// use tokio::sync::mpsc::Sender;
/// use numaflow::mapstream::{MapStreamRequest, MapStreamer, Message};
///
Expand Down Expand Up @@ -169,9 +167,9 @@ pub struct MapStreamRequest {
/// The value in the (key, value) terminology of map/reduce paradigm.
pub value: Vec<u8>,
/// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this time.
pub watermark: DateTime<Utc>,
pub watermark: SystemTime,
/// Time of the element as seen at source or aligned after a reduce operation.
pub eventtime: DateTime<Utc>,
pub eventtime: SystemTime,
/// Headers for the message.
pub headers: HashMap<String, String>,
}
Expand All @@ -181,16 +179,16 @@ impl From<proto::map_request::Request> for MapStreamRequest {
Self {
keys: value.keys,
value: value.value,
watermark: shared::utc_from_timestamp(value.watermark),
eventtime: shared::utc_from_timestamp(value.event_time),
watermark: shared::prost_timestamp_to_system_time(value.watermark.unwrap_or_default()),
eventtime: shared::prost_timestamp_to_system_time(value.event_time.unwrap_or_default()),
headers: value.headers,
}
}
}

struct MapStreamService<T> {
handler: Arc<T>,
shutdown_tx: mpsc::Sender<()>,
shutdown_tx: Sender<()>,
cancellation_token: CancellationToken,
}

Expand Down Expand Up @@ -276,8 +274,8 @@ async fn handle_stream_requests<T>(
async fn handle_request<T>(
handler: Arc<T>,
map_request: Result<Option<proto::MapRequest>, Status>,
stream_response_tx: mpsc::Sender<Result<proto::MapResponse, Status>>,
error_tx: mpsc::Sender<Error>,
stream_response_tx: Sender<Result<proto::MapResponse, Status>>,
error_tx: Sender<Error>,
token: CancellationToken,
) -> bool
where
Expand Down Expand Up @@ -431,7 +429,7 @@ async fn manage_grpc_stream(
/// Performs the handshake with the client.
async fn perform_handshake(
stream: &mut Streaming<proto::MapRequest>,
stream_response_tx: &mpsc::Sender<Result<proto::MapResponse, Status>>,
stream_response_tx: &Sender<Result<proto::MapResponse, Status>>,
) -> Result<(), Status> {
let handshake_request = stream
.message()
Expand Down
Loading
Loading