diff --git a/examples/pom.xml b/examples/pom.xml
index 44eae2f3..888b9f2d 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -16,7 +16,7 @@
io.numaproj.numaflow
numaflow-java
- 0.3.4
+ 0.4.0
diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/evenodd/EvenOddFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/evenodd/EvenOddFunction.java
index 842372b8..c5eb910e 100644
--- a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/evenodd/EvenOddFunction.java
+++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/evenodd/EvenOddFunction.java
@@ -10,7 +10,7 @@
/**
* This is a simple User Defined Function example which receives a message,
- * and attaches a key to the message based on the value, if the value is even
+ * and attaches keys to the message based on the value, if the value is even
* the key will be set as "even" if the value is odd the key will be set as
* "odd"
*/
@@ -18,7 +18,7 @@
@Slf4j
public class EvenOddFunction extends MapHandler {
- public Message[] processMessage(String key, Datum data) {
+ public Message[] processMessage(String[] keys, Datum data) {
int value = 0;
try {
value = Integer.parseInt(new String(data.getValue()));
@@ -27,9 +27,9 @@ public Message[] processMessage(String key, Datum data) {
return new Message[]{Message.toDrop()};
}
if (value % 2 == 0) {
- return new Message[]{Message.to("even", data.getValue())};
+ return new Message[]{Message.to(new String[]{"even"}, data.getValue())};
}
- return new Message[]{Message.to("odd", data.getValue())};
+ return new Message[]{Message.to(new String[]{"odd"}, data.getValue())};
}
public static void main(String[] args) throws IOException {
diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/eventtimefilter/EventTimeFilterFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/eventtimefilter/EventTimeFilterFunction.java
index d3db52b6..b4d745e1 100644
--- a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/eventtimefilter/EventTimeFilterFunction.java
+++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/eventtimefilter/EventTimeFilterFunction.java
@@ -22,7 +22,7 @@ public class EventTimeFilterFunction extends MapTHandler {
private static final Instant januaryFirst2022 = Instant.ofEpochMilli(1640995200000L);
private static final Instant januaryFirst2023 = Instant.ofEpochMilli(1672531200000L);
- public MessageT[] processMessage(String key, Datum data) {
+ public MessageT[] processMessage(String[] keys, Datum data) {
Instant eventTime = data.getEventTime();
if (eventTime.isBefore(januaryFirst2022)) {
@@ -31,13 +31,13 @@ public MessageT[] processMessage(String key, Datum data) {
return new MessageT[]{
MessageT.to(
januaryFirst2022,
- "within_year_2022",
+ new String[]{"within_year_2022"},
data.getValue())};
} else {
return new MessageT[]{
MessageT.to(
januaryFirst2023,
- "after_year_2022",
+ new String[]{"after_year_2022"},
data.getValue())};
}
}
diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/flatmap/FlatMapFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/flatmap/FlatMapFunction.java
index 5fc8e1f1..09265aa4 100644
--- a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/flatmap/FlatMapFunction.java
+++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/flatmap/FlatMapFunction.java
@@ -16,7 +16,7 @@
public class FlatMapFunction extends MapHandler {
- public Message[] processMessage(String key, Datum data) {
+ public Message[] processMessage(String[] keys, Datum data) {
String msg = new String(data.getValue());
String[] strs = msg.split(",");
Message[] results = new Message[strs.length];
diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/forward/ForwardFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/forward/ForwardFunction.java
index 26797b1e..a930f4d5 100644
--- a/examples/src/main/java/io/numaproj/numaflow/examples/function/map/forward/ForwardFunction.java
+++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/map/forward/ForwardFunction.java
@@ -12,7 +12,7 @@
*/
public class ForwardFunction extends MapHandler {
- public Message[] processMessage(String key, Datum data) {
+ public Message[] processMessage(String[] keys, Datum data) {
return new Message[]{Message.toAll(data.getValue())};
}
diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/count/EvenOddCounterFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/count/EvenOddCounterFactory.java
index 9ac42a2d..395dcd19 100644
--- a/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/count/EvenOddCounterFactory.java
+++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/count/EvenOddCounterFactory.java
@@ -10,6 +10,7 @@
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Objects;
@Slf4j
@@ -33,7 +34,7 @@ public EvenOddCounter(Config config) {
}
@Override
- public void addMessage(String key, Datum datum, Metadata md) {
+ public void addMessage(String[] keys, Datum datum, Metadata md) {
try {
int val = Integer.parseInt(new String(datum.getValue()));
// increment based on the value specified in the config
@@ -48,7 +49,7 @@ public void addMessage(String key, Datum datum, Metadata md) {
}
@Override
- public Message[] getOutput(String key, Metadata md) {
+ public Message[] getOutput(String[] keys, Metadata md) {
log.info(
"even and odd count - {} {}, window - {} {}",
evenCount,
@@ -56,10 +57,10 @@ public Message[] getOutput(String key, Metadata md) {
md.getIntervalWindow().getStartTime().toString(),
md.getIntervalWindow().getEndTime().toString());
- if (Objects.equals(key, "even")) {
- return new Message[]{Message.to(key, String.valueOf(evenCount).getBytes())};
+ if (Arrays.equals(keys, new String[]{"even"})) {
+ return new Message[]{Message.to(keys, String.valueOf(evenCount).getBytes())};
} else {
- return new Message[]{Message.to(key, String.valueOf(oddCount).getBytes())};
+ return new Message[]{Message.to(keys, String.valueOf(oddCount).getBytes())};
}
}
}
diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/sum/SumFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/sum/SumFunction.java
index 5d64891d..cb02c56d 100644
--- a/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/sum/SumFunction.java
+++ b/examples/src/main/java/io/numaproj/numaflow/examples/function/reduce/sum/SumFunction.java
@@ -13,7 +13,7 @@ public class SumFunction extends ReduceHandler {
private int sum = 0;
@Override
- public void addMessage(String key, Datum datum, Metadata md) {
+ public void addMessage(String[] keys, Datum datum, Metadata md) {
try {
sum += Integer.parseInt(new String(datum.getValue()));
} catch (NumberFormatException e) {
@@ -22,7 +22,7 @@ public void addMessage(String key, Datum datum, Metadata md) {
}
@Override
- public Message[] getOutput(String key, Metadata md) {
+ public Message[] getOutput(String[] keys, Metadata md) {
return new Message[]{Message.toAll(String.valueOf(sum).getBytes())};
}
}
diff --git a/pom.xml b/pom.xml
index d5a64efc..2e43451e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
io.numaproj.numaflow
numaflow-java
- 0.3.4
+ 0.4.0
jar
numaflow-java
diff --git a/src/main/java/io/numaproj/numaflow/function/Function.java b/src/main/java/io/numaproj/numaflow/function/Function.java
index 41f5c343..03a85035 100644
--- a/src/main/java/io/numaproj/numaflow/function/Function.java
+++ b/src/main/java/io/numaproj/numaflow/function/Function.java
@@ -8,8 +8,6 @@ public class Function {
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 4;
- public static final String DATUM_KEY = "x-numaflow-datum-key";
-
public static final String WIN_START_KEY = "x-numaflow-win-start-time";
public static final String WIN_END_KEY = "x-numaflow-win-end-time";
@@ -18,9 +16,7 @@ public class Function {
public static final String SUCCESS = "SUCCESS";
- public static final Metadata.Key DATUM_METADATA_KEY = Metadata.Key.of(
- Function.DATUM_KEY,
- Metadata.ASCII_STRING_MARSHALLER);
+ public static final String DELIMITTER = ":";
public static final Metadata.Key DATUM_METADATA_WIN_START = Metadata.Key.of(
Function.WIN_START_KEY,
@@ -30,10 +26,6 @@ public class Function {
Function.WIN_END_KEY,
Metadata.ASCII_STRING_MARSHALLER);
- public static final Context.Key DATUM_CONTEXT_KEY = Context.keyWithDefault(
- Function.DATUM_KEY,
- "");
-
public static final Context.Key WINDOW_START_TIME = Context.keyWithDefault(
Function.WIN_START_KEY,
"");
diff --git a/src/main/java/io/numaproj/numaflow/function/FunctionServer.java b/src/main/java/io/numaproj/numaflow/function/FunctionServer.java
index afa07906..8e6e8aea 100644
--- a/src/main/java/io/numaproj/numaflow/function/FunctionServer.java
+++ b/src/main/java/io/numaproj/numaflow/function/FunctionServer.java
@@ -99,8 +99,6 @@ public ServerCall.Listener interceptCall(
final var context =
Context.current().withValues(
- Function.DATUM_CONTEXT_KEY,
- headers.get(Function.DATUM_METADATA_KEY),
Function.WINDOW_START_TIME,
headers.get(Function.DATUM_METADATA_WIN_START),
Function.WINDOW_END_TIME,
diff --git a/src/main/java/io/numaproj/numaflow/function/FunctionService.java b/src/main/java/io/numaproj/numaflow/function/FunctionService.java
index d681a7bc..4e25c4b9 100644
--- a/src/main/java/io/numaproj/numaflow/function/FunctionService.java
+++ b/src/main/java/io/numaproj/numaflow/function/FunctionService.java
@@ -3,7 +3,6 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
-import akka.actor.DeadLetter;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.stub.StreamObserver;
@@ -25,6 +24,7 @@
import java.time.Instant;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import static io.numaproj.numaflow.function.Function.EOF;
@@ -67,9 +67,6 @@ public void mapFn(
return;
}
- // get key from gPRC metadata
- String key = Function.DATUM_CONTEXT_KEY.get();
-
// get Datum from request
HandlerDatum handlerDatum = new HandlerDatum(
request.getValue().toByteArray(),
@@ -82,7 +79,7 @@ public void mapFn(
);
// process Datum
- Message[] messages = mapHandler.processMessage(key, handlerDatum);
+ Message[] messages = mapHandler.processMessage(request.getKeysList().toArray(new String[0]), handlerDatum);
// set response
responseObserver.onNext(buildDatumListResponse(messages));
@@ -101,9 +98,6 @@ public void mapTFn(
return;
}
- // get key from gPRC metadata
- String key = Function.DATUM_CONTEXT_KEY.get();
-
// get Datum from request
HandlerDatum handlerDatum = new HandlerDatum(
request.getValue().toByteArray(),
@@ -116,7 +110,7 @@ public void mapTFn(
);
// process Datum
- MessageT[] messageTs = mapTHandler.processMessage(key, handlerDatum);
+ MessageT[] messageTs = mapTHandler.processMessage(request.getKeysList().toArray(new String[0]), handlerDatum);
// set response
responseObserver.onNext(buildDatumListResponse(messageTs));
@@ -159,7 +153,7 @@ public StreamObserver reduceFn(final StreamObserver {
datumListBuilder.addElements(Udfunction.Datum.newBuilder()
- .setKey(message.getKey())
+ .addAllKeys(List.of(message.getKeys()))
.setValue(ByteString.copyFrom(message.getValue()))
.build());
});
@@ -220,7 +214,7 @@ private Udfunction.DatumList buildDatumListResponse(MessageT[] messageTs) {
.setSeconds(messageT.getEventTime().getEpochSecond())
.setNanos(messageT.getEventTime().getNano()))
)
- .setKey(messageT.getKey())
+ .addAllKeys(List.of(messageT.getKeys()))
.setValue(ByteString.copyFrom(messageT.getValue()))
.build());
});
diff --git a/src/main/java/io/numaproj/numaflow/function/Message.java b/src/main/java/io/numaproj/numaflow/function/Message.java
index 1e9e10ef..f93745ed 100644
--- a/src/main/java/io/numaproj/numaflow/function/Message.java
+++ b/src/main/java/io/numaproj/numaflow/function/Message.java
@@ -10,21 +10,21 @@ public class Message {
public static final String ALL = "U+005C__ALL__";
public static final String DROP = "U+005C__DROP__";
- private final String key;
+ private final String[] keys;
private final byte[] value;
// creates a Message to be dropped
public static Message toDrop() {
- return new Message(DROP, new byte[0]);
+ return new Message(new String[]{DROP}, new byte[0]);
}
// creates a Message that will forward to all
public static Message toAll(byte[] value) {
- return new Message(ALL, value);
+ return new Message(new String[]{ALL}, value);
}
// creates a Message that will forward to specified "to"
- public static Message to(String to, byte[] value) {
+ public static Message to(String[] to, byte[] value) {
return new Message(to, value);
}
}
diff --git a/src/main/java/io/numaproj/numaflow/function/MessageT.java b/src/main/java/io/numaproj/numaflow/function/MessageT.java
index c9f78467..e7f0fdd6 100644
--- a/src/main/java/io/numaproj/numaflow/function/MessageT.java
+++ b/src/main/java/io/numaproj/numaflow/function/MessageT.java
@@ -18,21 +18,21 @@
public class MessageT {
private Instant eventTime;
- private final String key;
+ private final String[] keys;
private final byte[] value;
// creates a MessageT to be dropped
public static MessageT toDrop() {
- return new MessageT(Instant.MIN, DROP, new byte[0]);
+ return new MessageT(Instant.MIN, new String[]{DROP}, new byte[0]);
}
// creates a MessageT that will forward to all
public static MessageT toAll(Instant eventTime, byte[] value) {
- return new MessageT(eventTime, ALL, value);
+ return new MessageT(eventTime, new String[]{ALL}, value);
}
// creates a MessageT that will forward to specified "to"
- public static MessageT to(Instant eventTime, String to, byte[] value) {
+ public static MessageT to(Instant eventTime, String[] to, byte[] value) {
return new MessageT(eventTime, to, value);
}
}
diff --git a/src/main/java/io/numaproj/numaflow/function/map/MapHandler.java b/src/main/java/io/numaproj/numaflow/function/map/MapHandler.java
index d1753e38..f8432d76 100644
--- a/src/main/java/io/numaproj/numaflow/function/map/MapHandler.java
+++ b/src/main/java/io/numaproj/numaflow/function/map/MapHandler.java
@@ -14,5 +14,5 @@ public abstract class MapHandler {
processMessage will be invoked for each input message.
this method will be used for processing messages
*/
- public abstract Message[] processMessage(String key, Datum datum);
+ public abstract Message[] processMessage(String[] key, Datum datum);
}
diff --git a/src/main/java/io/numaproj/numaflow/function/mapt/MapTHandler.java b/src/main/java/io/numaproj/numaflow/function/mapt/MapTHandler.java
index 7298d58a..97f4ff9f 100644
--- a/src/main/java/io/numaproj/numaflow/function/mapt/MapTHandler.java
+++ b/src/main/java/io/numaproj/numaflow/function/mapt/MapTHandler.java
@@ -16,6 +16,6 @@ public abstract class MapTHandler {
this method will be used for processing and transforming
the messages
*/
- public abstract MessageT[] processMessage(String key, Datum datum);
+ public abstract MessageT[] processMessage(String[] keys, Datum datum);
}
diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ActorResponse.java b/src/main/java/io/numaproj/numaflow/function/reduce/ActorResponse.java
index 654ec632..60bbdcab 100644
--- a/src/main/java/io/numaproj/numaflow/function/reduce/ActorResponse.java
+++ b/src/main/java/io/numaproj/numaflow/function/reduce/ActorResponse.java
@@ -9,6 +9,6 @@
@Getter
@AllArgsConstructor
public class ActorResponse {
- String key;
+ String[] keys;
Udfunction.DatumList datumList;
}
diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceActor.java b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceActor.java
index 3f239ac2..e518445a 100644
--- a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceActor.java
+++ b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceActor.java
@@ -12,6 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
+import java.util.List;
/**
* Reduce actor invokes the user defined code and returns the result.
@@ -21,12 +22,12 @@
@AllArgsConstructor
public class ReduceActor extends AbstractActor {
- private String key;
+ private String[] keys;
private Metadata md;
private ReduceHandler groupBy;
- public static Props props(String key, Metadata md, ReduceHandler groupBy) {
- return Props.create(ReduceActor.class, key, md, groupBy);
+ public static Props props(String[] keys, Metadata md, ReduceHandler groupBy) {
+ return Props.create(ReduceActor.class, keys, md, groupBy);
}
@Override
@@ -39,11 +40,11 @@ public Receive createReceive() {
}
private void invokeHandler(HandlerDatum handlerDatum) {
- this.groupBy.addMessage(key, handlerDatum, md);
+ this.groupBy.addMessage(keys, handlerDatum, md);
}
private void getResult(String eof) {
- Message[] resultMessages = this.groupBy.getOutput(key, md);
+ Message[] resultMessages = this.groupBy.getOutput(keys, md);
// send the result back to sender(parent actor)
getSender().tell(buildDatumListResponse(resultMessages), getSelf());
}
@@ -52,11 +53,11 @@ private ActorResponse buildDatumListResponse(Message[] messages) {
Udfunction.DatumList.Builder datumListBuilder = Udfunction.DatumList.newBuilder();
Arrays.stream(messages).forEach(message -> {
datumListBuilder.addElements(Udfunction.Datum.newBuilder()
- .setKey(message.getKey())
+ .addAllKeys(List.of(message.getKeys()))
.setValue(ByteString.copyFrom(message.getValue()))
.build());
});
- return new ActorResponse(this.key, datumListBuilder.build());
+ return new ActorResponse(this.keys, datumListBuilder.build());
}
}
diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceHandler.java b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceHandler.java
index 2da11f50..4d31bb28 100644
--- a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceHandler.java
+++ b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceHandler.java
@@ -13,13 +13,13 @@ public abstract class ReduceHandler {
/*
addMessage will be invoked for each input message.
It can be used to read the input data from datum and
- update the result for a given key.
+ update the result for given keys.
*/
- public abstract void addMessage(String key, Datum datum, Metadata md);
+ public abstract void addMessage(String[] keys, Datum datum, Metadata md);
/*
getOutput will be invoked at the end of input.
It can is used to return the aggregated result.
*/
- public abstract Message[] getOutput(String key, Metadata md);
+ public abstract Message[] getOutput(String[] keys, Metadata md);
}
diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java
index 9c97b4ce..52394bb2 100644
--- a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java
+++ b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java
@@ -17,14 +17,13 @@
import lombok.extern.slf4j.Slf4j;
import scala.PartialFunction;
import scala.collection.Iterable;
-import scala.concurrent.Future;
import java.time.Instant;
-import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TreeMap;
/**
* Supervisor actor distributes the messages to actors and handles failure.
@@ -37,7 +36,6 @@ public class ReduceSupervisorActor extends AbstractActor {
private final ActorRef shutdownActor;
private final StreamObserver responseObserver;
private final Map actorsMap = new HashMap<>();
- private final List> results = new ArrayList<>();
public ReduceSupervisorActor(
ReducerFactory extends ReduceHandler> reducerFactory,
@@ -89,22 +87,22 @@ public Receive createReceive() {
}
/*
- based on key of the input message invoke the right actor
- if there is no actor for an incoming key, create a new actor
+ based on the keys of the input message invoke the right actor
+ if there is no actor for an incoming set of keys, create a new actor
track all the child actors using actors map
*/
private void invokeActors(Udfunction.Datum datum) {
- if (!actorsMap.containsKey(datum.getKey())) {
-
+ String[] keys = datum.getKeysList().toArray(new String[0]);
+ String keyStr = String.join(Function.DELIMITTER, keys);
+ if (!actorsMap.containsKey(keyStr)) {
ReduceHandler reduceHandler = reducerFactory.createReducer();
-
ActorRef actorRef = getContext()
- .actorOf(ReduceActor.props(datum.getKey(), md, reduceHandler));
+ .actorOf(ReduceActor.props(keys, md, reduceHandler));
- actorsMap.put(datum.getKey(), actorRef);
+ actorsMap.put(keyStr, actorRef);
}
HandlerDatum handlerDatum = constructHandlerDatum(datum);
- actorsMap.get(datum.getKey()).tell(handlerDatum, getSelf());
+ actorsMap.get(keyStr).tell(handlerDatum, getSelf());
}
private void sendEOF(String EOF) {
@@ -123,7 +121,7 @@ private void responseListener(ActorResponse actorResponse) {
*/
responseObserver.onNext(actorResponse.getDatumList());
- actorsMap.remove(actorResponse.getKey());
+ actorsMap.remove(String.join(Function.DELIMITTER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
diff --git a/src/main/java/io/numaproj/numaflow/sink/Datum.java b/src/main/java/io/numaproj/numaflow/sink/Datum.java
index 6133642d..aa45e76c 100644
--- a/src/main/java/io/numaproj/numaflow/sink/Datum.java
+++ b/src/main/java/io/numaproj/numaflow/sink/Datum.java
@@ -3,6 +3,8 @@
import java.time.Instant;
public interface Datum {
+ public abstract String[] getKeys();
+
public abstract byte[] getValue();
public abstract Instant getEventTime();
diff --git a/src/main/java/io/numaproj/numaflow/sink/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/sink/HandlerDatum.java
index 6eef0586..86cd1308 100644
--- a/src/main/java/io/numaproj/numaflow/sink/HandlerDatum.java
+++ b/src/main/java/io/numaproj/numaflow/sink/HandlerDatum.java
@@ -7,6 +7,7 @@
@AllArgsConstructor
public class HandlerDatum implements Datum {
+ private String[] keys;
private byte[] value;
private Instant watermark;
private Instant eventTime;
@@ -15,7 +16,12 @@ public class HandlerDatum implements Datum {
// poison packet for reduce stream, to indicate EOF
public static HandlerDatum EOF() {
- return new HandlerDatum(null, null, null, null, true);
+ return new HandlerDatum(null, null, null, null, null, true);
+ }
+
+ @Override
+ public String[] getKeys() {
+ return keys;
}
@Override
diff --git a/src/main/java/io/numaproj/numaflow/sink/SinkService.java b/src/main/java/io/numaproj/numaflow/sink/SinkService.java
index 12b81c2c..87c731ac 100644
--- a/src/main/java/io/numaproj/numaflow/sink/SinkService.java
+++ b/src/main/java/io/numaproj/numaflow/sink/SinkService.java
@@ -51,6 +51,7 @@ public StreamObserver sinkFn(StreamObserver r
public void onNext(Udsink.Datum d) {
// get Datum from request
HandlerDatum handlerDatum = new HandlerDatum(
+ d.getKeysList().toArray(new String[0]),
d.getValue().toByteArray(),
Instant.ofEpochSecond(
d.getWatermark().getWatermark().getSeconds(),
diff --git a/src/main/proto/function/v1/udfunction.proto b/src/main/proto/function/v1/udfunction.proto
index a24b34cc..a5b8429b 100644
--- a/src/main/proto/function/v1/udfunction.proto
+++ b/src/main/proto/function/v1/udfunction.proto
@@ -40,7 +40,7 @@ message Watermark {
* Datum represents a datum element.
*/
message Datum {
- string key = 1;
+ repeated string keys = 1;
bytes value = 2;
EventTime event_time = 3;
Watermark watermark = 4;
diff --git a/src/main/proto/sink/v1/udsink.proto b/src/main/proto/sink/v1/udsink.proto
index 8fc7ecf2..e2fe01ac 100644
--- a/src/main/proto/sink/v1/udsink.proto
+++ b/src/main/proto/sink/v1/udsink.proto
@@ -30,7 +30,7 @@ message Watermark {
* Datum represents a datum element.
*/
message Datum {
- string key = 1;
+ repeated string keys = 1;
bytes value = 2;
EventTime event_time = 3;
Watermark watermark = 4;
diff --git a/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java b/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java
index 402e8982..c3e97b86 100644
--- a/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java
@@ -22,8 +22,9 @@
import org.junit.runners.JUnit4;
import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
-import static io.numaproj.numaflow.function.Function.DATUM_KEY;
import static io.numaproj.numaflow.function.Function.WIN_END_KEY;
import static io.numaproj.numaflow.function.Function.WIN_START_KEY;
import static org.junit.Assert.assertEquals;
@@ -40,9 +41,10 @@ public class FunctionServerTest {
private static class TestMapFn extends MapHandler {
@Override
- public Message[] processMessage(String key, Datum datum) {
+ public Message[] processMessage(String[] keys, Datum datum) {
+ String[] updatedKeys = Arrays.stream(keys).map(c -> c+PROCESSED_KEY_SUFFIX).toArray(String[]::new);
return new Message[]{Message.to(
- key + PROCESSED_KEY_SUFFIX,
+ updatedKeys,
(new String(datum.getValue())
+ PROCESSED_VALUE_SUFFIX).getBytes())};
}
@@ -50,10 +52,11 @@ public Message[] processMessage(String key, Datum datum) {
private static class TestMapTFn extends MapTHandler {
@Override
- public MessageT[] processMessage(String key, Datum datum) {
+ public MessageT[] processMessage(String[] keys, Datum datum) {
+ String[] updatedKeys = Arrays.stream(keys).map(c -> c+PROCESSED_KEY_SUFFIX).toArray(String[]::new);
return new MessageT[]{MessageT.to(
TEST_EVENT_TIME,
- key + PROCESSED_KEY_SUFFIX,
+ updatedKeys,
(new String(datum.getValue())
+ PROCESSED_VALUE_SUFFIX).getBytes())};
}
@@ -92,23 +95,20 @@ public void mapper() {
ByteString inValue = ByteString.copyFromUtf8("invalue");
Udfunction.Datum inDatum = Udfunction.Datum
.newBuilder()
- .setKey("not-my-key")
+ .addAllKeys(List.of("test-map-key"))
.setValue(inValue)
.build();
- String expectedKey = "inkey" + PROCESSED_KEY_SUFFIX;
+ String[] expectedKey = new String[]{"test-map-key" + PROCESSED_KEY_SUFFIX};
ByteString expectedValue = ByteString.copyFromUtf8("invalue" + PROCESSED_VALUE_SUFFIX);
- Metadata metadata = new Metadata();
- metadata.put(Metadata.Key.of(DATUM_KEY, Metadata.ASCII_STRING_MARSHALLER), "inkey");
var stub = UserDefinedFunctionGrpc.newBlockingStub(inProcessChannel);
var actualDatumList = stub
- .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
.mapFn(inDatum);
assertEquals(1, actualDatumList.getElementsCount());
- assertEquals(expectedKey, actualDatumList.getElements(0).getKey());
+ assertEquals(expectedKey, actualDatumList.getElements(0).getKeysList().toArray(new String[0]));
assertEquals(expectedValue, actualDatumList.getElements(0).getValue());
}
@@ -117,19 +117,15 @@ public void mapperT() {
ByteString inValue = ByteString.copyFromUtf8("invalue");
Udfunction.Datum inDatum = Udfunction.Datum
.newBuilder()
- .setKey("not-my-key")
+ .addKeys("test-map-key")
.setValue(inValue)
.build();
- String expectedKey = "inkey" + PROCESSED_KEY_SUFFIX;
+ String[] expectedKey = new String[]{"test-map-key" + PROCESSED_KEY_SUFFIX};
ByteString expectedValue = ByteString.copyFromUtf8("invalue" + PROCESSED_VALUE_SUFFIX);
- Metadata metadata = new Metadata();
- metadata.put(Metadata.Key.of(DATUM_KEY, Metadata.ASCII_STRING_MARSHALLER), "inkey");
-
var stub = UserDefinedFunctionGrpc.newBlockingStub(inProcessChannel);
var actualDatumList = stub
- .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
.mapTFn(inDatum);
assertEquals(1, actualDatumList.getElementsCount());
@@ -139,17 +135,15 @@ public void mapperT() {
.setSeconds(TEST_EVENT_TIME.getEpochSecond())
.setNanos(TEST_EVENT_TIME.getNano())).build(),
actualDatumList.getElements(0).getEventTime());
- assertEquals(expectedKey, actualDatumList.getElements(0).getKey());
+ assertEquals(expectedKey, actualDatumList.getElements(0).getKeysList().toArray(new String[0]));
assertEquals(expectedValue, actualDatumList.getElements(0).getValue());
}
@Test
public void reducerWithOneKey() {
String reduceKey = "reduce-key";
- Udfunction.Datum.Builder inDatumBuilder = Udfunction.Datum.newBuilder().setKey(reduceKey);
Metadata metadata = new Metadata();
- metadata.put(Metadata.Key.of(DATUM_KEY, Metadata.ASCII_STRING_MARSHALLER), reduceKey);
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000");
@@ -162,21 +156,22 @@ public void reducerWithOneKey() {
.reduceFn(outputStreamObserver);
for (int i = 1; i <= 10; i++) {
- Udfunction.Datum inputDatum = inDatumBuilder
+ Udfunction.Datum inputDatum = Udfunction.Datum.newBuilder()
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
+ .addKeys(reduceKey)
.build();
inputStreamObserver.onNext(inputDatum);
}
inputStreamObserver.onCompleted();
- String expectedKey = reduceKey + REDUCE_PROCESSED_KEY_SUFFIX;
+ String[] expectedKeys = new String[]{reduceKey + REDUCE_PROCESSED_KEY_SUFFIX};
// sum of first 10 numbers 1 to 10 -> 55
ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55));
while (!outputStreamObserver.completed.get()) ;
assertEquals(1, outputStreamObserver.resultDatum.get().getElementsCount());
- assertEquals(expectedKey, outputStreamObserver.resultDatum.get().getElements(0).getKey());
+ assertEquals(expectedKeys, outputStreamObserver.resultDatum.get().getElements(0).getKeysList().toArray(new String[0]));
;
assertEquals(
expectedValue,
@@ -188,10 +183,8 @@ public void reducerWithOneKey() {
public void reducerWithMultipleKey() {
String reduceKey = "reduce-key";
int keyCount = 100;
- Udfunction.Datum.Builder inDatumBuilder = Udfunction.Datum.newBuilder().setKey(reduceKey);
Metadata metadata = new Metadata();
- metadata.put(Metadata.Key.of(DATUM_KEY, Metadata.ASCII_STRING_MARSHALLER), reduceKey);
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000");
@@ -206,8 +199,8 @@ public void reducerWithMultipleKey() {
// send messages with 100 different keys
for (int j = 0; j < keyCount; j++) {
for (int i = 1; i <= 10; i++) {
- Udfunction.Datum inputDatum = inDatumBuilder
- .setKey(reduceKey + j)
+ Udfunction.Datum inputDatum = Udfunction.Datum.newBuilder()
+ .addKeys(reduceKey + j)
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build();
inputStreamObserver.onNext(inputDatum);
diff --git a/src/test/java/io/numaproj/numaflow/function/ReduceTestFactory.java b/src/test/java/io/numaproj/numaflow/function/ReduceTestFactory.java
index d9f84db3..866e2d21 100644
--- a/src/test/java/io/numaproj/numaflow/function/ReduceTestFactory.java
+++ b/src/test/java/io/numaproj/numaflow/function/ReduceTestFactory.java
@@ -4,6 +4,8 @@
import io.numaproj.numaflow.function.reduce.ReduceHandler;
import io.numaproj.numaflow.function.reduce.ReducerFactory;
+import java.util.Arrays;
+
public class ReduceTestFactory extends ReducerFactory {
@Override
public ReduceTestFn createReducer() {
@@ -14,14 +16,15 @@ public static class ReduceTestFn extends ReduceHandler {
private int sum = 0;
@Override
- public void addMessage(String key, Datum datum, Metadata md) {
+ public void addMessage(String[] keys, Datum datum, Metadata md) {
sum += Integer.parseInt(new String(datum.getValue()));
}
@Override
- public Message[] getOutput(String key, Metadata md) {
+ public Message[] getOutput(String[] keys, Metadata md) {
+ String[] updatedKeys = Arrays.stream(keys).map(c -> c+"-processed").toArray(String[]::new);
return new Message[]{Message.to(
- key + "-processed",
+ updatedKeys,
String.valueOf(sum).getBytes())};
}
}
diff --git a/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java
index 8410ac3c..54070a13 100644
--- a/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java
+++ b/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java
@@ -28,7 +28,7 @@ public void invokeSingleActor() throws RuntimeException {
String reduceKey = "reduce-key";
Udfunction.Datum.Builder inDatumBuilder = Udfunction.Datum.
- newBuilder().setKey(reduceKey);
+ newBuilder().addKeys(reduceKey);
ActorRef shutdownActor = actorSystem
.actorOf(ShutdownActor
@@ -47,7 +47,7 @@ public void invokeSingleActor() throws RuntimeException {
for (int i = 1; i <= 10; i++) {
Udfunction.Datum inputDatum = inDatumBuilder
- .setKey("reduce-test")
+ .addKeys("reduce-test")
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build();
supervisor.tell(inputDatum, ActorRef.noSender());
@@ -67,10 +67,6 @@ public void invokeMultipleActors() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-2");
CompletableFuture completableFuture = new CompletableFuture();
- String reduceKey = "reduce-key";
- Udfunction.Datum.Builder inDatumBuilder = Udfunction.Datum.
- newBuilder().setKey(reduceKey);
-
ActorRef shutdownActor = actorSystem
.actorOf(ShutdownActor
.props(
@@ -89,8 +85,8 @@ public void invokeMultipleActors() throws RuntimeException {
new ReduceOutputStreamObserver()));
for (int i = 1; i <= 10; i++) {
- Udfunction.Datum inputDatum = inDatumBuilder
- .setKey("reduce-test" + i)
+ Udfunction.Datum inputDatum = Udfunction.Datum.newBuilder()
+ .addKeys("reduce-test" + i)
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build();
supervisor.tell(inputDatum, ActorRef.noSender());
@@ -116,12 +112,12 @@ public static class TestReduceHandler extends ReduceHandler {
int count = 0;
@Override
- public void addMessage(String key, Datum datum, Metadata md) {
+ public void addMessage(String[] keys, Datum datum, Metadata md) {
count += 1;
}
@Override
- public Message[] getOutput(String key, Metadata md) {
+ public Message[] getOutput(String[] keys, Metadata md) {
return new Message[]{Message.toAll(String.valueOf(count).getBytes())};
}
}
diff --git a/src/test/java/io/numaproj/numaflow/function/reduce/ShutDownActorTest.java b/src/test/java/io/numaproj/numaflow/function/reduce/ShutDownActorTest.java
index 1ac93f92..64d5d299 100644
--- a/src/test/java/io/numaproj/numaflow/function/reduce/ShutDownActorTest.java
+++ b/src/test/java/io/numaproj/numaflow/function/reduce/ShutDownActorTest.java
@@ -4,7 +4,6 @@
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
import akka.actor.DeadLetter;
-import akka.actor.DeadLetter$;
import com.google.protobuf.ByteString;
import io.numaproj.numaflow.function.Datum;
import io.numaproj.numaflow.function.Message;
@@ -31,7 +30,7 @@ public void testFailure() {
String reduceKey = "reduce-key";
Udfunction.Datum.Builder inDatumBuilder = Udfunction.Datum.
- newBuilder().setKey(reduceKey);
+ newBuilder().addKeys(reduceKey);
ActorRef shutdownActor = actorSystem
.actorOf(ShutdownActor
@@ -51,7 +50,7 @@ public void testFailure() {
new ReduceOutputStreamObserver()));
Udfunction.Datum inputDatum = inDatumBuilder
- .setKey("reduce-test")
+ .addKeys("reduce-test")
.setValue(ByteString.copyFromUtf8(String.valueOf(1)))
.build();
supervisor.tell(inputDatum, ActorRef.noSender());
@@ -112,13 +111,13 @@ public static class TestException extends ReduceHandler {
int count = 0;
@Override
- public void addMessage(String key, Datum datum, Metadata md) {
+ public void addMessage(String[] keys, Datum datum, Metadata md) {
count += 1;
throw new RuntimeException("UDF Failure");
}
@Override
- public Message[] getOutput(String key, Metadata md) {
+ public Message[] getOutput(String[] keys, Metadata md) {
return new Message[]{Message.toAll(String.valueOf(count).getBytes())};
}
}
diff --git a/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java b/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java
index d3ed8529..578b03e6 100644
--- a/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java
@@ -80,7 +80,7 @@ public void sinker() {
.newStub(inProcessChannel)
.sinkFn(outputStreamObserver);
- Udsink.Datum.Builder inDatumBuilder = Udsink.Datum.newBuilder().setKey("sink");
+ Udsink.Datum.Builder inDatumBuilder = Udsink.Datum.newBuilder().addKeys("sink");
String actualId = "sink_test_id";
String expectedId = actualId + processedIdSuffix;