Skip to content

Commit

Permalink
chore: fix npe (#37)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Apr 14, 2023
1 parent 53ea86c commit 560c537
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
21 changes: 15 additions & 6 deletions src/main/java/io/numaproj/numaflow/function/FunctionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import akka.actor.AllDeadLetters;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.function.map.MapHandler;
import io.numaproj.numaflow.function.mapt.MapTHandler;
Expand Down Expand Up @@ -216,7 +217,8 @@ private Udfunction.DatumResponseList buildDatumListResponse(MessageList messageL
Udfunction.DatumResponseList.Builder datumListBuilder = Udfunction.DatumResponseList.newBuilder();
messageList.getMessages().forEach(message -> {
datumListBuilder.addElements(Udfunction.DatumResponse.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : List.of(message.getKeys()))
.addAllTags(message.getTags()
Expand All @@ -230,16 +232,23 @@ private Udfunction.DatumResponseList buildDatumListResponse(MessageTList message
Udfunction.DatumResponseList.Builder datumListBuilder = Udfunction.DatumResponseList.newBuilder();
messageTList.getMessages().forEach(messageT -> {
datumListBuilder.addElements(Udfunction.DatumResponse.newBuilder()
.setEventTime(EventTime.newBuilder().setEventTime
(com.google.protobuf.Timestamp.newBuilder()
.setSeconds(messageT.getEventTime().getEpochSecond())
.setNanos(messageT.getEventTime().getNano()))
.setEventTime(
messageT.getEventTime() == null ? EventTime.newBuilder().setEventTime(
Timestamp.getDefaultInstance()) : EventTime
.newBuilder()
.setEventTime
(Timestamp.newBuilder()
.setSeconds(messageT
.getEventTime()
.getEpochSecond())
.setNanos(messageT.getEventTime().getNano()))
)
.addAllKeys(messageT.getKeys()
== null ? new ArrayList<>() : List.of(messageT.getKeys()))
.addAllTags(messageT.getTags()
== null ? new ArrayList<>() : List.of(messageT.getTags()))
.setValue(ByteString.copyFrom(messageT.getValue()))
.setValue(messageT.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
messageT.getValue()))
.build());
});
return datumListBuilder.build();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/function/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ public Message(byte[] value, String[] keys) {

// creates a Message to be dropped
public static Message toDrop() {
return new Message(null, null, new String[]{DROP});
return new Message(new byte[0], null, new String[]{DROP});
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/function/MessageT.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ public MessageT(byte[] value, Instant eventTime, String[] keys) {

// creates a MessageT to be dropped
public static MessageT toDrop() {
return new MessageT(null, null, null, new String[]{DROP});
return new MessageT(new byte[0], Instant.MIN, null, new String[]{DROP});
}
}

0 comments on commit 560c537

Please sign in to comment.