From 53ea86c1a5f93acdc8e5be07203df176e3a4f077 Mon Sep 17 00:00:00 2001 From: xdevxy <115589853+xdevxy@users.noreply.github.com> Date: Wed, 12 Apr 2023 21:24:11 -0700 Subject: [PATCH] feat: Add ID and numDelivered fields to Datum in udf (#33) Signed-off-by: Hao Hao --- pom.xml | 2 +- .../io/numaproj/numaflow/function/Datum.java | 2 ++ .../numaflow/function/DatumMetadata.java | 7 +++++++ .../numaflow/function/FunctionService.java | 14 ++++++++++++-- .../numaflow/function/HandlerDatum.java | 7 +++++++ .../numaflow/function/HandlerDatumMetadata.java | 17 +++++++++++++++++ .../function/reduce/ReduceSupervisorActor.java | 9 ++++++++- src/main/proto/function/v1/udfunction.proto | 9 +++++++++ 8 files changed, 63 insertions(+), 4 deletions(-) create mode 100644 src/main/java/io/numaproj/numaflow/function/DatumMetadata.java create mode 100644 src/main/java/io/numaproj/numaflow/function/HandlerDatumMetadata.java diff --git a/pom.xml b/pom.xml index 85302fe2..5e0d2473 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ org.projectlombok lombok - 1.18.22 + 1.18.26 compile diff --git a/src/main/java/io/numaproj/numaflow/function/Datum.java b/src/main/java/io/numaproj/numaflow/function/Datum.java index 0e05d04b..ee5d6c41 100644 --- a/src/main/java/io/numaproj/numaflow/function/Datum.java +++ b/src/main/java/io/numaproj/numaflow/function/Datum.java @@ -8,4 +8,6 @@ public interface Datum { public Instant getEventTime(); public Instant getWatermark(); + + public DatumMetadata getDatumMetadata(); } diff --git a/src/main/java/io/numaproj/numaflow/function/DatumMetadata.java b/src/main/java/io/numaproj/numaflow/function/DatumMetadata.java new file mode 100644 index 00000000..ec5e0206 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/function/DatumMetadata.java @@ -0,0 +1,7 @@ +package io.numaproj.numaflow.function; + +public interface DatumMetadata { + public String getId(); + + public long getNumDelivered(); +} diff --git a/src/main/java/io/numaproj/numaflow/function/FunctionService.java b/src/main/java/io/numaproj/numaflow/function/FunctionService.java index bb7bf320..e4d76439 100644 --- a/src/main/java/io/numaproj/numaflow/function/FunctionService.java +++ b/src/main/java/io/numaproj/numaflow/function/FunctionService.java @@ -68,6 +68,10 @@ public void mapFn( } // get Datum from request + HandlerDatumMetadata handlerDatumMetadata = new HandlerDatumMetadata( + request.getMetadata().getId(), + request.getMetadata().getNumDelivered() + ); HandlerDatum handlerDatum = new HandlerDatum( request.getValue().toByteArray(), Instant.ofEpochSecond( @@ -75,7 +79,8 @@ public void mapFn( request.getWatermark().getWatermark().getNanos()), Instant.ofEpochSecond( request.getEventTime().getEventTime().getSeconds(), - request.getEventTime().getEventTime().getNanos()) + request.getEventTime().getEventTime().getNanos()), + handlerDatumMetadata ); // process Datum @@ -101,6 +106,10 @@ public void mapTFn( } // get Datum from request + HandlerDatumMetadata handlerDatumMetadata = new HandlerDatumMetadata( + request.getMetadata().getId(), + request.getMetadata().getNumDelivered() + ); HandlerDatum handlerDatum = new HandlerDatum( request.getValue().toByteArray(), Instant.ofEpochSecond( @@ -108,7 +117,8 @@ public void mapTFn( request.getWatermark().getWatermark().getNanos()), Instant.ofEpochSecond( request.getEventTime().getEventTime().getSeconds(), - request.getEventTime().getEventTime().getNanos()) + request.getEventTime().getEventTime().getNanos()), + handlerDatumMetadata ); // process Datum diff --git a/src/main/java/io/numaproj/numaflow/function/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/function/HandlerDatum.java index 7e41ba26..3fe87996 100644 --- a/src/main/java/io/numaproj/numaflow/function/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/function/HandlerDatum.java @@ -12,6 +12,8 @@ public class HandlerDatum implements Datum { private Instant watermark; private Instant eventTime; + private DatumMetadata datumMetadata; + @Override public Instant getWatermark() { return this.watermark; @@ -26,4 +28,9 @@ public byte[] getValue() { public Instant getEventTime() { return this.eventTime; } + + @Override + public DatumMetadata getDatumMetadata() { + return this.datumMetadata; + } } diff --git a/src/main/java/io/numaproj/numaflow/function/HandlerDatumMetadata.java b/src/main/java/io/numaproj/numaflow/function/HandlerDatumMetadata.java new file mode 100644 index 00000000..198a4abd --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/function/HandlerDatumMetadata.java @@ -0,0 +1,17 @@ +package io.numaproj.numaflow.function; + +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class HandlerDatumMetadata implements DatumMetadata { + private String id; + private long numDelivered; + + public String getId() { + return this.id; + } + + public long getNumDelivered() { + return this.numDelivered; + } +} diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java index 2c38d504..dcc3a90d 100644 --- a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java @@ -12,6 +12,7 @@ import io.numaproj.numaflow.function.Function; import io.numaproj.numaflow.function.FunctionService; import io.numaproj.numaflow.function.HandlerDatum; +import io.numaproj.numaflow.function.HandlerDatumMetadata; import io.numaproj.numaflow.function.metadata.Metadata; import io.numaproj.numaflow.function.v1.Udfunction; import lombok.extern.slf4j.Slf4j; @@ -132,6 +133,10 @@ private void responseListener(ActorResponse actorResponse) { } private HandlerDatum constructHandlerDatum(Udfunction.DatumRequest datumRequest) { + HandlerDatumMetadata handlerDatumMetadata = new HandlerDatumMetadata( + datumRequest.getMetadata().getId(), + datumRequest.getMetadata().getNumDelivered() + ); return new HandlerDatum( datumRequest.getValue().toByteArray(), Instant.ofEpochSecond( @@ -139,7 +144,9 @@ private HandlerDatum constructHandlerDatum(Udfunction.DatumRequest datumRequest) datumRequest.getWatermark().getWatermark().getNanos()), Instant.ofEpochSecond( datumRequest.getEventTime().getEventTime().getSeconds(), - datumRequest.getEventTime().getEventTime().getNanos())); + datumRequest.getEventTime().getEventTime().getNanos()), + handlerDatumMetadata + ); } /* diff --git a/src/main/proto/function/v1/udfunction.proto b/src/main/proto/function/v1/udfunction.proto index a43b15a9..839eef8b 100644 --- a/src/main/proto/function/v1/udfunction.proto +++ b/src/main/proto/function/v1/udfunction.proto @@ -36,6 +36,14 @@ message Watermark { // future we can add LATE, ON_TIME etc. } +/** + * Metadata of a datum element. + */ +message Metadata { + string id = 1; + uint64 num_delivered = 2; +} + /** * DatumRequest represents a datum request element. */ @@ -44,6 +52,7 @@ message DatumRequest { bytes value = 2; EventTime event_time = 3; Watermark watermark = 4; + Metadata metadata = 5; } /**