Skip to content

Commit

Permalink
feat: introduce handshake to client and gRPC server
Browse files Browse the repository at this point in the history
Java implementation of numaproj/numaflow-go#42

Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Apr 13, 2023
1 parent 53ea86c commit 2a30ed4
Show file tree
Hide file tree
Showing 19 changed files with 325 additions and 48 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@
<scope>test</scope>
</dependency>

<!-- jackson-databind for converting POJOs to JSON and back -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>

</dependencies>

<dependencyManagement>
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.common;

import io.numaproj.numaflow.function.Function;
import io.numaproj.numaflow.function.FunctionConstants;
import io.numaproj.numaflow.info.ServerInfoConstants;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -11,12 +12,11 @@
public class GRPCServerConfig {
private String socketPath;
private int maxMessageSize;
private String infoFilePath;

public GRPCServerConfig(int maxMessageSize) {
this(Function.SOCKET_PATH, maxMessageSize);
}

public GRPCServerConfig(String socketPath) {
this(socketPath, Function.DEFAULT_MESSAGE_SIZE);
public GRPCServerConfig() {
this.socketPath = FunctionConstants.DEFAULT_SOCKET_PATH;
this.maxMessageSize = FunctionConstants.DEFAULT_MESSAGE_SIZE;
this.infoFilePath = ServerInfoConstants.DEFAULT_SERVER_INFO_FILE_PATH;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import io.grpc.Context;
import io.grpc.Metadata;

public class Function {
public static final String SOCKET_PATH = "/var/run/numaflow/function.sock";
public class FunctionConstants {
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/function.sock";

public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 4;

Expand All @@ -19,18 +19,17 @@ public class Function {
public static final String DELIMITTER = ":";

public static final Metadata.Key<String> DATUM_METADATA_WIN_START = Metadata.Key.of(
Function.WIN_START_KEY,
FunctionConstants.WIN_START_KEY,
Metadata.ASCII_STRING_MARSHALLER);

public static final Metadata.Key<String> DATUM_METADATA_WIN_END = Metadata.Key.of(
Function.WIN_END_KEY,
FunctionConstants.WIN_END_KEY,
Metadata.ASCII_STRING_MARSHALLER);

public static final Context.Key<String> WINDOW_START_TIME = Context.keyWithDefault(
Function.WIN_START_KEY,
FunctionConstants.WIN_START_KEY,
"");

public static final Context.Key<String> WINDOW_END_TIME = Context.keyWithDefault(
Function.WIN_END_KEY,
FunctionConstants.WIN_END_KEY,
"");
}
26 changes: 21 additions & 5 deletions src/main/java/io/numaproj/numaflow/function/FunctionServer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.function;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
Expand All @@ -17,12 +18,18 @@
import io.numaproj.numaflow.function.mapt.MapTHandler;
import io.numaproj.numaflow.function.reduce.ReduceHandler;
import io.numaproj.numaflow.function.reduce.ReducerFactory;
import io.numaproj.numaflow.info.Language;
import io.numaproj.numaflow.info.Protocol;
import io.numaproj.numaflow.info.ServerInfo;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

@Slf4j
Expand All @@ -31,10 +38,11 @@ public class FunctionServer {
private final GRPCServerConfig grpcServerConfig;
private final ServerBuilder<?> serverBuilder;
private final FunctionService functionService = new FunctionService();
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private Server server;

public FunctionServer() {
this(new GRPCServerConfig(Function.SOCKET_PATH, Function.DEFAULT_MESSAGE_SIZE));
this(new GRPCServerConfig());
}

/**
Expand Down Expand Up @@ -89,6 +97,14 @@ public void start() throws IOException {
}
}

// write server info to file
ServerInfo serverInfo = new ServerInfo(
Protocol.UDS_PROTOCOL,
Language.JAVA,
serverInfoAccessor.getSDKVersion(),
new HashMap<>());
serverInfoAccessor.write(serverInfo, grpcServerConfig.getInfoFilePath());

// build server
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
Expand All @@ -99,10 +115,10 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(

final var context =
Context.current().withValues(
Function.WINDOW_START_TIME,
headers.get(Function.DATUM_METADATA_WIN_START),
Function.WINDOW_END_TIME,
headers.get(Function.DATUM_METADATA_WIN_END));
FunctionConstants.WINDOW_START_TIME,
headers.get(FunctionConstants.DATUM_METADATA_WIN_START),
FunctionConstants.WINDOW_END_TIME,
headers.get(FunctionConstants.DATUM_METADATA_WIN_END));
return Contexts.interceptCall(context, call, headers, next);
}
};
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/numaproj/numaflow/function/FunctionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static io.numaproj.numaflow.function.Function.EOF;
import static io.numaproj.numaflow.function.FunctionConstants.EOF;
import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getMapFnMethod;
import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getReduceFnMethod;

Expand Down Expand Up @@ -144,8 +144,8 @@ public StreamObserver<Udfunction.DatumRequest> reduceFn(final StreamObserver<Udf
}

// get window start and end time from gPRC metadata
String winSt = Function.WINDOW_START_TIME.get();
String winEt = Function.WINDOW_END_TIME.get();
String winSt = FunctionConstants.WINDOW_START_TIME.get();
String winEt = FunctionConstants.WINDOW_END_TIME.get();

// convert the start and end time to Instant
Instant startTime = Instant.ofEpochMilli(Long.parseLong(winSt));
Expand Down Expand Up @@ -218,9 +218,9 @@ private Udfunction.DatumResponseList buildDatumListResponse(MessageList messageL
datumListBuilder.addElements(Udfunction.DatumResponse.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : List.of(message.getKeys()))
== null ? new ArrayList<>():List.of(message.getKeys()))
.addAllTags(message.getTags()
== null ? new ArrayList<>() : List.of(message.getTags()))
== null ? new ArrayList<>():List.of(message.getTags()))
.build());
});
return datumListBuilder.build();
Expand All @@ -236,9 +236,9 @@ private Udfunction.DatumResponseList buildDatumListResponse(MessageTList message
.setNanos(messageT.getEventTime().getNano()))
)
.addAllKeys(messageT.getKeys()
== null ? new ArrayList<>() : List.of(messageT.getKeys()))
== null ? new ArrayList<>():List.of(messageT.getKeys()))
.addAllTags(messageT.getTags()
== null ? new ArrayList<>() : List.of(messageT.getTags()))
== null ? new ArrayList<>():List.of(messageT.getTags()))
.setValue(ByteString.copyFrom(messageT.getValue()))
.build());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import akka.japi.pf.ReceiveBuilder;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.function.Function;
import io.numaproj.numaflow.function.FunctionConstants;
import io.numaproj.numaflow.function.FunctionService;
import io.numaproj.numaflow.function.HandlerDatum;
import io.numaproj.numaflow.function.HandlerDatumMetadata;
Expand Down Expand Up @@ -77,7 +77,7 @@ public SupervisorStrategy supervisorStrategy() {
@Override
public void postStop() {
log.debug("post stop of supervisor executed - {}", getSelf().toString());
shutdownActor.tell(Function.SUCCESS, ActorRef.noSender());
shutdownActor.tell(FunctionConstants.SUCCESS, ActorRef.noSender());
}

@Override
Expand All @@ -97,7 +97,7 @@ public Receive createReceive() {
*/
private void invokeActors(Udfunction.DatumRequest datumRequest) {
String[] keys = datumRequest.getKeysList().toArray(new String[0]);
String keyStr = String.join(Function.DELIMITTER, keys);
String keyStr = String.join(FunctionConstants.DELIMITTER, keys);
if (!actorsMap.containsKey(keyStr)) {
ReduceHandler reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
Expand Down Expand Up @@ -125,7 +125,7 @@ private void responseListener(ActorResponse actorResponse) {
*/

responseObserver.onNext(actorResponse.getDatumList());
actorsMap.remove(String.join(Function.DELIMITTER, actorResponse.getKeys()));
actorsMap.remove(String.join(FunctionConstants.DELIMITTER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/Language.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonValue;

/**
* Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs
* to form a contract between server and clients.
*/
public enum Language {
GO("go"),
PYTHON("python"),
JAVA("java");

private final String name;

Language(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;
}
}
23 changes: 23 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/Protocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonValue;

/**
* Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs
* to form a contract between server and clients.
*/
public enum Protocol {
UDS_PROTOCOL("uds"),
TCP_PROTOCOL("tcp");

private final String name;

Protocol(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;
}
}
31 changes: 31 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/ServerInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.Map;

/**
* Server Information to be used by client to determine:
* - protocol: what is right protocol to use (UDS or TCP)
* - language: what is language used by the server
* - version: what is the numaflow sdk version used by the server
* - metadata: other information
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ServerInfo {
@JsonProperty("protocol")
private Protocol protocol;
@JsonProperty("language")
private Language language;
@JsonProperty("version")
private String version;
@JsonProperty("metadata")
private Map<String, String> metadata;
}
34 changes: 34 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.numaproj.numaflow.info;

import java.io.IOException;

public interface ServerInfoAccessor {
/**
* Get runtime Java SDK version.
*/
String getSDKVersion();

/**
* Delete filePath if it exists.
* Write serverInfo to filePath in Json format.
* Append {@link ServerInfoConstants#EOF} as a new line to indicate end of file.
*
* @param serverInfo server information POJO
* @param filePath file path to write to
*
* @throws IOException any IO exceptions are thrown to the caller.
*/
void write(ServerInfo serverInfo, String filePath) throws IOException;

/**
* Read from filePath to retrieve server information POJO.
* This API is only used for unit tests.
*
* @param filePath file path to read from
*
* @return server information POJO
*
* @throws IOException any IO exceptions are thrown to the caller.
*/
ServerInfo read(String filePath) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

@AllArgsConstructor
public class ServerInfoAccessorImpl implements ServerInfoAccessor {
private ObjectMapper objectMapper;

@Override
public String getSDKVersion() {
// This only works for Java 9 and above.
// Since we already use 11+ for numaflow SDK, it's safe to apply this approach.
return String.valueOf(Runtime.version().version().get(0));
}

@Override
public void write(ServerInfo serverInfo, String filePath) throws IOException {
File file = new File(filePath);
if (file.exists()) {
file.delete();
}
FileWriter fileWriter = new FileWriter(filePath, false);
objectMapper.writeValue(fileWriter, serverInfo);
FileWriter eofWriter = new FileWriter(filePath, true);
eofWriter.append("\n").append(ServerInfoConstants.EOF);
eofWriter.close();
fileWriter.close();
}

@Override
public ServerInfo read(String filePath) throws IOException {
File file = new File(filePath);
BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
StringBuilder stringBuilder = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null
&& !line.equals(ServerInfoConstants.EOF)) {
stringBuilder.append(line);
}
ServerInfo serverInfo = objectMapper.readValue(stringBuilder.toString(), ServerInfo.class);
bufferedReader.close();
return serverInfo;
}
}
Loading

0 comments on commit 2a30ed4

Please sign in to comment.