From 560c5377b2fd02c6e00347420c14b766fab3db37 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 14 Apr 2023 08:47:39 +0530 Subject: [PATCH] chore: fix npe (#37) Signed-off-by: Yashash H L --- .../numaflow/function/FunctionService.java | 21 +++++++++++++------ .../numaproj/numaflow/function/Message.java | 2 +- .../numaproj/numaflow/function/MessageT.java | 2 +- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/numaproj/numaflow/function/FunctionService.java b/src/main/java/io/numaproj/numaflow/function/FunctionService.java index e4d76439..27ee88a1 100644 --- a/src/main/java/io/numaproj/numaflow/function/FunctionService.java +++ b/src/main/java/io/numaproj/numaflow/function/FunctionService.java @@ -5,6 +5,7 @@ import akka.actor.AllDeadLetters; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; +import com.google.protobuf.Timestamp; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.function.map.MapHandler; import io.numaproj.numaflow.function.mapt.MapTHandler; @@ -216,7 +217,8 @@ private Udfunction.DatumResponseList buildDatumListResponse(MessageList messageL Udfunction.DatumResponseList.Builder datumListBuilder = Udfunction.DatumResponseList.newBuilder(); messageList.getMessages().forEach(message -> { datumListBuilder.addElements(Udfunction.DatumResponse.newBuilder() - .setValue(ByteString.copyFrom(message.getValue())) + .setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( + message.getValue())) .addAllKeys(message.getKeys() == null ? new ArrayList<>() : List.of(message.getKeys())) .addAllTags(message.getTags() @@ -230,16 +232,23 @@ private Udfunction.DatumResponseList buildDatumListResponse(MessageTList message Udfunction.DatumResponseList.Builder datumListBuilder = Udfunction.DatumResponseList.newBuilder(); messageTList.getMessages().forEach(messageT -> { datumListBuilder.addElements(Udfunction.DatumResponse.newBuilder() - .setEventTime(EventTime.newBuilder().setEventTime - (com.google.protobuf.Timestamp.newBuilder() - .setSeconds(messageT.getEventTime().getEpochSecond()) - .setNanos(messageT.getEventTime().getNano())) + .setEventTime( + messageT.getEventTime() == null ? EventTime.newBuilder().setEventTime( + Timestamp.getDefaultInstance()) : EventTime + .newBuilder() + .setEventTime + (Timestamp.newBuilder() + .setSeconds(messageT + .getEventTime() + .getEpochSecond()) + .setNanos(messageT.getEventTime().getNano())) ) .addAllKeys(messageT.getKeys() == null ? new ArrayList<>() : List.of(messageT.getKeys())) .addAllTags(messageT.getTags() == null ? new ArrayList<>() : List.of(messageT.getTags())) - .setValue(ByteString.copyFrom(messageT.getValue())) + .setValue(messageT.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( + messageT.getValue())) .build()); }); return datumListBuilder.build(); diff --git a/src/main/java/io/numaproj/numaflow/function/Message.java b/src/main/java/io/numaproj/numaflow/function/Message.java index 4e00402b..4c4eec4b 100644 --- a/src/main/java/io/numaproj/numaflow/function/Message.java +++ b/src/main/java/io/numaproj/numaflow/function/Message.java @@ -33,6 +33,6 @@ public Message(byte[] value, String[] keys) { // creates a Message to be dropped public static Message toDrop() { - return new Message(null, null, new String[]{DROP}); + return new Message(new byte[0], null, new String[]{DROP}); } } diff --git a/src/main/java/io/numaproj/numaflow/function/MessageT.java b/src/main/java/io/numaproj/numaflow/function/MessageT.java index cef41828..d9e34d31 100644 --- a/src/main/java/io/numaproj/numaflow/function/MessageT.java +++ b/src/main/java/io/numaproj/numaflow/function/MessageT.java @@ -37,6 +37,6 @@ public MessageT(byte[] value, Instant eventTime, String[] keys) { // creates a MessageT to be dropped public static MessageT toDrop() { - return new MessageT(null, null, null, new String[]{DROP}); + return new MessageT(new byte[0], Instant.MIN, null, new String[]{DROP}); } }