Skip to content

Commit

Permalink
feat: add source transformer SDK support (#17)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Feb 3, 2023
1 parent ed96f42 commit e4e9ef2
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ private static Message[] process(String key, Datum data) {
}

/**
* Program entry point: logs a startup message, then creates a FunctionServer, registers a
* MapFunc wrapping EvenOddFunction::process as the mapper, and starts the server.
*
* @param args command line arguments (not read by this method)
* @throws IOException declared by this method; presumably raised when the server fails to start
*/
public static void main(String[] args) throws IOException {
logger.info("Forward invoked");
new FunctionServer().registerMapper(new MapFunc(EvenOddFunction::process)).start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.numaproj.numaflow.examples.function.eventtimefilter;

import io.numaproj.numaflow.function.Datum;
import io.numaproj.numaflow.function.FunctionServer;
import io.numaproj.numaflow.function.MessageT;
import io.numaproj.numaflow.function.mapt.MapTFunc;

import java.io.IOException;
import java.time.Instant;
import java.util.logging.Logger;

/**
 * Example User Defined Function that filters messages by event time and re-stamps the ones
 * it keeps.
 * <p>
 * Rules applied to every incoming message:
 * <ul>
 *   <li>event time before Jan 1st 2022: the message is dropped;</li>
 *   <li>event time within year 2022: the key becomes "within_year_2022" and the event time is
 *       set to Jan 1st 2022;</li>
 *   <li>event time on or after Jan 1st 2023: the key becomes "after_year_2022" and the event
 *       time is set to Jan 1st 2023.</li>
 * </ul>
 */
public class EventTimeFilterFunction {

    private static final Logger logger = Logger.getLogger(EventTimeFilterFunction.class.getName());

    // 2022-01-01T00:00:00Z and 2023-01-01T00:00:00Z expressed as epoch milliseconds.
    private static final Instant START_OF_2022 = Instant.ofEpochMilli(1640995200000L);
    private static final Instant START_OF_2023 = Instant.ofEpochMilli(1672531200000L);

    /**
     * Applies the event time rules described on the class to a single message.
     *
     * @param key  key of the incoming message (not used by the rules)
     * @param data incoming datum; its event time selects the rule and its value is forwarded
     * @return a single-element array holding either a drop message or the re-stamped message
     */
    private static MessageT[] process(String key, Datum data) {
        Instant eventTime = data.getEventTime();

        // Anything older than 2022 is discarded outright.
        if (eventTime.isBefore(START_OF_2022)) {
            return new MessageT[]{MessageT.toDrop()};
        }

        boolean withinYear2022 = eventTime.isBefore(START_OF_2023);
        Instant assignedEventTime = withinYear2022 ? START_OF_2022 : START_OF_2023;
        String assignedKey = withinYear2022 ? "within_year_2022" : "after_year_2022";

        return new MessageT[]{MessageT.to(assignedEventTime, assignedKey, data.getValue())};
    }

    /**
     * Starts a FunctionServer with {@link #process} registered as the mapT handler.
     *
     * @param args command line arguments (not read)
     * @throws IOException declared by this method; presumably raised when the server fails to start
     */
    public static void main(String[] args) throws IOException {
        FunctionServer server = new FunctionServer();
        MapTFunc eventTimeFilter = new MapTFunc(EventTimeFilterFunction::process);
        server.registerMapperT(eventTimeFilter).start();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ private static Message[] process(String key, Datum data) {
}

/**
* Program entry point: logs a startup message, then creates a FunctionServer, registers a
* MapFunc wrapping FlatMapFunction::process as the mapper, and starts the server.
*
* @param args command line arguments (not read by this method)
* @throws IOException declared by this method; presumably raised when the server fails to start
*/
public static void main(String[] args) throws IOException {
logger.info("Flatmap invoked");
new FunctionServer().registerMapper(new MapFunc(FlatMapFunction::process)).start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.netty.channel.unix.DomainSocketAddress;
import io.numaproj.numaflow.common.GrpcServerConfig;
import io.numaproj.numaflow.function.map.MapHandler;
import io.numaproj.numaflow.function.mapt.MapTHandler;
import io.numaproj.numaflow.function.reduce.ReduceHandler;

import java.io.IOException;
Expand Down Expand Up @@ -63,6 +64,11 @@ public FunctionServer registerMapper(MapHandler mapHandler) {
return this;
}

/**
* Registers the mapT handler by setting it on the underlying function service.
*
* @param mapTHandler handler to hand over to the function service
* @return this server instance, so registration calls can be chained
*/
public FunctionServer registerMapperT(MapTHandler mapTHandler) {
this.functionService.setMapTHandler(mapTHandler);
return this;
}

public FunctionServer registerReducer(ReduceHandler reduceHandler) {
this.functionService.setReduceHandler(reduceHandler);
return this;
Expand Down
59 changes: 58 additions & 1 deletion src/main/java/io/numaproj/numaflow/function/FunctionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.protobuf.Empty;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.function.map.MapHandler;
import io.numaproj.numaflow.function.mapt.MapTHandler;
import io.numaproj.numaflow.function.metadata.IntervalWindow;
import io.numaproj.numaflow.function.metadata.IntervalWindowImpl;
import io.numaproj.numaflow.function.metadata.Metadata;
Expand All @@ -12,6 +13,7 @@
import io.numaproj.numaflow.function.reduce.ReduceDatumStreamImpl;
import io.numaproj.numaflow.function.reduce.ReduceHandler;
import io.numaproj.numaflow.function.v1.Udfunction;
import io.numaproj.numaflow.function.v1.Udfunction.EventTime;
import io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc;

import java.time.Instant;
Expand Down Expand Up @@ -40,6 +42,7 @@ class FunctionService extends UserDefinedFunctionGrpc.UserDefinedFunctionImplBas
private final long SHUTDOWN_TIME = 30;

private MapHandler mapHandler;
private MapTHandler mapTHandler;
private ReduceHandler reduceHandler;

public FunctionService() {
Expand All @@ -49,6 +52,10 @@ public void setMapHandler(MapHandler mapHandler) {
this.mapHandler = mapHandler;
}

/**
* Stores the handler that mapTFn delegates to. While no handler is set (the field is null),
* mapTFn answers calls as unimplemented.
*
* @param mapTHandler handler to use for mapT requests
*/
public void setMapTHandler(MapTHandler mapTHandler) {
this.mapTHandler = mapTHandler;
}

/**
* Stores the reduce handler on this service.
*
* @param reduceHandler handler to keep in the reduceHandler field
*/
public void setReduceHandler(ReduceHandler reduceHandler) {
this.reduceHandler = reduceHandler;
}
Expand Down Expand Up @@ -78,7 +85,8 @@ public void mapFn(
request.getWatermark().getWatermark().getNanos()),
Instant.ofEpochSecond(
request.getEventTime().getEventTime().getSeconds(),
request.getEventTime().getEventTime().getNanos()), false);
request.getEventTime().getEventTime().getNanos()),
false);

// process Datum
Message[] messages = mapHandler.HandleDo(key, handlerDatum);
Expand All @@ -88,6 +96,39 @@ public void mapFn(
responseObserver.onCompleted();
}

/**
 * Applies the registered mapT handler to a single datum and sends back the resulting
 * datum list.
 * <p>
 * When no mapT handler has been registered, the call is answered as unimplemented.
 *
 * @param request          datum to transform; its value, watermark and event time are passed
 *                         to the handler
 * @param responseObserver observer that receives the resulting datum list followed by
 *                         completion
 */
@Override
public void mapTFn(
        Udfunction.Datum request,
        StreamObserver<Udfunction.DatumList> responseObserver) {
    if (this.mapTHandler == null) {
        // Pass the MapTFn descriptor (not MapFn's) so that the UNIMPLEMENTED status
        // names the RPC that was actually invoked.
        io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(
                UserDefinedFunctionGrpc.getMapTFnMethod(),
                responseObserver);
        return;
    }

    // the key is not part of the request message; read it from the datum context key
    String key = Function.DATUM_CONTEXT_KEY.get();

    // get Datum from request
    HandlerDatum handlerDatum = new HandlerDatum(
            request.getValue().toByteArray(),
            Instant.ofEpochSecond(
                    request.getWatermark().getWatermark().getSeconds(),
                    request.getWatermark().getWatermark().getNanos()),
            Instant.ofEpochSecond(
                    request.getEventTime().getEventTime().getSeconds(),
                    request.getEventTime().getEventTime().getNanos()),
            false);

    // process Datum
    MessageT[] messageTs = mapTHandler.HandleDo(key, handlerDatum);

    // set response
    responseObserver.onNext(buildDatumListResponse(messageTs));
    responseObserver.onCompleted();
}

/**
* Streams input data to reduceFn and returns the result.
*/
Expand Down Expand Up @@ -227,4 +268,20 @@ private Udfunction.DatumList buildDatumListResponse(Message[] messages) {
});
return datumListBuilder.build();
}

/**
 * Converts the messages returned by a mapT handler into a DatumList response.
 * <p>
 * Each MessageT becomes one Datum carrying the message's event time (as a protobuf
 * Timestamp built from epoch seconds and nanos), key and value, in input order.
 *
 * @param messageTs messages produced by the mapT handler
 * @return datum list with one element per message
 */
private Udfunction.DatumList buildDatumListResponse(MessageT[] messageTs) {
    Udfunction.DatumList.Builder response = Udfunction.DatumList.newBuilder();
    for (MessageT messageT : messageTs) {
        com.google.protobuf.Timestamp.Builder timestamp =
                com.google.protobuf.Timestamp.newBuilder()
                        .setSeconds(messageT.getEventTime().getEpochSecond())
                        .setNanos(messageT.getEventTime().getNano());

        Udfunction.Datum datum = Udfunction.Datum.newBuilder()
                .setEventTime(EventTime.newBuilder().setEventTime(timestamp))
                .setKey(messageT.getKey())
                .setValue(ByteString.copyFrom(messageT.getValue()))
                .build();

        response.addElements(datum);
    }
    return response.build();
}
}
36 changes: 36 additions & 0 deletions src/main/java/io/numaproj/numaflow/function/MessageT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.numaproj.numaflow.function;

import static io.numaproj.numaflow.function.Message.ALL;
import static io.numaproj.numaflow.function.Message.DROP;

import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * MessageT is used to wrap the data returned by UDF functions. Compared with Message, MessageT
 * contains one more field, the event time, usually extracted from the payload.
 * <p>
 * Instances are created through the static factory methods or the generated all-args
 * constructor; every field is assigned exactly once and exposed through a generated getter.
 */
@AllArgsConstructor
@Getter
public class MessageT {

    // final like the other fields: the class only generates getters, so nothing reassigns it
    private final Instant eventTime;
    private final String key;
    private final byte[] value;

    /**
     * Creates a MessageT to be dropped.
     *
     * @return a message keyed with {@code DROP}, carrying {@link Instant#MIN} as event time and
     *         an empty value
     */
    public static MessageT toDrop() {
        return new MessageT(Instant.MIN, DROP, new byte[0]);
    }

    /**
     * Creates a MessageT that will forward to all.
     *
     * @param eventTime event time to assign to the message
     * @param value     message payload
     * @return a message keyed with {@code ALL}
     */
    public static MessageT toAll(Instant eventTime, byte[] value) {
        return new MessageT(eventTime, ALL, value);
    }

    /**
     * Creates a MessageT that will forward to the specified "to".
     *
     * @param eventTime event time to assign to the message
     * @param to        key to set on the message
     * @param value     message payload
     * @return a message keyed with {@code to}
     */
    public static MessageT to(Instant eventTime, String to, byte[] value) {
        return new MessageT(eventTime, to, value);
    }
}
23 changes: 23 additions & 0 deletions src/main/java/io/numaproj/numaflow/function/mapt/MapTFunc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.numaproj.numaflow.function.mapt;

import io.numaproj.numaflow.function.Datum;
import io.numaproj.numaflow.function.MessageT;

import java.util.function.BiFunction;

/**
 * MapTHandler implementation backed by a plain function, so a lambda or method reference
 * taking a key and a Datum and returning MessageT[] can be used as a handler.
 */
public class MapTFunc implements MapTHandler {

    /** Function every HandleDo call is forwarded to. */
    private final BiFunction<String, Datum, MessageT[]> delegate;

    /**
     * Wraps the given function as a MapTHandler.
     *
     * @param mapTFn function invoked for each message
     */
    public MapTFunc(BiFunction<String, Datum, MessageT[]> mapTFn) {
        delegate = mapTFn;
    }

    /**
     * Forwards the key and datum to the wrapped function and returns its result unchanged.
     *
     * @param key   key of the message
     * @param datum message to process
     * @return whatever the wrapped function returns
     */
    @Override
    public MessageT[] HandleDo(String key, Datum datum) {
        MessageT[] result = delegate.apply(key, datum);
        return result;
    }
}
13 changes: 13 additions & 0 deletions src/main/java/io/numaproj/numaflow/function/mapt/MapTHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.numaproj.numaflow.function.mapt;

import io.numaproj.numaflow.function.Datum;
import io.numaproj.numaflow.function.MessageT;

/**
* Interface of mapT function implementation.
*/
public interface MapTHandler {

/**
* Processes one incoming message.
*
* @param key key of the incoming message
* @param datum the incoming message
* @return the MessageT array produced for this message
*/
MessageT[] HandleDo(String key, Datum datum);
}
11 changes: 8 additions & 3 deletions src/main/proto/function/v1/udfunction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@ import "google/protobuf/empty.proto";
package function.v1;

service UserDefinedFunction {
// Applies a function to each datum element.
// MapFn applies a function to each datum element.
rpc MapFn(Datum) returns (DatumList);

// Applies a reduce function to a datum stream.
// MapTFn applies a function to each datum element.
// In addition to what MapFn does, MapTFn also supports assigning a new event time to each datum.
// MapTFn can be used only at source vertex by source data transformer.
rpc MapTFn(Datum) returns (DatumList);

// ReduceFn applies a reduce function to a datum stream.
rpc ReduceFn(stream Datum) returns (DatumList);

// IsReady is the heartbeat endpoint for gRPC.
Expand Down Expand Up @@ -53,4 +58,4 @@ message DatumList {
*/
message ReadyResponse {
bool ready = 1;
}
}
Loading

0 comments on commit e4e9ef2

Please sign in to comment.