Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce handshake to client and gRPC server #36

Merged
merged 5 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@
<scope>test</scope>
</dependency>

<!-- jackson-databind for converting POJOs to JSON and back -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>

</dependencies>

<dependencyManagement>
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.common;

import io.numaproj.numaflow.function.Function;
import io.numaproj.numaflow.function.FunctionConstants;
import io.numaproj.numaflow.info.ServerInfoConstants;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -11,12 +12,11 @@
public class GRPCServerConfig {
private String socketPath;
private int maxMessageSize;
private String infoFilePath;

public GRPCServerConfig(int maxMessageSize) {
this(Function.SOCKET_PATH, maxMessageSize);
}

public GRPCServerConfig(String socketPath) {
this(socketPath, Function.DEFAULT_MESSAGE_SIZE);
public GRPCServerConfig() {
this.socketPath = FunctionConstants.DEFAULT_SOCKET_PATH;
this.maxMessageSize = FunctionConstants.DEFAULT_MESSAGE_SIZE;
this.infoFilePath = ServerInfoConstants.DEFAULT_SERVER_INFO_FILE_PATH;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import io.grpc.Context;
import io.grpc.Metadata;

public class Function {
public static final String SOCKET_PATH = "/var/run/numaflow/function.sock";
public class FunctionConstants {
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/function.sock";

public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 4;

Expand All @@ -19,18 +19,17 @@ public class Function {
public static final String DELIMITTER = ":";

public static final Metadata.Key<String> DATUM_METADATA_WIN_START = Metadata.Key.of(
Function.WIN_START_KEY,
FunctionConstants.WIN_START_KEY,
Metadata.ASCII_STRING_MARSHALLER);

public static final Metadata.Key<String> DATUM_METADATA_WIN_END = Metadata.Key.of(
Function.WIN_END_KEY,
FunctionConstants.WIN_END_KEY,
Metadata.ASCII_STRING_MARSHALLER);

public static final Context.Key<String> WINDOW_START_TIME = Context.keyWithDefault(
Function.WIN_START_KEY,
FunctionConstants.WIN_START_KEY,
"");

public static final Context.Key<String> WINDOW_END_TIME = Context.keyWithDefault(
Function.WIN_END_KEY,
FunctionConstants.WIN_END_KEY,
"");
}
38 changes: 27 additions & 11 deletions src/main/java/io/numaproj/numaflow/function/FunctionServer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.function;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
Expand All @@ -17,12 +18,17 @@
import io.numaproj.numaflow.function.mapt.MapTHandler;
import io.numaproj.numaflow.function.reduce.ReduceHandler;
import io.numaproj.numaflow.function.reduce.ReducerFactory;
import io.numaproj.numaflow.info.Language;
import io.numaproj.numaflow.info.Protocol;
import io.numaproj.numaflow.info.ServerInfo;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

@Slf4j
Expand All @@ -31,10 +37,11 @@ public class FunctionServer {
private final GRPCServerConfig grpcServerConfig;
private final ServerBuilder<?> serverBuilder;
private final FunctionService functionService = new FunctionService();
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private Server server;

public FunctionServer() {
this(new GRPCServerConfig(Function.SOCKET_PATH, Function.DEFAULT_MESSAGE_SIZE));
this(new GRPCServerConfig());
}

/**
Expand Down Expand Up @@ -78,17 +85,26 @@ public FunctionServer registerReducerFactory(ReducerFactory<? extends ReduceHand
/**
* Start serving requests.
*/
public void start() throws IOException {
public void start() throws Exception {
String socketPath = grpcServerConfig.getSocketPath();
String infoFilePath = grpcServerConfig.getInfoFilePath();
// cleanup socket path if it exists (unit test builder doesn't use one)
if (grpcServerConfig.getSocketPath() != null) {
Path path = Paths.get(grpcServerConfig.getSocketPath());
if (socketPath != null) {
Path path = Paths.get(socketPath);
Files.deleteIfExists(path);
if (Files.exists(path)) {
log.error("Failed to clean up socket path \"" + grpcServerConfig.getSocketPath()
+ "\". Exiting");
log.error("Failed to clean up socket path {}. Exiting", socketPath);
}
}

// write server info to file
ServerInfo serverInfo = new ServerInfo(
Protocol.UDS_PROTOCOL,
Language.JAVA,
serverInfoAccessor.getSDKVersion(),
new HashMap<>());
serverInfoAccessor.write(serverInfo, infoFilePath);

// build server
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
Expand All @@ -99,10 +115,10 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(

final var context =
Context.current().withValues(
Function.WINDOW_START_TIME,
headers.get(Function.DATUM_METADATA_WIN_START),
Function.WINDOW_END_TIME,
headers.get(Function.DATUM_METADATA_WIN_END));
FunctionConstants.WINDOW_START_TIME,
headers.get(FunctionConstants.DATUM_METADATA_WIN_START),
FunctionConstants.WINDOW_END_TIME,
headers.get(FunctionConstants.DATUM_METADATA_WIN_END));
return Contexts.interceptCall(context, call, headers, next);
}
};
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/io/numaproj/numaflow/function/FunctionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static io.numaproj.numaflow.function.Function.EOF;
import static io.numaproj.numaflow.function.FunctionConstants.EOF;
import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getMapFnMethod;
import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getReduceFnMethod;

Expand Down Expand Up @@ -145,8 +145,8 @@ public StreamObserver<Udfunction.DatumRequest> reduceFn(final StreamObserver<Udf
}

// get window start and end time from gPRC metadata
String winSt = Function.WINDOW_START_TIME.get();
String winEt = Function.WINDOW_END_TIME.get();
String winSt = FunctionConstants.WINDOW_START_TIME.get();
String winEt = FunctionConstants.WINDOW_END_TIME.get();

// convert the start and end time to Instant
Instant startTime = Instant.ofEpochMilli(Long.parseLong(winSt));
Expand Down Expand Up @@ -220,9 +220,9 @@ private Udfunction.DatumResponseList buildDatumListResponse(MessageList messageL
.setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : List.of(message.getKeys()))
== null ? new ArrayList<>():List.of(message.getKeys()))
.addAllTags(message.getTags()
== null ? new ArrayList<>() : List.of(message.getTags()))
== null ? new ArrayList<>():List.of(message.getTags()))
.build());
});
return datumListBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import akka.japi.pf.ReceiveBuilder;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.function.Function;
import io.numaproj.numaflow.function.FunctionConstants;
import io.numaproj.numaflow.function.FunctionService;
import io.numaproj.numaflow.function.HandlerDatum;
import io.numaproj.numaflow.function.HandlerDatumMetadata;
Expand Down Expand Up @@ -77,7 +77,7 @@ public SupervisorStrategy supervisorStrategy() {
@Override
public void postStop() {
log.debug("post stop of supervisor executed - {}", getSelf().toString());
shutdownActor.tell(Function.SUCCESS, ActorRef.noSender());
shutdownActor.tell(FunctionConstants.SUCCESS, ActorRef.noSender());
}

@Override
Expand All @@ -97,7 +97,7 @@ public Receive createReceive() {
*/
private void invokeActors(Udfunction.DatumRequest datumRequest) {
String[] keys = datumRequest.getKeysList().toArray(new String[0]);
String keyStr = String.join(Function.DELIMITTER, keys);
String keyStr = String.join(FunctionConstants.DELIMITTER, keys);
if (!actorsMap.containsKey(keyStr)) {
ReduceHandler reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
Expand Down Expand Up @@ -125,7 +125,7 @@ private void responseListener(ActorResponse actorResponse) {
*/

responseObserver.onNext(actorResponse.getDatumList());
actorsMap.remove(String.join(Function.DELIMITTER, actorResponse.getKeys()));
actorsMap.remove(String.join(FunctionConstants.DELIMITTER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/Language.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonValue;

/**
* Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs
* to form a contract between server and clients.
*/
public enum Language {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why we didn't go with proto? Since it's common among all the sdks we could have kept a common proto file

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point Yashash. I think we should go with proto, that way we can catch any data format issue during development instead of runtime. @whynowy @vigith thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, we need to standardize our proto location before we do that. today we manually sink it between the repos.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be addressed by numaproj/numaflow#693

GO("go"),
PYTHON("python"),
JAVA("java");

private final String name;

Language(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;
}
}
23 changes: 23 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/Protocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonValue;

/**
* Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs
* to form a contract between server and clients.
*/
public enum Protocol {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment

UDS_PROTOCOL("uds"),
TCP_PROTOCOL("tcp");

private final String name;

Protocol(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;
}
}
31 changes: 31 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/ServerInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.Map;

/**
* Server Information to be used by client to determine:
* - protocol: what is right protocol to use (UDS or TCP)
* - language: what is language used by the server
* - version: what is the numaflow sdk version used by the server
* - metadata: other information
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ServerInfo {
@JsonProperty("protocol")
private Protocol protocol;
@JsonProperty("language")
private Language language;
@JsonProperty("version")
private String version;
@JsonProperty("metadata")
private Map<String, String> metadata;
}
32 changes: 32 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.numaproj.numaflow.info;

public interface ServerInfoAccessor {
/**
* Get runtime Java SDK version.
*/
String getSDKVersion();

/**
* Delete filePath if it exists.
* Write serverInfo to filePath in Json format.
* Append {@link ServerInfoConstants#EOF} as a new line to indicate end of file.
*
* @param serverInfo server information POJO
* @param filePath file path to write to
*
* @throws Exception any exceptions are thrown to the caller.
*/
void write(ServerInfo serverInfo, String filePath) throws Exception;

/**
* Read from filePath to retrieve server information POJO.
* This API is only used for unit tests.
*
* @param filePath file path to read from
*
* @return server information POJO
*
* @throws Exception any exceptions are thrown to the caller.
*/
ServerInfo read(String filePath) throws Exception;
}
Loading