Skip to content

Commit

Permalink
feat: introducing tags for conditional forwarding (#32)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Apr 12, 2023
1 parent 539af1e commit b1466e6
Show file tree
Hide file tree
Showing 29 changed files with 410 additions and 234 deletions.
11 changes: 7 additions & 4 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
## Build

The sdk artifacts are published as GitHub packages. Check the links below on how to use GitHub packages as dependencies in a Java Project.
The sdk artifacts are published as GitHub packages. Check the links below on how to use GitHub
packages as dependencies in a Java Project.

- [Reference](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-apache-maven-registry)
- [Reference](https://github.com/orgs/community/discussions/26634#discussioncomment-3252638)

### Maven users

Add this dependency to your project's POM:
Expand All @@ -27,8 +30,8 @@ compile "io.numaproj.numaflow:numaflow-java:${latest}"
### Examples on how to write UDFs and UDSinks in Java

* **User Defined Function(UDF)**
* [Map](src/main/java/io/numaproj/numaflow/examples/function/map)
* [Reduce](src/main/java/io/numaproj/numaflow/examples/function/reduce)
* [Map](src/main/java/io/numaproj/numaflow/examples/function/map)
* [Reduce](src/main/java/io/numaproj/numaflow/examples/function/reduce)

* **User Defined Sink(UDSink)**
* [Sink](src/main/java/io/numaproj/numaflow/examples/sink/simple)
* [Sink](src/main/java/io/numaproj/numaflow/examples/sink/simple)
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.numaproj.numaflow.function.Datum;
import io.numaproj.numaflow.function.FunctionServer;
import io.numaproj.numaflow.function.Message;
import io.numaproj.numaflow.function.MessageList;
import io.numaproj.numaflow.function.map.MapHandler;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -18,21 +19,27 @@
@Slf4j
public class EvenOddFunction extends MapHandler {

public Message[] processMessage(String[] keys, Datum data) {
public static void main(String[] args) throws IOException {
new FunctionServer().registerMapHandler(new EvenOddFunction()).start();
}

public MessageList processMessage(String[] keys, Datum data) {
int value = 0;
try {
value = Integer.parseInt(new String(data.getValue()));
} catch (NumberFormatException e) {
log.error("Error occurred while parsing int");
return new Message[]{Message.toDrop()};
}
if (value % 2 == 0) {
return new Message[]{Message.to(new String[]{"even"}, data.getValue())};
return MessageList.newBuilder().addMessage(Message.toDrop()).build();
}
return new Message[]{Message.to(new String[]{"odd"}, data.getValue())};
}

public static void main(String[] args) throws IOException {
new FunctionServer().registerMapHandler(new EvenOddFunction()).start();
String[] outputKeys = value % 2 == 0 ? new String[]{"even"} : new String[]{"odd"};

// tags will be used for conditional forwarding
String[] tags = value % 2 == 0 ? new String[]{"even-tag"} : new String[]{"odd-tag"};

return MessageList
.newBuilder()
.addMessage(new Message(data.getValue(), outputKeys, tags))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.numaproj.numaflow.function.Datum;
import io.numaproj.numaflow.function.FunctionServer;
import io.numaproj.numaflow.function.MessageT;
import io.numaproj.numaflow.function.MessageTList;
import io.numaproj.numaflow.function.mapt.MapTHandler;

import java.io.IOException;
Expand All @@ -22,29 +23,34 @@ public class EventTimeFilterFunction extends MapTHandler {
private static final Instant januaryFirst2022 = Instant.ofEpochMilli(1640995200000L);
private static final Instant januaryFirst2023 = Instant.ofEpochMilli(1672531200000L);

public MessageT[] processMessage(String[] keys, Datum data) {
public static void main(String[] args) throws IOException {
new FunctionServer()
.registerMapTHandler(new EventTimeFilterFunction())
.start();
}

public MessageTList processMessage(String[] keys, Datum data) {
Instant eventTime = data.getEventTime();

if (eventTime.isBefore(januaryFirst2022)) {
return new MessageT[]{MessageT.toDrop()};
return MessageTList.newBuilder().addMessage(MessageT.toDrop()).build();
} else if (eventTime.isBefore(januaryFirst2023)) {
return new MessageT[]{
MessageT.to(
januaryFirst2022,
new String[]{"within_year_2022"},
data.getValue())};
return MessageTList
.newBuilder()
.addMessage(
new MessageT(
data.getValue(),
januaryFirst2022,
new String[]{"within_year_2022"}))
.build();
} else {
return new MessageT[]{
MessageT.to(
return MessageTList
.newBuilder()
.addMessage(new MessageT(
data.getValue(),
januaryFirst2023,
new String[]{"after_year_2022"},
data.getValue())};
new String[]{"after_year_2022"}))
.build();
}
}

public static void main(String[] args) throws IOException {
new FunctionServer()
.registerMapTHandler(new EventTimeFilterFunction())
.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.numaproj.numaflow.function.Datum;
import io.numaproj.numaflow.function.FunctionServer;
import io.numaproj.numaflow.function.Message;
import io.numaproj.numaflow.function.MessageList;
import io.numaproj.numaflow.function.map.MapHandler;

import java.io.IOException;
Expand All @@ -16,18 +17,19 @@

public class FlatMapFunction extends MapHandler {

public Message[] processMessage(String[] keys, Datum data) {
public static void main(String[] args) throws IOException {
new FunctionServer().registerMapHandler(new FlatMapFunction()).start();
}

public MessageList processMessage(String[] keys, Datum data) {
String msg = new String(data.getValue());
String[] strs = msg.split(",");
Message[] results = new Message[strs.length];
MessageList.MessageListBuilder listBuilder = MessageList.newBuilder();

for (int i = 0; i < strs.length; i++) {
results[i] = Message.toAll(strs[i].getBytes());
for (String str : strs) {
listBuilder.addMessage(new Message(str.getBytes()));
}
return results;
}

public static void main(String[] args) throws IOException {
new FunctionServer().registerMapHandler(new FlatMapFunction()).start();
return listBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.numaproj.numaflow.function.Datum;
import io.numaproj.numaflow.function.FunctionServer;
import io.numaproj.numaflow.function.Message;
import io.numaproj.numaflow.function.MessageList;
import io.numaproj.numaflow.function.map.MapHandler;

import java.io.IOException;
Expand All @@ -12,11 +13,15 @@
*/

public class ForwardFunction extends MapHandler {
public Message[] processMessage(String[] keys, Datum data) {
return new Message[]{Message.toAll(data.getValue())};
}

public static void main(String[] args) throws IOException {
new FunctionServer().registerMapHandler(new ForwardFunction()).start();
}

public MessageList processMessage(String[] keys, Datum data) {
return MessageList
.newBuilder()
.addMessage(new Message(data.getValue()))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.numaproj.numaflow.function.Datum;
import io.numaproj.numaflow.function.FunctionServer;
import io.numaproj.numaflow.function.Message;
import io.numaproj.numaflow.function.MessageList;
import io.numaproj.numaflow.function.metadata.Metadata;
import io.numaproj.numaflow.function.reduce.ReduceHandler;
import io.numaproj.numaflow.function.reduce.ReducerFactory;
Expand All @@ -11,13 +12,18 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

@Slf4j
@AllArgsConstructor
public class EvenOddCounterFactory extends ReducerFactory<EvenOddCounterFactory.EvenOddCounter> {
private Config config;

public static void main(String[] args) throws IOException {
log.info("counter udf was invoked");
Config config = new Config(1, 2);
new FunctionServer().registerReducerFactory(new EvenOddCounterFactory(config)).start();
}

@Override
public EvenOddCounter createReducer() {
return new EvenOddCounter(config);
Expand Down Expand Up @@ -49,25 +55,24 @@ public void addMessage(String[] keys, Datum datum, Metadata md) {
}

@Override
public Message[] getOutput(String[] keys, Metadata md) {
public MessageList getOutput(String[] keys, Metadata md) {
log.info(
"even and odd count - {} {}, window - {} {}",
evenCount,
oddCount,
md.getIntervalWindow().getStartTime().toString(),
md.getIntervalWindow().getEndTime().toString());

byte[] val;
if (Arrays.equals(keys, new String[]{"even"})) {
return new Message[]{Message.to(keys, String.valueOf(evenCount).getBytes())};
val = String.valueOf(evenCount).getBytes();
} else {
return new Message[]{Message.to(keys, String.valueOf(oddCount).getBytes())};
val = String.valueOf(oddCount).getBytes();
}
return MessageList
.newBuilder()
.addMessage(new Message(val))
.build();
}
}

public static void main(String[] args) throws IOException {
log.info("counter udf was invoked");
Config config = new Config(1, 2);
new FunctionServer().registerReducerFactory(new EvenOddCounterFactory(config)).start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.numaproj.numaflow.function.Datum;
import io.numaproj.numaflow.function.Message;
import io.numaproj.numaflow.function.MessageList;
import io.numaproj.numaflow.function.metadata.Metadata;
import io.numaproj.numaflow.function.reduce.ReduceHandler;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -22,7 +23,10 @@ public void addMessage(String[] keys, Datum datum, Metadata md) {
}

@Override
public Message[] getOutput(String[] keys, Metadata md) {
return new Message[]{Message.toAll(String.valueOf(sum).getBytes())};
public MessageList getOutput(String[] keys, Metadata md) {
return MessageList
.newBuilder()
.addMessage(new Message(String.valueOf(sum).getBytes()))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
Expand All @@ -19,6 +18,11 @@
@Slf4j
public class SimpleSink extends SinkHandler {

public static void main(String[] args) throws IOException {
new SinkServer().registerSinker(new SimpleSink()).start();
}

@Override
public List<Response> processMessage(SinkDatumStream datumStream) {
ArrayList<Response> responses = new ArrayList<>();

Expand All @@ -28,13 +32,9 @@ public List<Response> processMessage(SinkDatumStream datumStream) {
if (datum == SinkDatumStream.EOF) {
break;
}
log.info(Arrays.toString(datum.getValue()));
log.info(new String(datum.getValue()));
responses.add(new Response(datum.getId(), true, ""));
}
return responses;
}

public static void main(String[] args) throws IOException {
new SinkServer().registerSinker(new SimpleSink()).start();
}
}
Loading

0 comments on commit b1466e6

Please sign in to comment.