Skip to content

Commit

Permalink
feat: introduce handshake to client and gRPC server (#36)
Browse files Browse the repository at this point in the history
Java implementation of numaproj/numaflow-go#42
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Apr 14, 2023
1 parent 90d5c57 commit 33b9bc9
Show file tree
Hide file tree
Showing 19 changed files with 325 additions and 54 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@
<scope>test</scope>
</dependency>

<!-- jackson-databind for converting POJOs to JSON and back -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>

</dependencies>

<dependencyManagement>
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.common;

import io.numaproj.numaflow.function.Function;
import io.numaproj.numaflow.function.FunctionConstants;
import io.numaproj.numaflow.info.ServerInfoConstants;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -11,12 +12,11 @@
public class GRPCServerConfig {
private String socketPath;
private int maxMessageSize;
private String infoFilePath;

public GRPCServerConfig(int maxMessageSize) {
this(Function.SOCKET_PATH, maxMessageSize);
}

public GRPCServerConfig(String socketPath) {
this(socketPath, Function.DEFAULT_MESSAGE_SIZE);
public GRPCServerConfig() {
this.socketPath = FunctionConstants.DEFAULT_SOCKET_PATH;
this.maxMessageSize = FunctionConstants.DEFAULT_MESSAGE_SIZE;
this.infoFilePath = ServerInfoConstants.DEFAULT_SERVER_INFO_FILE_PATH;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import io.grpc.Context;
import io.grpc.Metadata;

public class Function {
public static final String SOCKET_PATH = "/var/run/numaflow/function.sock";
public class FunctionConstants {
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/function.sock";

public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 4;

Expand All @@ -19,18 +19,17 @@ public class Function {
public static final String DELIMITTER = ":";

public static final Metadata.Key<String> DATUM_METADATA_WIN_START = Metadata.Key.of(
Function.WIN_START_KEY,
FunctionConstants.WIN_START_KEY,
Metadata.ASCII_STRING_MARSHALLER);

public static final Metadata.Key<String> DATUM_METADATA_WIN_END = Metadata.Key.of(
Function.WIN_END_KEY,
FunctionConstants.WIN_END_KEY,
Metadata.ASCII_STRING_MARSHALLER);

public static final Context.Key<String> WINDOW_START_TIME = Context.keyWithDefault(
Function.WIN_START_KEY,
FunctionConstants.WIN_START_KEY,
"");

public static final Context.Key<String> WINDOW_END_TIME = Context.keyWithDefault(
Function.WIN_END_KEY,
FunctionConstants.WIN_END_KEY,
"");
}
38 changes: 27 additions & 11 deletions src/main/java/io/numaproj/numaflow/function/FunctionServer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.function;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
Expand All @@ -17,12 +18,17 @@
import io.numaproj.numaflow.function.mapt.MapTHandler;
import io.numaproj.numaflow.function.reduce.ReduceHandler;
import io.numaproj.numaflow.function.reduce.ReducerFactory;
import io.numaproj.numaflow.info.Language;
import io.numaproj.numaflow.info.Protocol;
import io.numaproj.numaflow.info.ServerInfo;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

@Slf4j
Expand All @@ -31,10 +37,11 @@ public class FunctionServer {
private final GRPCServerConfig grpcServerConfig;
private final ServerBuilder<?> serverBuilder;
private final FunctionService functionService = new FunctionService();
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private Server server;

public FunctionServer() {
this(new GRPCServerConfig(Function.SOCKET_PATH, Function.DEFAULT_MESSAGE_SIZE));
this(new GRPCServerConfig());
}

/**
Expand Down Expand Up @@ -78,17 +85,26 @@ public FunctionServer registerReducerFactory(ReducerFactory<? extends ReduceHand
/**
* Start serving requests.
*/
public void start() throws IOException {
public void start() throws Exception {
String socketPath = grpcServerConfig.getSocketPath();
String infoFilePath = grpcServerConfig.getInfoFilePath();
// cleanup socket path if it exists (unit test builder doesn't use one)
if (grpcServerConfig.getSocketPath() != null) {
Path path = Paths.get(grpcServerConfig.getSocketPath());
if (socketPath != null) {
Path path = Paths.get(socketPath);
Files.deleteIfExists(path);
if (Files.exists(path)) {
log.error("Failed to clean up socket path \"" + grpcServerConfig.getSocketPath()
+ "\". Exiting");
log.error("Failed to clean up socket path {}. Exiting", socketPath);
}
}

// write server info to file
ServerInfo serverInfo = new ServerInfo(
Protocol.UDS_PROTOCOL,
Language.JAVA,
serverInfoAccessor.getSDKVersion(),
new HashMap<>());
serverInfoAccessor.write(serverInfo, infoFilePath);

// build server
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
Expand All @@ -99,10 +115,10 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(

final var context =
Context.current().withValues(
Function.WINDOW_START_TIME,
headers.get(Function.DATUM_METADATA_WIN_START),
Function.WINDOW_END_TIME,
headers.get(Function.DATUM_METADATA_WIN_END));
FunctionConstants.WINDOW_START_TIME,
headers.get(FunctionConstants.DATUM_METADATA_WIN_START),
FunctionConstants.WINDOW_END_TIME,
headers.get(FunctionConstants.DATUM_METADATA_WIN_END));
return Contexts.interceptCall(context, call, headers, next);
}
};
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/io/numaproj/numaflow/function/FunctionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static io.numaproj.numaflow.function.Function.EOF;
import static io.numaproj.numaflow.function.FunctionConstants.EOF;
import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getMapFnMethod;
import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getReduceFnMethod;

Expand Down Expand Up @@ -145,8 +145,8 @@ public StreamObserver<Udfunction.DatumRequest> reduceFn(final StreamObserver<Udf
}

// get window start and end time from gPRC metadata
String winSt = Function.WINDOW_START_TIME.get();
String winEt = Function.WINDOW_END_TIME.get();
String winSt = FunctionConstants.WINDOW_START_TIME.get();
String winEt = FunctionConstants.WINDOW_END_TIME.get();

// convert the start and end time to Instant
Instant startTime = Instant.ofEpochMilli(Long.parseLong(winSt));
Expand Down Expand Up @@ -220,9 +220,9 @@ private Udfunction.DatumResponseList buildDatumListResponse(MessageList messageL
.setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : List.of(message.getKeys()))
== null ? new ArrayList<>():List.of(message.getKeys()))
.addAllTags(message.getTags()
== null ? new ArrayList<>() : List.of(message.getTags()))
== null ? new ArrayList<>():List.of(message.getTags()))
.build());
});
return datumListBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import akka.japi.pf.ReceiveBuilder;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.function.Function;
import io.numaproj.numaflow.function.FunctionConstants;
import io.numaproj.numaflow.function.FunctionService;
import io.numaproj.numaflow.function.HandlerDatum;
import io.numaproj.numaflow.function.HandlerDatumMetadata;
Expand Down Expand Up @@ -77,7 +77,7 @@ public SupervisorStrategy supervisorStrategy() {
@Override
public void postStop() {
log.debug("post stop of supervisor executed - {}", getSelf().toString());
shutdownActor.tell(Function.SUCCESS, ActorRef.noSender());
shutdownActor.tell(FunctionConstants.SUCCESS, ActorRef.noSender());
}

@Override
Expand All @@ -97,7 +97,7 @@ public Receive createReceive() {
*/
private void invokeActors(Udfunction.DatumRequest datumRequest) {
String[] keys = datumRequest.getKeysList().toArray(new String[0]);
String keyStr = String.join(Function.DELIMITTER, keys);
String keyStr = String.join(FunctionConstants.DELIMITTER, keys);
if (!actorsMap.containsKey(keyStr)) {
ReduceHandler reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
Expand Down Expand Up @@ -125,7 +125,7 @@ private void responseListener(ActorResponse actorResponse) {
*/

responseObserver.onNext(actorResponse.getDatumList());
actorsMap.remove(String.join(Function.DELIMITTER, actorResponse.getKeys()));
actorsMap.remove(String.join(FunctionConstants.DELIMITTER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/Language.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonValue;

/**
* Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs
* to form a contract between server and clients.
*/
public enum Language {
GO("go"),
PYTHON("python"),
JAVA("java");

private final String name;

Language(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;
}
}
23 changes: 23 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/Protocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonValue;

/**
* Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs
* to form a contract between server and clients.
*/
public enum Protocol {
UDS_PROTOCOL("uds"),
TCP_PROTOCOL("tcp");

private final String name;

Protocol(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;
}
}
31 changes: 31 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/ServerInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.Map;

/**
* Server Information to be used by client to determine:
* - protocol: what is right protocol to use (UDS or TCP)
* - language: what is language used by the server
* - version: what is the numaflow sdk version used by the server
* - metadata: other information
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ServerInfo {
@JsonProperty("protocol")
private Protocol protocol;
@JsonProperty("language")
private Language language;
@JsonProperty("version")
private String version;
@JsonProperty("metadata")
private Map<String, String> metadata;
}
32 changes: 32 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.numaproj.numaflow.info;

public interface ServerInfoAccessor {
/**
* Get runtime Java SDK version.
*/
String getSDKVersion();

/**
* Delete filePath if it exists.
* Write serverInfo to filePath in Json format.
* Append {@link ServerInfoConstants#EOF} as a new line to indicate end of file.
*
* @param serverInfo server information POJO
* @param filePath file path to write to
*
* @throws Exception any exceptions are thrown to the caller.
*/
void write(ServerInfo serverInfo, String filePath) throws Exception;

/**
* Read from filePath to retrieve server information POJO.
* This API is only used for unit tests.
*
* @param filePath file path to read from
*
* @return server information POJO
*
* @throws Exception any exceptions are thrown to the caller.
*/
ServerInfo read(String filePath) throws Exception;
}
Loading

0 comments on commit 33b9bc9

Please sign in to comment.