From 99a366711daaf225ca50a3009fdad891b5f49bf5 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Fri, 13 Oct 2023 15:21:31 -0400 Subject: [PATCH] chore: unify the event time of messages to drop (#76) Set the event time of messages to drop to epoch(0) - 1ms across all our SDKs. Signed-off-by: Keran Yang --- .gitignore | 2 ++ pom.xml | 2 +- .../numaproj/numaflow/sourcetransformer/Message.java | 11 ++++++++--- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 82e19e02..195a9c4b 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,5 @@ buildNumber.properties # IDE .vscode/ +.DS_Store +*/**/.DS_Store diff --git a/pom.xml b/pom.xml index bd1eb26b..67314255 100644 --- a/pom.xml +++ b/pom.xml @@ -158,7 +158,7 @@ org.projectlombok lombok - 1.18.26 + 1.18.30 compile diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java index 68a18948..457d4788 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java @@ -11,8 +11,9 @@ @Getter public class Message { public static final String DROP = "U+005C__DROP__"; - - + // Watermark are at millisecond granularity, hence we use epoch(0) - 1 to indicate watermark is not available. + // EventTimeForDrop is used to indicate that the message is dropped hence, excluded from watermark calculation + private static final Instant EventTimeForDrop = Instant.ofEpochMilli(-1); private final String[] keys; private final byte[] value; private final Instant eventTime; @@ -60,6 +61,10 @@ public Message(byte[] value, Instant eventTime, String[] keys) { * @return returns the Message which will be dropped */ public static Message toDrop() { - return new Message(new byte[0], Instant.MIN, null, new String[]{DROP}); + return new Message( + new byte[0], + EventTimeForDrop, + null, + new String[]{DROP}); } }