diff --git a/examples/README.md b/examples/README.md index 26297c73..51e94a98 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,8 +1,11 @@ ## Build -The sdk artifacts are published as GitHub packages. Check the links below on how to use GitHub packages as dependencies in a Java Project. +The sdk artifacts are published as GitHub packages. Check the links below on how to use GitHub +packages as dependencies in a Java Project. + - [Reference](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-apache-maven-registry) - [Reference](https://github.com/orgs/community/discussions/26634#discussioncomment-3252638) + ### Maven users Add this dependency to your project's POM: @@ -27,8 +30,8 @@ compile "io.numaproj.numaflow:numaflow-java:${latest}" ### Examples on how to write UDFs and UDSinks in Java * **User Defined Function(UDF)** - * [Map](src/main/java/io/numaproj/numaflow/examples/function/map) - * [Reduce](src/main/java/io/numaproj/numaflow/examples/function/reduce) + * [Map](src/main/java/io/numaproj/numaflow/examples/function/map) + * [Reduce](src/main/java/io/numaproj/numaflow/examples/function/reduce) * **User Defined Sink(UDSink)** - * [Sink](src/main/java/io/numaproj/numaflow/examples/sink/simple) + * [Sink](src/main/java/io/numaproj/numaflow/examples/sink/simple) diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/evenodd/EvenOddFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/evenodd/EvenOddFunction.java index c5eb910e..c5df020e 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/evenodd/EvenOddFunction.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/evenodd/EvenOddFunction.java @@ -3,6 +3,7 @@ import io.numaproj.numaflow.function.Datum; import io.numaproj.numaflow.function.FunctionServer; import io.numaproj.numaflow.function.Message; +import io.numaproj.numaflow.function.MessageList; import io.numaproj.numaflow.function.map.MapHandler; import lombok.extern.slf4j.Slf4j; @@ -18,21 +19,27 @@ @Slf4j public class EvenOddFunction extends MapHandler { - public Message[] processMessage(String[] keys, Datum data) { + public static void main(String[] args) throws IOException { + new FunctionServer().registerMapHandler(new EvenOddFunction()).start(); + } + + public MessageList processMessage(String[] keys, Datum data) { int value = 0; try { value = Integer.parseInt(new String(data.getValue())); } catch (NumberFormatException e) { log.error("Error occurred while parsing int"); - return new Message[]{Message.toDrop()}; - } - if (value % 2 == 0) { - return new Message[]{Message.to(new String[]{"even"}, data.getValue())}; + return MessageList.newBuilder().addMessage(Message.toDrop()).build(); } - return new Message[]{Message.to(new String[]{"odd"}, data.getValue())}; - } - public static void main(String[] args) throws IOException { - new FunctionServer().registerMapHandler(new EvenOddFunction()).start(); + String[] outputKeys = value % 2 == 0 ? new String[]{"even"} : new String[]{"odd"}; + + // tags will be used for conditional forwarding + String[] tags = value % 2 == 0 ? new String[]{"even-tag"} : new String[]{"odd-tag"}; + + return MessageList + .newBuilder() + .addMessage(new Message(data.getValue(), outputKeys, tags)) + .build(); } } diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/eventtimefilter/EventTimeFilterFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/eventtimefilter/EventTimeFilterFunction.java index b4d745e1..ba19b183 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/eventtimefilter/EventTimeFilterFunction.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/eventtimefilter/EventTimeFilterFunction.java @@ -3,6 +3,7 @@ import io.numaproj.numaflow.function.Datum; import io.numaproj.numaflow.function.FunctionServer; import io.numaproj.numaflow.function.MessageT; +import io.numaproj.numaflow.function.MessageTList; import io.numaproj.numaflow.function.mapt.MapTHandler; import java.io.IOException; @@ -22,29 +23,34 @@ public class EventTimeFilterFunction extends MapTHandler { private static final Instant januaryFirst2022 = Instant.ofEpochMilli(1640995200000L); private static final Instant januaryFirst2023 = Instant.ofEpochMilli(1672531200000L); - public MessageT[] processMessage(String[] keys, Datum data) { + public static void main(String[] args) throws IOException { + new FunctionServer() + .registerMapTHandler(new EventTimeFilterFunction()) + .start(); + } + + public MessageTList processMessage(String[] keys, Datum data) { Instant eventTime = data.getEventTime(); if (eventTime.isBefore(januaryFirst2022)) { - return new MessageT[]{MessageT.toDrop()}; + return MessageTList.newBuilder().addMessage(MessageT.toDrop()).build(); } else if (eventTime.isBefore(januaryFirst2023)) { - return new MessageT[]{ - MessageT.to( - januaryFirst2022, - new String[]{"within_year_2022"}, - data.getValue())}; + return MessageTList + .newBuilder() + .addMessage( + new MessageT( + data.getValue(), + januaryFirst2022, + new String[]{"within_year_2022"})) + .build(); } else { - return new MessageT[]{ - MessageT.to( + return MessageTList + .newBuilder() + .addMessage(new MessageT( + data.getValue(), januaryFirst2023, - new String[]{"after_year_2022"}, - data.getValue())}; + new String[]{"after_year_2022"})) + .build(); } } - - public static void main(String[] args) throws IOException { - new FunctionServer() - .registerMapTHandler(new EventTimeFilterFunction()) - .start(); - } } diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/flatmap/FlatMapFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/flatmap/FlatMapFunction.java index 09265aa4..d1c215ea 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/flatmap/FlatMapFunction.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/flatmap/FlatMapFunction.java @@ -3,6 +3,7 @@ import io.numaproj.numaflow.function.Datum; import io.numaproj.numaflow.function.FunctionServer; import io.numaproj.numaflow.function.Message; +import io.numaproj.numaflow.function.MessageList; import io.numaproj.numaflow.function.map.MapHandler; import java.io.IOException; @@ -16,18 +17,19 @@ public class FlatMapFunction extends MapHandler { - public Message[] processMessage(String[] keys, Datum data) { + public static void main(String[] args) throws IOException { + new FunctionServer().registerMapHandler(new FlatMapFunction()).start(); + } + + public MessageList processMessage(String[] keys, Datum data) { String msg = new String(data.getValue()); String[] strs = msg.split(","); - Message[] results = new Message[strs.length]; + MessageList.MessageListBuilder listBuilder = MessageList.newBuilder(); - for (int i = 0; i < strs.length; i++) { - results[i] = Message.toAll(strs[i].getBytes()); + for (String str : strs) { + listBuilder.addMessage(new Message(str.getBytes())); } - return results; - } - public static void main(String[] args) throws IOException { - new FunctionServer().registerMapHandler(new FlatMapFunction()).start(); + return listBuilder.build(); } } diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/forward/ForwardFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/forward/ForwardFunction.java index a930f4d5..963e8b1e 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/forward/ForwardFunction.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/forward/ForwardFunction.java @@ -3,6 +3,7 @@ import io.numaproj.numaflow.function.Datum; import io.numaproj.numaflow.function.FunctionServer; import io.numaproj.numaflow.function.Message; +import io.numaproj.numaflow.function.MessageList; import io.numaproj.numaflow.function.map.MapHandler; import java.io.IOException; @@ -12,11 +13,15 @@ */ public class ForwardFunction extends MapHandler { - public Message[] processMessage(String[] keys, Datum data) { - return new Message[]{Message.toAll(data.getValue())}; - } public static void main(String[] args) throws IOException { new FunctionServer().registerMapHandler(new ForwardFunction()).start(); } + + public MessageList processMessage(String[] keys, Datum data) { + return MessageList + .newBuilder() + .addMessage(new Message(data.getValue())) + .build(); + } } diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/count/EvenOddCounterFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/count/EvenOddCounterFactory.java index 395dcd19..c8803453 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/count/EvenOddCounterFactory.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/count/EvenOddCounterFactory.java @@ -3,6 +3,7 @@ import io.numaproj.numaflow.function.Datum; import io.numaproj.numaflow.function.FunctionServer; import io.numaproj.numaflow.function.Message; +import io.numaproj.numaflow.function.MessageList; import io.numaproj.numaflow.function.metadata.Metadata; import io.numaproj.numaflow.function.reduce.ReduceHandler; import io.numaproj.numaflow.function.reduce.ReducerFactory; @@ -11,13 +12,18 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Objects; @Slf4j @AllArgsConstructor public class EvenOddCounterFactory extends ReducerFactory { private Config config; + public static void main(String[] args) throws IOException { + log.info("counter udf was invoked"); + Config config = new Config(1, 2); + new FunctionServer().registerReducerFactory(new EvenOddCounterFactory(config)).start(); + } + @Override public EvenOddCounter createReducer() { return new EvenOddCounter(config); @@ -49,7 +55,7 @@ public void addMessage(String[] keys, Datum datum, Metadata md) { } @Override - public Message[] getOutput(String[] keys, Metadata md) { + public MessageList getOutput(String[] keys, Metadata md) { log.info( "even and odd count - {} {}, window - {} {}", evenCount, @@ -57,17 +63,16 @@ public Message[] getOutput(String[] keys, Metadata md) { md.getIntervalWindow().getStartTime().toString(), md.getIntervalWindow().getEndTime().toString()); + byte[] val; if (Arrays.equals(keys, new String[]{"even"})) { - return new Message[]{Message.to(keys, String.valueOf(evenCount).getBytes())}; + val = String.valueOf(evenCount).getBytes(); } else { - return new Message[]{Message.to(keys, String.valueOf(oddCount).getBytes())}; + val = String.valueOf(oddCount).getBytes(); } + return MessageList + .newBuilder() + .addMessage(new Message(val)) + .build(); } } - - public static void main(String[] args) throws IOException { - log.info("counter udf was invoked"); - Config config = new Config(1, 2); - new FunctionServer().registerReducerFactory(new EvenOddCounterFactory(config)).start(); - } } diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/sum/SumFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/sum/SumFunction.java index cb02c56d..052d2643 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/sum/SumFunction.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/sum/SumFunction.java @@ -2,6 +2,7 @@ import io.numaproj.numaflow.function.Datum; import io.numaproj.numaflow.function.Message; +import io.numaproj.numaflow.function.MessageList; import io.numaproj.numaflow.function.metadata.Metadata; import io.numaproj.numaflow.function.reduce.ReduceHandler; import lombok.extern.slf4j.Slf4j; @@ -22,7 +23,10 @@ public void addMessage(String[] keys, Datum datum, Metadata md) { } @Override - public Message[] getOutput(String[] keys, Metadata md) { - return new Message[]{Message.toAll(String.valueOf(sum).getBytes())}; + public MessageList getOutput(String[] keys, Metadata md) { + return MessageList + .newBuilder() + .addMessage(new Message(String.valueOf(sum).getBytes())) + .build(); } } diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java index 8bf2fc2a..aad8bb1b 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java @@ -9,7 +9,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; /** @@ -19,6 +18,11 @@ @Slf4j public class SimpleSink extends SinkHandler { + public static void main(String[] args) throws IOException { + new SinkServer().registerSinker(new SimpleSink()).start(); + } + + @Override public List processMessage(SinkDatumStream datumStream) { ArrayList responses = new ArrayList<>(); @@ -28,13 +32,9 @@ public List processMessage(SinkDatumStream datumStream) { if (datum == SinkDatumStream.EOF) { break; } - log.info(Arrays.toString(datum.getValue())); + log.info(new String(datum.getValue())); responses.add(new Response(datum.getId(), true, "")); } return responses; } - - public static void main(String[] args) throws IOException { - new SinkServer().registerSinker(new SimpleSink()).start(); - } } diff --git a/src/main/java/io/numaproj/numaflow/function/FunctionService.java b/src/main/java/io/numaproj/numaflow/function/FunctionService.java index 4e25c4b9..bb7bf320 100644 --- a/src/main/java/io/numaproj/numaflow/function/FunctionService.java +++ b/src/main/java/io/numaproj/numaflow/function/FunctionService.java @@ -12,8 +12,8 @@ import io.numaproj.numaflow.function.metadata.IntervalWindowImpl; import io.numaproj.numaflow.function.metadata.Metadata; import io.numaproj.numaflow.function.metadata.MetadataImpl; -import io.numaproj.numaflow.function.reduce.ReduceSupervisorActor; import io.numaproj.numaflow.function.reduce.ReduceHandler; +import io.numaproj.numaflow.function.reduce.ReduceSupervisorActor; import io.numaproj.numaflow.function.reduce.ReducerFactory; import io.numaproj.numaflow.function.reduce.ShutdownActor; import io.numaproj.numaflow.function.v1.Udfunction; @@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j; import java.time.Instant; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -58,8 +58,8 @@ public void setReduceHandler(ReducerFactory reducerFact */ @Override public void mapFn( - Udfunction.Datum request, - StreamObserver responseObserver) { + Udfunction.DatumRequest request, + StreamObserver responseObserver) { if (this.mapHandler == null) { io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall( getMapFnMethod(), @@ -79,17 +79,19 @@ public void mapFn( ); // process Datum - Message[] messages = mapHandler.processMessage(request.getKeysList().toArray(new String[0]), handlerDatum); + MessageList messageList = mapHandler.processMessage(request + .getKeysList() + .toArray(new String[0]), handlerDatum); // set response - responseObserver.onNext(buildDatumListResponse(messages)); + responseObserver.onNext(buildDatumListResponse(messageList)); responseObserver.onCompleted(); } @Override public void mapTFn( - Udfunction.Datum request, - StreamObserver responseObserver) { + Udfunction.DatumRequest request, + StreamObserver responseObserver) { if (this.mapTHandler == null) { io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall( @@ -110,10 +112,12 @@ public void mapTFn( ); // process Datum - MessageT[] messageTs = mapTHandler.processMessage(request.getKeysList().toArray(new String[0]), handlerDatum); + MessageTList messageTList = mapTHandler.processMessage(request + .getKeysList() + .toArray(new String[0]), handlerDatum); // set response - responseObserver.onNext(buildDatumListResponse(messageTs)); + responseObserver.onNext(buildDatumListResponse(messageTList)); responseObserver.onCompleted(); } @@ -121,7 +125,7 @@ public void mapTFn( * Streams input data to reduceFn and returns the result. */ @Override - public StreamObserver reduceFn(final StreamObserver responseObserver) { + public StreamObserver reduceFn(final StreamObserver responseObserver) { if (this.reducerFactory == null) { return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall( @@ -156,12 +160,16 @@ public StreamObserver reduceFn(final StreamObserver() { + return new StreamObserver() { @Override - public void onNext(Udfunction.Datum datum) { + public void onNext(Udfunction.DatumRequest datum) { // send the message to parent actor, which takes care of distribution. if (!supervisorActor.isTerminated()) { supervisorActor.tell(datum, supervisorActor); @@ -194,27 +202,33 @@ public void isReady(Empty request, StreamObserver resp responseObserver.onCompleted(); } - private Udfunction.DatumList buildDatumListResponse(Message[] messages) { - Udfunction.DatumList.Builder datumListBuilder = Udfunction.DatumList.newBuilder(); - Arrays.stream(messages).forEach(message -> { - datumListBuilder.addElements(Udfunction.Datum.newBuilder() - .addAllKeys(List.of(message.getKeys())) + private Udfunction.DatumResponseList buildDatumListResponse(MessageList messageList) { + Udfunction.DatumResponseList.Builder datumListBuilder = Udfunction.DatumResponseList.newBuilder(); + messageList.getMessages().forEach(message -> { + datumListBuilder.addElements(Udfunction.DatumResponse.newBuilder() .setValue(ByteString.copyFrom(message.getValue())) + .addAllKeys(message.getKeys() + == null ? new ArrayList<>() : List.of(message.getKeys())) + .addAllTags(message.getTags() + == null ? new ArrayList<>() : List.of(message.getTags())) .build()); }); return datumListBuilder.build(); } - private Udfunction.DatumList buildDatumListResponse(MessageT[] messageTs) { - Udfunction.DatumList.Builder datumListBuilder = Udfunction.DatumList.newBuilder(); - Arrays.stream(messageTs).forEach(messageT -> { - datumListBuilder.addElements(Udfunction.Datum.newBuilder() + private Udfunction.DatumResponseList buildDatumListResponse(MessageTList messageTList) { + Udfunction.DatumResponseList.Builder datumListBuilder = Udfunction.DatumResponseList.newBuilder(); + messageTList.getMessages().forEach(messageT -> { + datumListBuilder.addElements(Udfunction.DatumResponse.newBuilder() .setEventTime(EventTime.newBuilder().setEventTime (com.google.protobuf.Timestamp.newBuilder() .setSeconds(messageT.getEventTime().getEpochSecond()) .setNanos(messageT.getEventTime().getNano())) ) - .addAllKeys(List.of(messageT.getKeys())) + .addAllKeys(messageT.getKeys() + == null ? new ArrayList<>() : List.of(messageT.getKeys())) + .addAllTags(messageT.getTags() + == null ? new ArrayList<>() : List.of(messageT.getTags())) .setValue(ByteString.copyFrom(messageT.getValue())) .build()); }); diff --git a/src/main/java/io/numaproj/numaflow/function/Message.java b/src/main/java/io/numaproj/numaflow/function/Message.java index f93745ed..4e00402b 100644 --- a/src/main/java/io/numaproj/numaflow/function/Message.java +++ b/src/main/java/io/numaproj/numaflow/function/Message.java @@ -1,30 +1,38 @@ package io.numaproj.numaflow.function; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; import lombok.Getter; -@AllArgsConstructor(access = AccessLevel.PRIVATE) +/** + * Message is used to wrap the data return by UDF functions. + */ + @Getter public class Message { - public static final String ALL = "U+005C__ALL__"; public static final String DROP = "U+005C__DROP__"; private final String[] keys; private final byte[] value; + private final String[] tags; - // creates a Message to be dropped - public static Message toDrop() { - return new Message(new String[]{DROP}, new byte[0]); + // used to create message with keys, value and tags(used for conditional forwarding) + public Message(byte[] value, String[] keys, String[] tags) { + this.keys = keys; + this.value = value; + this.tags = tags; } - // creates a Message that will forward to all - public static Message toAll(byte[] value) { - return new Message(new String[]{ALL}, value); + // used to create Message with value. + public Message(byte[] value) { + this(value, null, null); } - // creates a Message that will forward to specified "to" - public static Message to(String[] to, byte[] value) { - return new Message(to, value); + // used to create Message with keys and value. + public Message(byte[] value, String[] keys) { + this(value, keys, null); + } + + // creates a Message to be dropped + public static Message toDrop() { + return new Message(null, null, new String[]{DROP}); } } diff --git a/src/main/java/io/numaproj/numaflow/function/MessageList.java b/src/main/java/io/numaproj/numaflow/function/MessageList.java new file mode 100644 index 00000000..03aeec2e --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/function/MessageList.java @@ -0,0 +1,26 @@ +package io.numaproj.numaflow.function; + +import lombok.Builder; +import lombok.Getter; +import lombok.Singular; + +import java.util.Collection; + +/** + * MessageList is used to return the list of Message from UDF + */ + +@Getter +@Builder(builderMethodName = "newBuilder") +public class MessageList { + + @Singular("addMessage") + private Iterable messages; + + public static class MessageListBuilder { + public MessageListBuilder addMessages(Iterable messages) { + this.messages.addAll((Collection) messages); + return this; + } + } +} diff --git a/src/main/java/io/numaproj/numaflow/function/MessageT.java b/src/main/java/io/numaproj/numaflow/function/MessageT.java index e7f0fdd6..cef41828 100644 --- a/src/main/java/io/numaproj/numaflow/function/MessageT.java +++ b/src/main/java/io/numaproj/numaflow/function/MessageT.java @@ -1,38 +1,42 @@ package io.numaproj.numaflow.function; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; import lombok.Getter; import java.time.Instant; -import static io.numaproj.numaflow.function.Message.ALL; import static io.numaproj.numaflow.function.Message.DROP; /** * MessageT is used to wrap the data return by UDF functions. Compared with Message, MessageT * contains one more field, the event time, usually extracted from the payload. */ -@AllArgsConstructor(access = AccessLevel.PRIVATE) @Getter public class MessageT { - - private Instant eventTime; private final String[] keys; private final byte[] value; + private final Instant eventTime; + private final String[] tags; + + // used to create MessageT with eventTime, keys, value and tags. + public MessageT(byte[] value, Instant eventTime, String[] keys, String[] tags) { + this.keys = keys; + this.value = value; + this.tags = tags; + this.eventTime = eventTime; + } - // creates a MessageT to be dropped - public static MessageT toDrop() { - return new MessageT(Instant.MIN, new String[]{DROP}, new byte[0]); + // used to create MessageT with eventTime and value. + public MessageT(byte[] value, Instant eventTime) { + this(value, eventTime, null, null); } - // creates a MessageT that will forward to all - public static MessageT toAll(Instant eventTime, byte[] value) { - return new MessageT(eventTime, new String[]{ALL}, value); + // used to create MessageT with eventTime, keys and value. + public MessageT(byte[] value, Instant eventTime, String[] keys) { + this(value, eventTime, keys, null); } - // creates a MessageT that will forward to specified "to" - public static MessageT to(Instant eventTime, String[] to, byte[] value) { - return new MessageT(eventTime, to, value); + // creates a MessageT to be dropped + public static MessageT toDrop() { + return new MessageT(null, null, null, new String[]{DROP}); } } diff --git a/src/main/java/io/numaproj/numaflow/function/MessageTList.java b/src/main/java/io/numaproj/numaflow/function/MessageTList.java new file mode 100644 index 00000000..4ee4b11e --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/function/MessageTList.java @@ -0,0 +1,26 @@ +package io.numaproj.numaflow.function; + +import lombok.Builder; +import lombok.Getter; +import lombok.Singular; + +import java.util.Collection; + +/** + * MessageTList is used to return the list of MessageT from UDF + */ + +@Getter +@Builder(builderMethodName = "newBuilder") +public class MessageTList { + + @Singular("addMessage") + private Iterable messages; + + public static class MessageTListBuilder { + public MessageTListBuilder addAllMessages(Iterable messages) { + this.messages.addAll((Collection) messages); + return this; + } + } +} diff --git a/src/main/java/io/numaproj/numaflow/function/map/MapHandler.java b/src/main/java/io/numaproj/numaflow/function/map/MapHandler.java index f8432d76..7360cb81 100644 --- a/src/main/java/io/numaproj/numaflow/function/map/MapHandler.java +++ b/src/main/java/io/numaproj/numaflow/function/map/MapHandler.java @@ -1,7 +1,7 @@ package io.numaproj.numaflow.function.map; import io.numaproj.numaflow.function.Datum; -import io.numaproj.numaflow.function.Message; +import io.numaproj.numaflow.function.MessageList; /** * MapHandler exposes method for performing map operation. @@ -14,5 +14,5 @@ public abstract class MapHandler { processMessage will be invoked for each input message. this method will be used for processing messages */ - public abstract Message[] processMessage(String[] key, Datum datum); + public abstract MessageList processMessage(String[] key, Datum datum); } diff --git a/src/main/java/io/numaproj/numaflow/function/mapt/MapTHandler.java b/src/main/java/io/numaproj/numaflow/function/mapt/MapTHandler.java index 97f4ff9f..7d3bb408 100644 --- a/src/main/java/io/numaproj/numaflow/function/mapt/MapTHandler.java +++ b/src/main/java/io/numaproj/numaflow/function/mapt/MapTHandler.java @@ -1,7 +1,7 @@ package io.numaproj.numaflow.function.mapt; import io.numaproj.numaflow.function.Datum; -import io.numaproj.numaflow.function.MessageT; +import io.numaproj.numaflow.function.MessageTList; /** * MapTHandler exposes method for performing transform operation @@ -16,6 +16,6 @@ public abstract class MapTHandler { this method will be used for processing and transforming the messages */ - public abstract MessageT[] processMessage(String[] keys, Datum datum); + public abstract MessageTList processMessage(String[] keys, Datum datum); } diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ActorResponse.java b/src/main/java/io/numaproj/numaflow/function/reduce/ActorResponse.java index 60bbdcab..b087f050 100644 --- a/src/main/java/io/numaproj/numaflow/function/reduce/ActorResponse.java +++ b/src/main/java/io/numaproj/numaflow/function/reduce/ActorResponse.java @@ -3,6 +3,7 @@ import io.numaproj.numaflow.function.v1.Udfunction; import lombok.AllArgsConstructor; import lombok.Getter; + /* used to store the reduced result from the handler */ @@ -10,5 +11,5 @@ @AllArgsConstructor public class ActorResponse { String[] keys; - Udfunction.DatumList datumList; + Udfunction.DatumResponseList datumList; } diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceActor.java b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceActor.java index e518445a..3248a026 100644 --- a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceActor.java +++ b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceActor.java @@ -5,12 +5,13 @@ import akka.japi.pf.ReceiveBuilder; import com.google.protobuf.ByteString; import io.numaproj.numaflow.function.HandlerDatum; -import io.numaproj.numaflow.function.Message; +import io.numaproj.numaflow.function.MessageList; import io.numaproj.numaflow.function.metadata.Metadata; import io.numaproj.numaflow.function.v1.Udfunction; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -44,18 +45,22 @@ private void invokeHandler(HandlerDatum handlerDatum) { } private void getResult(String eof) { - Message[] resultMessages = this.groupBy.getOutput(keys, md); + MessageList resultMessages = this.groupBy.getOutput(keys, md); // send the result back to sender(parent actor) getSender().tell(buildDatumListResponse(resultMessages), getSelf()); } - private ActorResponse buildDatumListResponse(Message[] messages) { - Udfunction.DatumList.Builder datumListBuilder = Udfunction.DatumList.newBuilder(); - Arrays.stream(messages).forEach(message -> { - datumListBuilder.addElements(Udfunction.Datum.newBuilder() - .addAllKeys(List.of(message.getKeys())) + private ActorResponse buildDatumListResponse(MessageList messageList) { + Udfunction.DatumResponseList.Builder datumListBuilder = Udfunction.DatumResponseList.newBuilder(); + messageList.getMessages().forEach(message -> { + datumListBuilder.addElements(Udfunction.DatumResponse.newBuilder() .setValue(ByteString.copyFrom(message.getValue())) + .addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList( + message.getKeys())) + .addAllTags(message.getTags() == null ? new ArrayList<>() : List.of( + message.getTags())) .build()); + }); return new ActorResponse(this.keys, datumListBuilder.build()); } diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceHandler.java b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceHandler.java index 4d31bb28..a7ca1f4e 100644 --- a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceHandler.java +++ b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceHandler.java @@ -1,7 +1,7 @@ package io.numaproj.numaflow.function.reduce; import io.numaproj.numaflow.function.Datum; -import io.numaproj.numaflow.function.Message; +import io.numaproj.numaflow.function.MessageList; import io.numaproj.numaflow.function.metadata.Metadata; /** @@ -21,5 +21,5 @@ public abstract class ReduceHandler { getOutput will be invoked at the end of input. It can is used to return the aggregated result. */ - public abstract Message[] getOutput(String[] keys, Metadata md); + public abstract MessageList getOutput(String[] keys, Metadata md); } diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java index 52394bb2..2c38d504 100644 --- a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java @@ -19,11 +19,9 @@ import scala.collection.Iterable; import java.time.Instant; -import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.TreeMap; /** * Supervisor actor distributes the messages to actors and handles failure. @@ -34,14 +32,14 @@ public class ReduceSupervisorActor extends AbstractActor { private final ReducerFactory reducerFactory; private final Metadata md; private final ActorRef shutdownActor; - private final StreamObserver responseObserver; + private final StreamObserver responseObserver; private final Map actorsMap = new HashMap<>(); public ReduceSupervisorActor( ReducerFactory reducerFactory, Metadata md, ActorRef shutdownActor, - StreamObserver responseObserver) { + StreamObserver responseObserver) { this.reducerFactory = reducerFactory; this.md = md; this.shutdownActor = shutdownActor; @@ -52,8 +50,13 @@ public static Props props( ReducerFactory reducerFactory, Metadata md, ActorRef shutdownActor, - StreamObserver responseObserver) { - return Props.create(ReduceSupervisorActor.class, reducerFactory, md, shutdownActor, responseObserver); + StreamObserver responseObserver) { + return Props.create( + ReduceSupervisorActor.class, + reducerFactory, + md, + shutdownActor, + responseObserver); } // if there is an uncaught exception stop in the supervisor actor, send a signal to shut down @@ -80,7 +83,7 @@ public void postStop() { public Receive createReceive() { return ReceiveBuilder .create() - .match(Udfunction.Datum.class, this::invokeActors) + .match(Udfunction.DatumRequest.class, this::invokeActors) .match(String.class, this::sendEOF) .match(ActorResponse.class, this::responseListener) .build(); @@ -91,8 +94,8 @@ public Receive createReceive() { if there is no actor for an incoming set of keys, create a new actor track all the child actors using actors map */ - private void invokeActors(Udfunction.Datum datum) { - String[] keys = datum.getKeysList().toArray(new String[0]); + private void invokeActors(Udfunction.DatumRequest datumRequest) { + String[] keys = datumRequest.getKeysList().toArray(new String[0]); String keyStr = String.join(Function.DELIMITTER, keys); if (!actorsMap.containsKey(keyStr)) { ReduceHandler reduceHandler = reducerFactory.createReducer(); @@ -101,7 +104,7 @@ private void invokeActors(Udfunction.Datum datum) { actorsMap.put(keyStr, actorRef); } - HandlerDatum handlerDatum = constructHandlerDatum(datum); + HandlerDatum handlerDatum = constructHandlerDatum(datumRequest); actorsMap.get(keyStr).tell(handlerDatum, getSelf()); } @@ -128,15 +131,15 @@ private void responseListener(ActorResponse actorResponse) { } } - private HandlerDatum constructHandlerDatum(Udfunction.Datum datum) { + private HandlerDatum constructHandlerDatum(Udfunction.DatumRequest datumRequest) { return new HandlerDatum( - datum.getValue().toByteArray(), + datumRequest.getValue().toByteArray(), Instant.ofEpochSecond( - datum.getWatermark().getWatermark().getSeconds(), - datum.getWatermark().getWatermark().getNanos()), + datumRequest.getWatermark().getWatermark().getSeconds(), + datumRequest.getWatermark().getWatermark().getNanos()), Instant.ofEpochSecond( - datum.getEventTime().getEventTime().getSeconds(), - datum.getEventTime().getEventTime().getNanos())); + datumRequest.getEventTime().getEventTime().getSeconds(), + datumRequest.getEventTime().getEventTime().getNanos())); } /* diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ShutdownActor.java b/src/main/java/io/numaproj/numaflow/function/reduce/ShutdownActor.java index 3a55fadc..2ae66c3c 100644 --- a/src/main/java/io/numaproj/numaflow/function/reduce/ShutdownActor.java +++ b/src/main/java/io/numaproj/numaflow/function/reduce/ShutdownActor.java @@ -2,7 +2,6 @@ import akka.actor.AbstractActor; import akka.actor.AllDeadLetters; -import akka.actor.DeadLetter; import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; import io.grpc.stub.StreamObserver; @@ -21,11 +20,11 @@ @Slf4j @AllArgsConstructor public class ShutdownActor extends AbstractActor { - private StreamObserver responseObserver; + private StreamObserver responseObserver; private final CompletableFuture failureFuture; public static Props props( - StreamObserver responseObserver, + StreamObserver responseObserver, CompletableFuture failureFuture) { return Props.create(ShutdownActor.class, responseObserver, failureFuture); } diff --git a/src/main/java/io/numaproj/numaflow/sink/SinkService.java b/src/main/java/io/numaproj/numaflow/sink/SinkService.java index 87c731ac..b81bdf5a 100644 --- a/src/main/java/io/numaproj/numaflow/sink/SinkService.java +++ b/src/main/java/io/numaproj/numaflow/sink/SinkService.java @@ -35,7 +35,7 @@ public void setSinkHandler(SinkHandler sinkHandler) { * Applies a function to each datum element in the stream. */ @Override - public StreamObserver sinkFn(StreamObserver responseObserver) { + public StreamObserver sinkFn(StreamObserver responseObserver) { if (this.sinkHandler == null) { return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall( getMapFnMethod(), @@ -46,9 +46,9 @@ public StreamObserver sinkFn(StreamObserver r Future> result = sinkTaskExecutor.submit(() -> sinkHandler.processMessage( sinkDatumStream)); - return new StreamObserver() { + return new StreamObserver() { @Override - public void onNext(Udsink.Datum d) { + public void onNext(Udsink.DatumRequest d) { // get Datum from request HandlerDatum handlerDatum = new HandlerDatum( d.getKeysList().toArray(new String[0]), diff --git a/src/main/proto/function/v1/udfunction.proto b/src/main/proto/function/v1/udfunction.proto index a5b8429b..a43b15a9 100644 --- a/src/main/proto/function/v1/udfunction.proto +++ b/src/main/proto/function/v1/udfunction.proto @@ -9,15 +9,15 @@ package function.v1; service UserDefinedFunction { // MapFn applies a function to each datum element. - rpc MapFn(Datum) returns (DatumList); + rpc MapFn(DatumRequest) returns (DatumResponseList); // MapTFn applies a function to each datum element. // In addition to map function, MapTFn also supports assigning a new event time to datum. // MapTFn can be used only at source vertex by source data transformer. - rpc MapTFn(Datum) returns (DatumList); + rpc MapTFn(DatumRequest) returns (DatumResponseList); // ReduceFn applies a reduce function to a datum stream. - rpc ReduceFn(stream Datum) returns (stream DatumList); + rpc ReduceFn(stream DatumRequest) returns (stream DatumResponseList); // IsReady is the heartbeat endpoint for gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); @@ -37,20 +37,31 @@ message Watermark { } /** - * Datum represents a datum element. + * DatumRequest represents a datum request element. */ -message Datum { +message DatumRequest { repeated string keys = 1; bytes value = 2; EventTime event_time = 3; Watermark watermark = 4; } +/** + * DatumResponse represents a datum response element. + */ +message DatumResponse { + repeated string keys = 1; + bytes value = 2; + EventTime event_time = 3; + Watermark watermark = 4; + repeated string tags = 5; +} + /** * DatumList represents a list of datum elements. */ -message DatumList { - repeated Datum elements = 1; +message DatumResponseList { + repeated DatumResponse elements = 1; } /** diff --git a/src/main/proto/sink/v1/udsink.proto b/src/main/proto/sink/v1/udsink.proto index e2fe01ac..bd5c6df4 100644 --- a/src/main/proto/sink/v1/udsink.proto +++ b/src/main/proto/sink/v1/udsink.proto @@ -9,7 +9,7 @@ package sink.v1; service UserDefinedSink { // SinkFn writes the Datum to a user defined sink. - rpc SinkFn(stream Datum) returns (ResponseList); + rpc SinkFn(stream DatumRequest) returns (ResponseList); // IsReady is the heartbeat endpoint for gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); @@ -29,7 +29,7 @@ message Watermark { /** * Datum represents a datum element. */ -message Datum { +message DatumRequest { repeated string keys = 1; bytes value = 2; EventTime event_time = 3; diff --git a/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java b/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java index c3e97b86..20444a1d 100644 --- a/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java +++ b/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java @@ -38,30 +38,6 @@ public class FunctionServerTest { @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); - - private static class TestMapFn extends MapHandler { - @Override - public Message[] processMessage(String[] keys, Datum datum) { - String[] updatedKeys = Arrays.stream(keys).map(c -> c+PROCESSED_KEY_SUFFIX).toArray(String[]::new); - return new Message[]{Message.to( - updatedKeys, - (new String(datum.getValue()) - + PROCESSED_VALUE_SUFFIX).getBytes())}; - } - } - - private static class TestMapTFn extends MapTHandler { - @Override - public MessageT[] processMessage(String[] keys, Datum datum) { - String[] updatedKeys = Arrays.stream(keys).map(c -> c+PROCESSED_KEY_SUFFIX).toArray(String[]::new); - return new MessageT[]{MessageT.to( - TEST_EVENT_TIME, - updatedKeys, - (new String(datum.getValue()) - + PROCESSED_VALUE_SUFFIX).getBytes())}; - } - } - private FunctionServer server; private ManagedChannel inProcessChannel; @@ -93,13 +69,14 @@ public void tearDown() throws Exception { @Test public void mapper() { ByteString inValue = ByteString.copyFromUtf8("invalue"); - Udfunction.Datum inDatum = Udfunction.Datum + Udfunction.DatumRequest inDatum = Udfunction.DatumRequest .newBuilder() .addAllKeys(List.of("test-map-key")) .setValue(inValue) .build(); - String[] expectedKey = new String[]{"test-map-key" + PROCESSED_KEY_SUFFIX}; + String[] expectedKeys = new String[]{"test-map-key" + PROCESSED_KEY_SUFFIX}; + String[] expectedTags = new String[]{"test-tag"}; ByteString expectedValue = ByteString.copyFromUtf8("invalue" + PROCESSED_VALUE_SUFFIX); @@ -108,20 +85,26 @@ public void mapper() { .mapFn(inDatum); assertEquals(1, actualDatumList.getElementsCount()); - assertEquals(expectedKey, actualDatumList.getElements(0).getKeysList().toArray(new String[0])); + assertEquals( + expectedKeys, + actualDatumList.getElements(0).getKeysList().toArray(new String[0])); assertEquals(expectedValue, actualDatumList.getElements(0).getValue()); + assertEquals( + expectedTags, + actualDatumList.getElements(0).getTagsList().toArray(new String[0])); } @Test public void mapperT() { ByteString inValue = ByteString.copyFromUtf8("invalue"); - Udfunction.Datum inDatum = Udfunction.Datum + Udfunction.DatumRequest inDatum = Udfunction.DatumRequest .newBuilder() .addKeys("test-map-key") .setValue(inValue) .build(); String[] expectedKey = new String[]{"test-map-key" + PROCESSED_KEY_SUFFIX}; + String[] expectedTags = new String[]{"test-tag"}; ByteString expectedValue = ByteString.copyFromUtf8("invalue" + PROCESSED_VALUE_SUFFIX); var stub = UserDefinedFunctionGrpc.newBlockingStub(inProcessChannel); @@ -135,8 +118,13 @@ public void mapperT() { .setSeconds(TEST_EVENT_TIME.getEpochSecond()) .setNanos(TEST_EVENT_TIME.getNano())).build(), actualDatumList.getElements(0).getEventTime()); - assertEquals(expectedKey, actualDatumList.getElements(0).getKeysList().toArray(new String[0])); + assertEquals( + expectedKey, + actualDatumList.getElements(0).getKeysList().toArray(new String[0])); assertEquals(expectedValue, actualDatumList.getElements(0).getValue()); + assertEquals( + expectedTags, + actualDatumList.getElements(0).getTagsList().toArray(new String[0])); } @Test @@ -150,13 +138,13 @@ public void reducerWithOneKey() { //create an output stream observer ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); - StreamObserver inputStreamObserver = UserDefinedFunctionGrpc + StreamObserver inputStreamObserver = UserDefinedFunctionGrpc .newStub(inProcessChannel) .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) .reduceFn(outputStreamObserver); for (int i = 1; i <= 10; i++) { - Udfunction.Datum inputDatum = Udfunction.Datum.newBuilder() + Udfunction.DatumRequest inputDatum = Udfunction.DatumRequest.newBuilder() .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .addKeys(reduceKey) .build(); @@ -171,8 +159,13 @@ public void reducerWithOneKey() { while (!outputStreamObserver.completed.get()) ; assertEquals(1, outputStreamObserver.resultDatum.get().getElementsCount()); - assertEquals(expectedKeys, outputStreamObserver.resultDatum.get().getElements(0).getKeysList().toArray(new String[0])); - ; + assertEquals( + expectedKeys, + outputStreamObserver.resultDatum + .get() + .getElements(0) + .getKeysList() + .toArray(new String[0])); assertEquals( expectedValue, outputStreamObserver.resultDatum.get().getElements(0).getValue()); @@ -191,7 +184,7 @@ public void reducerWithMultipleKey() { //create an output stream observer ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); - StreamObserver inputStreamObserver = UserDefinedFunctionGrpc + StreamObserver inputStreamObserver = UserDefinedFunctionGrpc .newStub(inProcessChannel) .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) .reduceFn(outputStreamObserver); @@ -199,7 +192,7 @@ public void reducerWithMultipleKey() { // send messages with 100 different keys for (int j = 0; j < keyCount; j++) { for (int i = 1; i <= 10; i++) { - Udfunction.Datum inputDatum = Udfunction.Datum.newBuilder() + Udfunction.DatumRequest inputDatum = Udfunction.DatumRequest.newBuilder() .addKeys(reduceKey + j) .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build(); @@ -213,10 +206,47 @@ public void reducerWithMultipleKey() { ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55)); while (!outputStreamObserver.completed.get()) ; - Udfunction.DatumList result = outputStreamObserver.resultDatum.get(); + Udfunction.DatumResponseList result = outputStreamObserver.resultDatum.get(); assertEquals(100, result.getElementsCount()); for (int i = 0; i < keyCount; i++) { assertEquals(expectedValue, result.getElements(0).getValue()); } } + + private static class TestMapFn extends MapHandler { + @Override + public MessageList processMessage(String[] keys, Datum datum) { + String[] updatedKeys = Arrays + .stream(keys) + .map(c -> c + PROCESSED_KEY_SUFFIX) + .toArray(String[]::new); + return MessageList + .newBuilder() + .addMessage(new Message( + (new String(datum.getValue()) + + PROCESSED_VALUE_SUFFIX).getBytes(), + updatedKeys, + new String[]{"test-tag"})) + .build(); + } + } + + private static class TestMapTFn extends MapTHandler { + @Override + public MessageTList processMessage(String[] keys, Datum datum) { + String[] updatedKeys = Arrays + .stream(keys) + .map(c -> c + PROCESSED_KEY_SUFFIX) + .toArray(String[]::new); + return MessageTList + .newBuilder() + .addMessage(new MessageT( + (new String(datum.getValue()) + + PROCESSED_VALUE_SUFFIX).getBytes(), + TEST_EVENT_TIME, + updatedKeys, + new String[]{"test-tag"})) + .build(); + } + } } diff --git a/src/test/java/io/numaproj/numaflow/function/ReduceOutputStreamObserver.java b/src/test/java/io/numaproj/numaflow/function/ReduceOutputStreamObserver.java index 2dcd3961..d78978c9 100644 --- a/src/test/java/io/numaproj/numaflow/function/ReduceOutputStreamObserver.java +++ b/src/test/java/io/numaproj/numaflow/function/ReduceOutputStreamObserver.java @@ -7,14 +7,19 @@ import java.util.concurrent.atomic.AtomicReference; @Slf4j -public class ReduceOutputStreamObserver implements StreamObserver { +public class ReduceOutputStreamObserver implements StreamObserver { public AtomicReference completed = new AtomicReference<>(false); - public AtomicReference resultDatum = new AtomicReference<>(Udfunction.DatumList.newBuilder().build()); + public AtomicReference resultDatum = new AtomicReference<>( + Udfunction.DatumResponseList.newBuilder().build()); public Throwable t; @Override - public void onNext(Udfunction.DatumList datum) { - resultDatum.set(resultDatum.get().toBuilder().addAllElements(datum.getElementsList()).build()); + public void onNext(Udfunction.DatumResponseList datum) { + resultDatum.set(resultDatum + .get() + .toBuilder() + .addAllElements(datum.getElementsList()) + .build()); } @Override diff --git a/src/test/java/io/numaproj/numaflow/function/ReduceTestFactory.java b/src/test/java/io/numaproj/numaflow/function/ReduceTestFactory.java index 866e2d21..9c07a154 100644 --- a/src/test/java/io/numaproj/numaflow/function/ReduceTestFactory.java +++ b/src/test/java/io/numaproj/numaflow/function/ReduceTestFactory.java @@ -21,11 +21,15 @@ public void addMessage(String[] keys, Datum datum, Metadata md) { } @Override - public Message[] getOutput(String[] keys, Metadata md) { - String[] updatedKeys = Arrays.stream(keys).map(c -> c+"-processed").toArray(String[]::new); - return new Message[]{Message.to( - updatedKeys, - String.valueOf(sum).getBytes())}; + public MessageList getOutput(String[] keys, Metadata md) { + String[] updatedKeys = Arrays + .stream(keys) + .map(c -> c + "-processed") + .toArray(String[]::new); + return MessageList + .newBuilder() + .addMessage(new Message(String.valueOf(sum).getBytes(), updatedKeys)) + .build(); } } } diff --git a/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java index 54070a13..247b584a 100644 --- a/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java +++ b/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java @@ -5,6 +5,7 @@ import com.google.protobuf.ByteString; import io.numaproj.numaflow.function.Datum; import io.numaproj.numaflow.function.Message; +import io.numaproj.numaflow.function.MessageList; import io.numaproj.numaflow.function.ReduceOutputStreamObserver; import io.numaproj.numaflow.function.metadata.IntervalWindowImpl; import io.numaproj.numaflow.function.metadata.Metadata; @@ -27,7 +28,7 @@ public void invokeSingleActor() throws RuntimeException { CompletableFuture completableFuture = new CompletableFuture(); String reduceKey = "reduce-key"; - Udfunction.Datum.Builder inDatumBuilder = Udfunction.Datum. + Udfunction.DatumRequest.Builder inDatumBuilder = Udfunction.DatumRequest. newBuilder().addKeys(reduceKey); ActorRef shutdownActor = actorSystem @@ -46,7 +47,7 @@ public void invokeSingleActor() throws RuntimeException { .props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver)); for (int i = 1; i <= 10; i++) { - Udfunction.Datum inputDatum = inDatumBuilder + Udfunction.DatumRequest inputDatum = inDatumBuilder .addKeys("reduce-test") .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build(); @@ -85,7 +86,7 @@ public void invokeMultipleActors() throws RuntimeException { new ReduceOutputStreamObserver())); for (int i = 1; i <= 10; i++) { - Udfunction.Datum inputDatum = Udfunction.Datum.newBuilder() + Udfunction.DatumRequest inputDatum = Udfunction.DatumRequest.newBuilder() .addKeys("reduce-test" + i) .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build(); @@ -117,8 +118,11 @@ public void addMessage(String[] keys, Datum datum, Metadata md) { } @Override - public Message[] getOutput(String[] keys, Metadata md) { - return new Message[]{Message.toAll(String.valueOf(count).getBytes())}; + public MessageList getOutput(String[] keys, Metadata md) { + return MessageList + .newBuilder() + .addMessage(new Message(String.valueOf(count).getBytes())) + .build(); } } } diff --git a/src/test/java/io/numaproj/numaflow/function/reduce/ShutDownActorTest.java b/src/test/java/io/numaproj/numaflow/function/reduce/ShutDownActorTest.java index 64d5d299..4be8968e 100644 --- a/src/test/java/io/numaproj/numaflow/function/reduce/ShutDownActorTest.java +++ b/src/test/java/io/numaproj/numaflow/function/reduce/ShutDownActorTest.java @@ -7,6 +7,7 @@ import com.google.protobuf.ByteString; import io.numaproj.numaflow.function.Datum; import io.numaproj.numaflow.function.Message; +import io.numaproj.numaflow.function.MessageList; import io.numaproj.numaflow.function.ReduceOutputStreamObserver; import io.numaproj.numaflow.function.metadata.IntervalWindowImpl; import io.numaproj.numaflow.function.metadata.Metadata; @@ -29,7 +30,7 @@ public void testFailure() { CompletableFuture completableFuture = new CompletableFuture(); String reduceKey = "reduce-key"; - Udfunction.Datum.Builder inDatumBuilder = Udfunction.Datum. + Udfunction.DatumRequest.Builder inDatumBuilder = Udfunction.DatumRequest. newBuilder().addKeys(reduceKey); ActorRef shutdownActor = actorSystem @@ -49,7 +50,7 @@ public void testFailure() { shutdownActor, new ReduceOutputStreamObserver())); - Udfunction.Datum inputDatum = inDatumBuilder + Udfunction.DatumRequest inputDatum = inDatumBuilder .addKeys("reduce-test") .setValue(ByteString.copyFromUtf8(String.valueOf(1))) .build(); @@ -117,8 +118,11 @@ public void addMessage(String[] keys, Datum datum, Metadata md) { } @Override - public Message[] getOutput(String[] keys, Metadata md) { - return new Message[]{Message.toAll(String.valueOf(count).getBytes())}; + public MessageList getOutput(String[] keys, Metadata md) { + return MessageList + .newBuilder() + .addMessage(new Message(String.valueOf(count).getBytes())) + .build(); } } } diff --git a/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java b/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java index 578b03e6..fab5f82d 100644 --- a/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java @@ -19,7 +19,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.function.Function; import java.util.logging.Logger; import static org.junit.Assert.assertEquals; @@ -30,26 +29,6 @@ public class SinkServerTest { private final static String processedIdSuffix = "-id-processed"; @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); - - private static class TestSinkFn extends SinkHandler { - - @Override - public List processMessage(SinkDatumStream datumStream) { - List responses = new ArrayList<>(); - while (true) { - Datum datum = datumStream.ReadMessage(); - // null indicates the end of the input - if (datum == SinkDatumStream.EOF) { - break; - } - - logger.info(Arrays.toString(datum.getValue())); - responses.add(new Response(datum.getId() + processedIdSuffix, true, "")); - } - return responses; - } - } - private SinkServer server; private ManagedChannel inProcessChannel; @@ -76,16 +55,18 @@ public void sinker() { //create an output stream observer SinkOutputStreamObserver outputStreamObserver = new SinkOutputStreamObserver(); - StreamObserver inputStreamObserver = UserDefinedSinkGrpc + StreamObserver inputStreamObserver = UserDefinedSinkGrpc .newStub(inProcessChannel) .sinkFn(outputStreamObserver); - Udsink.Datum.Builder inDatumBuilder = Udsink.Datum.newBuilder().addKeys("sink"); + Udsink.DatumRequest.Builder inDatumBuilder = Udsink.DatumRequest + .newBuilder() + .addKeys("sink"); String actualId = "sink_test_id"; String expectedId = actualId + processedIdSuffix; for (int i = 1; i <= 10; i++) { - Udsink.Datum inputDatum = inDatumBuilder + Udsink.DatumRequest inputDatum = inDatumBuilder .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .setId(actualId) .build(); @@ -99,4 +80,23 @@ public void sinker() { responseList.getResponsesList() .forEach(response -> assertEquals(response.getId(), expectedId)); } + + private static class TestSinkFn extends SinkHandler { + + @Override + public List processMessage(SinkDatumStream datumStream) { + List responses = new ArrayList<>(); + while (true) { + Datum datum = datumStream.ReadMessage(); + // null indicates the end of the input + if (datum == SinkDatumStream.EOF) { + break; + } + + logger.info(Arrays.toString(datum.getValue())); + responses.add(new Response(datum.getId() + processedIdSuffix, true, "")); + } + return responses; + } + } }