Skip to content

Commit

Permalink
feat: Add ID and numDelivered fields to Datum in udf (#33)
Browse files Browse the repository at this point in the history
Signed-off-by: Hao Hao <[email protected]>
  • Loading branch information
xdevxy authored Apr 13, 2023
1 parent b1466e6 commit 53ea86c
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<version>1.18.26</version>
<scope>compile</scope>
</dependency>

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/numaproj/numaflow/function/Datum.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ public interface Datum {
public Instant getEventTime();

public Instant getWatermark();

public DatumMetadata getDatumMetadata();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.numaproj.numaflow.function;

public interface DatumMetadata {
public String getId();

public long getNumDelivered();
}
14 changes: 12 additions & 2 deletions src/main/java/io/numaproj/numaflow/function/FunctionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,19 @@ public void mapFn(
}

// get Datum from request
HandlerDatumMetadata handlerDatumMetadata = new HandlerDatumMetadata(
request.getMetadata().getId(),
request.getMetadata().getNumDelivered()
);
HandlerDatum handlerDatum = new HandlerDatum(
request.getValue().toByteArray(),
Instant.ofEpochSecond(
request.getWatermark().getWatermark().getSeconds(),
request.getWatermark().getWatermark().getNanos()),
Instant.ofEpochSecond(
request.getEventTime().getEventTime().getSeconds(),
request.getEventTime().getEventTime().getNanos())
request.getEventTime().getEventTime().getNanos()),
handlerDatumMetadata
);

// process Datum
Expand All @@ -101,14 +106,19 @@ public void mapTFn(
}

// get Datum from request
HandlerDatumMetadata handlerDatumMetadata = new HandlerDatumMetadata(
request.getMetadata().getId(),
request.getMetadata().getNumDelivered()
);
HandlerDatum handlerDatum = new HandlerDatum(
request.getValue().toByteArray(),
Instant.ofEpochSecond(
request.getWatermark().getWatermark().getSeconds(),
request.getWatermark().getWatermark().getNanos()),
Instant.ofEpochSecond(
request.getEventTime().getEventTime().getSeconds(),
request.getEventTime().getEventTime().getNanos())
request.getEventTime().getEventTime().getNanos()),
handlerDatumMetadata
);

// process Datum
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/numaproj/numaflow/function/HandlerDatum.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class HandlerDatum implements Datum {
private Instant watermark;
private Instant eventTime;

private DatumMetadata datumMetadata;

@Override
public Instant getWatermark() {
return this.watermark;
Expand All @@ -26,4 +28,9 @@ public byte[] getValue() {
public Instant getEventTime() {
return this.eventTime;
}

@Override
public DatumMetadata getDatumMetadata() {
return this.datumMetadata;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.numaproj.numaflow.function;

import lombok.AllArgsConstructor;

@AllArgsConstructor
public class HandlerDatumMetadata implements DatumMetadata {
private String id;
private long numDelivered;

public String getId() {
return this.id;
}

public long getNumDelivered() {
return this.numDelivered;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.numaproj.numaflow.function.Function;
import io.numaproj.numaflow.function.FunctionService;
import io.numaproj.numaflow.function.HandlerDatum;
import io.numaproj.numaflow.function.HandlerDatumMetadata;
import io.numaproj.numaflow.function.metadata.Metadata;
import io.numaproj.numaflow.function.v1.Udfunction;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -132,14 +133,20 @@ private void responseListener(ActorResponse actorResponse) {
}

private HandlerDatum constructHandlerDatum(Udfunction.DatumRequest datumRequest) {
HandlerDatumMetadata handlerDatumMetadata = new HandlerDatumMetadata(
datumRequest.getMetadata().getId(),
datumRequest.getMetadata().getNumDelivered()
);
return new HandlerDatum(
datumRequest.getValue().toByteArray(),
Instant.ofEpochSecond(
datumRequest.getWatermark().getWatermark().getSeconds(),
datumRequest.getWatermark().getWatermark().getNanos()),
Instant.ofEpochSecond(
datumRequest.getEventTime().getEventTime().getSeconds(),
datumRequest.getEventTime().getEventTime().getNanos()));
datumRequest.getEventTime().getEventTime().getNanos()),
handlerDatumMetadata
);
}

/*
Expand Down
9 changes: 9 additions & 0 deletions src/main/proto/function/v1/udfunction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ message Watermark {
// future we can add LATE, ON_TIME etc.
}

/**
* Metadata of a datum element.
*/
message Metadata {
string id = 1;
uint64 num_delivered = 2;
}

/**
* DatumRequest represents a datum request element.
*/
Expand All @@ -44,6 +52,7 @@ message DatumRequest {
bytes value = 2;
EventTime event_time = 3;
Watermark watermark = 4;
Metadata metadata = 5;
}

/**
Expand Down

0 comments on commit 53ea86c

Please sign in to comment.