Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce handshake to client and gRPC server #35

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@
<version>1.7.25</version>
</dependency>

<!-- jackson-databind for converting POJOs to JSON and back -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>

</dependencies>

<dependencyManagement>
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.common;

import io.numaproj.numaflow.function.Function;
import io.numaproj.numaflow.function.FunctionConstants;
import io.numaproj.numaflow.info.ServerInfoConstants;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -11,12 +12,11 @@
public class GRPCServerConfig {
private String socketPath;
private int maxMessageSize;
private String infoFilePath;

public GRPCServerConfig(int maxMessageSize) {
this(Function.SOCKET_PATH, maxMessageSize);
}

public GRPCServerConfig(String socketPath) {
this(socketPath, Function.DEFAULT_MESSAGE_SIZE);
public GRPCServerConfig() {
this.socketPath = FunctionConstants.DEFAULT_SOCKET_PATH;
this.maxMessageSize = FunctionConstants.DEFAULT_MESSAGE_SIZE;
this.infoFilePath = ServerInfoConstants.DEFAULT_SERVER_INFO_FILE_PATH;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import io.grpc.Context;
import io.grpc.Metadata;

public class Function {
public static final String SOCKET_PATH = "/var/run/numaflow/function.sock";
public class FunctionConstants {
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/function.sock";

public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 4;

Expand All @@ -19,26 +19,26 @@ public class Function {
public static final String SUCCESS = "SUCCESS";

public static final Metadata.Key<String> DATUM_METADATA_KEY = Metadata.Key.of(
Function.DATUM_KEY,
FunctionConstants.DATUM_KEY,
Metadata.ASCII_STRING_MARSHALLER);

public static final Metadata.Key<String> DATUM_METADATA_WIN_START = Metadata.Key.of(
Function.WIN_START_KEY,
FunctionConstants.WIN_START_KEY,
Metadata.ASCII_STRING_MARSHALLER);

public static final Metadata.Key<String> DATUM_METADATA_WIN_END = Metadata.Key.of(
Function.WIN_END_KEY,
FunctionConstants.WIN_END_KEY,
Metadata.ASCII_STRING_MARSHALLER);

public static final Context.Key<String> DATUM_CONTEXT_KEY = Context.keyWithDefault(
Function.DATUM_KEY,
FunctionConstants.DATUM_KEY,
"");

public static final Context.Key<String> WINDOW_START_TIME = Context.keyWithDefault(
Function.WIN_START_KEY,
FunctionConstants.WIN_START_KEY,
"");

public static final Context.Key<String> WINDOW_END_TIME = Context.keyWithDefault(
Function.WIN_END_KEY,
FunctionConstants.WIN_END_KEY,
"");
}
30 changes: 23 additions & 7 deletions src/main/java/io/numaproj/numaflow/function/FunctionServer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.function;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
Expand All @@ -17,12 +18,18 @@
import io.numaproj.numaflow.function.mapt.MapTHandler;
import io.numaproj.numaflow.function.reduce.Reducer;
import io.numaproj.numaflow.function.reduce.ReducerFactory;
import io.numaproj.numaflow.info.Language;
import io.numaproj.numaflow.info.Protocol;
import io.numaproj.numaflow.info.ServerInfo;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

@Slf4j
Expand All @@ -31,10 +38,11 @@ public class FunctionServer {
private final GRPCServerConfig grpcServerConfig;
private final ServerBuilder<?> serverBuilder;
private final FunctionService functionService = new FunctionService();
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private Server server;

public FunctionServer() {
this(new GRPCServerConfig(Function.SOCKET_PATH, Function.DEFAULT_MESSAGE_SIZE));
this(new GRPCServerConfig());
}

/**
Expand Down Expand Up @@ -89,6 +97,14 @@ public void start() throws IOException {
}
}

// write server info to file
ServerInfo serverInfo = new ServerInfo(
Protocol.UDS_PROTOCOL,
Language.JAVA,
serverInfoAccessor.getSDKVersion(),
new HashMap<>());
serverInfoAccessor.write(serverInfo, grpcServerConfig.getInfoFilePath());

// build server
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
Expand All @@ -99,12 +115,12 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(

final var context =
Context.current().withValues(
Function.DATUM_CONTEXT_KEY,
headers.get(Function.DATUM_METADATA_KEY),
Function.WINDOW_START_TIME,
headers.get(Function.DATUM_METADATA_WIN_START),
Function.WINDOW_END_TIME,
headers.get(Function.DATUM_METADATA_WIN_END));
FunctionConstants.DATUM_CONTEXT_KEY,
headers.get(FunctionConstants.DATUM_METADATA_KEY),
FunctionConstants.WINDOW_START_TIME,
headers.get(FunctionConstants.DATUM_METADATA_WIN_START),
FunctionConstants.WINDOW_END_TIME,
headers.get(FunctionConstants.DATUM_METADATA_WIN_END));
return Contexts.interceptCall(context, call, headers, next);
}
};
Expand Down
24 changes: 10 additions & 14 deletions src/main/java/io/numaproj/numaflow/function/FunctionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.stub.StreamObserver;
Expand All @@ -23,17 +20,12 @@
import io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import static io.numaproj.numaflow.function.Function.EOF;
import static io.numaproj.numaflow.function.FunctionConstants.EOF;
import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getMapFnMethod;
import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getReduceFnMethod;

Expand Down Expand Up @@ -74,7 +66,7 @@ public void mapFn(
}

// get key from gPRC metadata
String key = Function.DATUM_CONTEXT_KEY.get();
String key = FunctionConstants.DATUM_CONTEXT_KEY.get();

// get Datum from request
HandlerDatum handlerDatum = new HandlerDatum(
Expand Down Expand Up @@ -108,7 +100,7 @@ public void mapTFn(
}

// get key from gPRC metadata
String key = Function.DATUM_CONTEXT_KEY.get();
String key = FunctionConstants.DATUM_CONTEXT_KEY.get();

// get Datum from request
HandlerDatum handlerDatum = new HandlerDatum(
Expand Down Expand Up @@ -142,8 +134,8 @@ public StreamObserver<Udfunction.Datum> reduceFn(final StreamObserver<Udfunction
}

// get window start and end time from gPRC metadata
String winSt = Function.WINDOW_START_TIME.get();
String winEt = Function.WINDOW_END_TIME.get();
String winSt = FunctionConstants.WINDOW_START_TIME.get();
String winEt = FunctionConstants.WINDOW_END_TIME.get();

// convert the start and end time to Instant
Instant startTime = Instant.ofEpochMilli(Long.parseLong(winSt));
Expand All @@ -165,7 +157,11 @@ public StreamObserver<Udfunction.Datum> reduceFn(final StreamObserver<Udfunction
we create a child actor for every key in a window.
*/
ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor.props(reducerFactory, md, shutdownActorRef, responseObserver));
.actorOf(ReduceSupervisorActor.props(
reducerFactory,
md,
shutdownActorRef,
responseObserver));


return new StreamObserver<Udfunction.Datum>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import akka.japi.pf.ReceiveBuilder;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.function.Function;
import io.numaproj.numaflow.function.FunctionConstants;
import io.numaproj.numaflow.function.FunctionService;
import io.numaproj.numaflow.function.HandlerDatum;
import io.numaproj.numaflow.function.metadata.Metadata;
Expand Down Expand Up @@ -55,7 +55,12 @@ public static Props props(
Metadata md,
ActorRef shutdownActor,
StreamObserver<Udfunction.DatumList> responseObserver) {
return Props.create(ReduceSupervisorActor.class, reducerFactory, md, shutdownActor, responseObserver);
return Props.create(
ReduceSupervisorActor.class,
reducerFactory,
md,
shutdownActor,
responseObserver);
}

// if there is an uncaught exception stop in the supervisor actor, send a signal to shut down
Expand All @@ -75,7 +80,7 @@ public SupervisorStrategy supervisorStrategy() {
@Override
public void postStop() {
log.debug("post stop of supervisor executed - {}", getSelf().toString());
shutdownActor.tell(Function.SUCCESS, ActorRef.noSender());
shutdownActor.tell(FunctionConstants.SUCCESS, ActorRef.noSender());
}

@Override
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/Language.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonValue;

/**
* Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs
* to form a contract between server and clients.
*/
public enum Language {
GO("go"),
PYTHON("python"),
JAVA("java");

private final String name;

Language(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;
}
}
23 changes: 23 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/Protocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonValue;

/**
* Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs
* to form a contract between server and clients.
*/
public enum Protocol {
UDS_PROTOCOL("uds"),
TCP_PROTOCOL("tcp");

private final String name;

Protocol(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;
}
}
31 changes: 31 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/ServerInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.Map;

/**
* Server Information to be used by client to determine:
* - protocol: what is right protocol to use (UDS or TCP)
* - language: what is language used by the server
* - version: what is the numaflow sdk version used by the server
* - metadata: other information
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ServerInfo {
@JsonProperty("protocol")
private Protocol protocol;
@JsonProperty("language")
private Language language;
@JsonProperty("version")
private String version;
@JsonProperty("metadata")
private Map<String, String> metadata;
}
34 changes: 34 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.numaproj.numaflow.info;

import java.io.IOException;

public interface ServerInfoAccessor {
/**
* Get runtime Java SDK version.
*/
String getSDKVersion();

/**
* Delete filePath if it exists.
* Write serverInfo to filePath in Json format.
* Append {@link ServerInfoConstants#EOF} as a new line to indicate end of file.
*
* @param serverInfo server information POJO
* @param filePath file path to write to
*
* @throws IOException any IO exceptions are thrown to the caller.
*/
void write(ServerInfo serverInfo, String filePath) throws IOException;

/**
* Read from filePath to retrieve server information POJO.
* This API is only used for unit tests.
*
* @param filePath file path to read from
*
* @return server information POJO
*
* @throws IOException any IO exceptions are thrown to the caller.
*/
ServerInfo read(String filePath) throws IOException;
}
Loading