diff --git a/pom.xml b/pom.xml
index 5e0d2473..07d1e438 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,13 @@
test
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.14.2
+
+
diff --git a/src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java b/src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java
index cb8f3340..ad391f8f 100644
--- a/src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java
+++ b/src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.common;
-import io.numaproj.numaflow.function.Function;
+import io.numaproj.numaflow.function.FunctionConstants;
+import io.numaproj.numaflow.info.ServerInfoConstants;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
@@ -11,12 +12,11 @@
public class GRPCServerConfig {
private String socketPath;
private int maxMessageSize;
+ private String infoFilePath;
- public GRPCServerConfig(int maxMessageSize) {
- this(Function.SOCKET_PATH, maxMessageSize);
- }
-
- public GRPCServerConfig(String socketPath) {
- this(socketPath, Function.DEFAULT_MESSAGE_SIZE);
+ public GRPCServerConfig() {
+ this.socketPath = FunctionConstants.DEFAULT_SOCKET_PATH;
+ this.maxMessageSize = FunctionConstants.DEFAULT_MESSAGE_SIZE;
+ this.infoFilePath = ServerInfoConstants.DEFAULT_SERVER_INFO_FILE_PATH;
}
}
diff --git a/src/main/java/io/numaproj/numaflow/function/Function.java b/src/main/java/io/numaproj/numaflow/function/FunctionConstants.java
similarity index 75%
rename from src/main/java/io/numaproj/numaflow/function/Function.java
rename to src/main/java/io/numaproj/numaflow/function/FunctionConstants.java
index 03a85035..e4493557 100644
--- a/src/main/java/io/numaproj/numaflow/function/Function.java
+++ b/src/main/java/io/numaproj/numaflow/function/FunctionConstants.java
@@ -3,8 +3,8 @@
import io.grpc.Context;
import io.grpc.Metadata;
-public class Function {
- public static final String SOCKET_PATH = "/var/run/numaflow/function.sock";
+public class FunctionConstants {
+ public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/function.sock";
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 4;
@@ -19,18 +19,17 @@ public class Function {
public static final String DELIMITTER = ":";
public static final Metadata.Key DATUM_METADATA_WIN_START = Metadata.Key.of(
- Function.WIN_START_KEY,
+ FunctionConstants.WIN_START_KEY,
Metadata.ASCII_STRING_MARSHALLER);
public static final Metadata.Key DATUM_METADATA_WIN_END = Metadata.Key.of(
- Function.WIN_END_KEY,
+ FunctionConstants.WIN_END_KEY,
Metadata.ASCII_STRING_MARSHALLER);
-
public static final Context.Key WINDOW_START_TIME = Context.keyWithDefault(
- Function.WIN_START_KEY,
+ FunctionConstants.WIN_START_KEY,
"");
public static final Context.Key WINDOW_END_TIME = Context.keyWithDefault(
- Function.WIN_END_KEY,
+ FunctionConstants.WIN_END_KEY,
"");
}
diff --git a/src/main/java/io/numaproj/numaflow/function/FunctionServer.java b/src/main/java/io/numaproj/numaflow/function/FunctionServer.java
index 8e6e8aea..3b56e124 100644
--- a/src/main/java/io/numaproj/numaflow/function/FunctionServer.java
+++ b/src/main/java/io/numaproj/numaflow/function/FunctionServer.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.function;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
@@ -17,12 +18,18 @@
import io.numaproj.numaflow.function.mapt.MapTHandler;
import io.numaproj.numaflow.function.reduce.ReduceHandler;
import io.numaproj.numaflow.function.reduce.ReducerFactory;
+import io.numaproj.numaflow.info.Language;
+import io.numaproj.numaflow.info.Protocol;
+import io.numaproj.numaflow.info.ServerInfo;
+import io.numaproj.numaflow.info.ServerInfoAccessor;
+import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.concurrent.TimeUnit;
@Slf4j
@@ -31,10 +38,11 @@ public class FunctionServer {
private final GRPCServerConfig grpcServerConfig;
private final ServerBuilder> serverBuilder;
private final FunctionService functionService = new FunctionService();
+ private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private Server server;
public FunctionServer() {
- this(new GRPCServerConfig(Function.SOCKET_PATH, Function.DEFAULT_MESSAGE_SIZE));
+ this(new GRPCServerConfig());
}
/**
@@ -89,6 +97,14 @@ public void start() throws IOException {
}
}
+ // write server info to file
+ ServerInfo serverInfo = new ServerInfo(
+ Protocol.UDS_PROTOCOL,
+ Language.JAVA,
+ serverInfoAccessor.getSDKVersion(),
+ new HashMap<>());
+ serverInfoAccessor.write(serverInfo, grpcServerConfig.getInfoFilePath());
+
// build server
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
@@ -99,10 +115,10 @@ public ServerCall.Listener interceptCall(
final var context =
Context.current().withValues(
- Function.WINDOW_START_TIME,
- headers.get(Function.DATUM_METADATA_WIN_START),
- Function.WINDOW_END_TIME,
- headers.get(Function.DATUM_METADATA_WIN_END));
+ FunctionConstants.WINDOW_START_TIME,
+ headers.get(FunctionConstants.DATUM_METADATA_WIN_START),
+ FunctionConstants.WINDOW_END_TIME,
+ headers.get(FunctionConstants.DATUM_METADATA_WIN_END));
return Contexts.interceptCall(context, call, headers, next);
}
};
diff --git a/src/main/java/io/numaproj/numaflow/function/FunctionService.java b/src/main/java/io/numaproj/numaflow/function/FunctionService.java
index e4d76439..2a8268f6 100644
--- a/src/main/java/io/numaproj/numaflow/function/FunctionService.java
+++ b/src/main/java/io/numaproj/numaflow/function/FunctionService.java
@@ -27,7 +27,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import static io.numaproj.numaflow.function.Function.EOF;
+import static io.numaproj.numaflow.function.FunctionConstants.EOF;
import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getMapFnMethod;
import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getReduceFnMethod;
@@ -144,8 +144,8 @@ public StreamObserver reduceFn(final StreamObserver() : List.of(message.getKeys()))
+ == null ? new ArrayList<>():List.of(message.getKeys()))
.addAllTags(message.getTags()
- == null ? new ArrayList<>() : List.of(message.getTags()))
+ == null ? new ArrayList<>():List.of(message.getTags()))
.build());
});
return datumListBuilder.build();
@@ -236,9 +236,9 @@ private Udfunction.DatumResponseList buildDatumListResponse(MessageTList message
.setNanos(messageT.getEventTime().getNano()))
)
.addAllKeys(messageT.getKeys()
- == null ? new ArrayList<>() : List.of(messageT.getKeys()))
+ == null ? new ArrayList<>():List.of(messageT.getKeys()))
.addAllTags(messageT.getTags()
- == null ? new ArrayList<>() : List.of(messageT.getTags()))
+ == null ? new ArrayList<>():List.of(messageT.getTags()))
.setValue(ByteString.copyFrom(messageT.getValue()))
.build());
});
diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java
index dcc3a90d..e33bee5c 100644
--- a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java
+++ b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java
@@ -9,7 +9,7 @@
import akka.japi.pf.ReceiveBuilder;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
-import io.numaproj.numaflow.function.Function;
+import io.numaproj.numaflow.function.FunctionConstants;
import io.numaproj.numaflow.function.FunctionService;
import io.numaproj.numaflow.function.HandlerDatum;
import io.numaproj.numaflow.function.HandlerDatumMetadata;
@@ -77,7 +77,7 @@ public SupervisorStrategy supervisorStrategy() {
@Override
public void postStop() {
log.debug("post stop of supervisor executed - {}", getSelf().toString());
- shutdownActor.tell(Function.SUCCESS, ActorRef.noSender());
+ shutdownActor.tell(FunctionConstants.SUCCESS, ActorRef.noSender());
}
@Override
@@ -97,7 +97,7 @@ public Receive createReceive() {
*/
private void invokeActors(Udfunction.DatumRequest datumRequest) {
String[] keys = datumRequest.getKeysList().toArray(new String[0]);
- String keyStr = String.join(Function.DELIMITTER, keys);
+ String keyStr = String.join(FunctionConstants.DELIMITTER, keys);
if (!actorsMap.containsKey(keyStr)) {
ReduceHandler reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
@@ -125,7 +125,7 @@ private void responseListener(ActorResponse actorResponse) {
*/
responseObserver.onNext(actorResponse.getDatumList());
- actorsMap.remove(String.join(Function.DELIMITTER, actorResponse.getKeys()));
+ actorsMap.remove(String.join(FunctionConstants.DELIMITTER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
diff --git a/src/main/java/io/numaproj/numaflow/info/Language.java b/src/main/java/io/numaproj/numaflow/info/Language.java
new file mode 100644
index 00000000..0e6f4e7b
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/info/Language.java
@@ -0,0 +1,24 @@
+package io.numaproj.numaflow.info;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs
+ * to form a contract between server and clients.
+ */
+public enum Language {
+ GO("go"),
+ PYTHON("python"),
+ JAVA("java");
+
+ private final String name;
+
+ Language(String name) {
+ this.name = name;
+ }
+
+ @JsonValue
+ public String getName() {
+ return name;
+ }
+}
diff --git a/src/main/java/io/numaproj/numaflow/info/Protocol.java b/src/main/java/io/numaproj/numaflow/info/Protocol.java
new file mode 100644
index 00000000..2ef872e7
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/info/Protocol.java
@@ -0,0 +1,23 @@
+package io.numaproj.numaflow.info;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs
+ * to form a contract between server and clients.
+ */
+public enum Protocol {
+ UDS_PROTOCOL("uds"),
+ TCP_PROTOCOL("tcp");
+
+ private final String name;
+
+ Protocol(String name) {
+ this.name = name;
+ }
+
+ @JsonValue
+ public String getName() {
+ return name;
+ }
+}
diff --git a/src/main/java/io/numaproj/numaflow/info/ServerInfo.java b/src/main/java/io/numaproj/numaflow/info/ServerInfo.java
new file mode 100644
index 00000000..7788941b
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/info/ServerInfo.java
@@ -0,0 +1,31 @@
+package io.numaproj.numaflow.info;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.Map;
+
+/**
+ * Server Information to be used by client to determine:
+ * - protocol: what is right protocol to use (UDS or TCP)
+ * - language: what is language used by the server
+ * - version: what is the numaflow sdk version used by the server
+ * - metadata: other information
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class ServerInfo {
+ @JsonProperty("protocol")
+ private Protocol protocol;
+ @JsonProperty("language")
+ private Language language;
+ @JsonProperty("version")
+ private String version;
+ @JsonProperty("metadata")
+ private Map metadata;
+}
diff --git a/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java b/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java
new file mode 100644
index 00000000..07c12105
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java
@@ -0,0 +1,34 @@
+package io.numaproj.numaflow.info;
+
+import java.io.IOException;
+
+public interface ServerInfoAccessor {
+ /**
+ * Get runtime Java SDK version.
+ */
+ String getSDKVersion();
+
+ /**
+ * Delete filePath if it exists.
+ * Write serverInfo to filePath in Json format.
+ * Append {@link ServerInfoConstants#EOF} as a new line to indicate end of file.
+ *
+ * @param serverInfo server information POJO
+ * @param filePath file path to write to
+ *
+ * @throws IOException any IO exceptions are thrown to the caller.
+ */
+ void write(ServerInfo serverInfo, String filePath) throws IOException;
+
+ /**
+ * Read from filePath to retrieve server information POJO.
+ * This API is only used for unit tests.
+ *
+ * @param filePath file path to read from
+ *
+ * @return server information POJO
+ *
+ * @throws IOException any IO exceptions are thrown to the caller.
+ */
+ ServerInfo read(String filePath) throws IOException;
+}
diff --git a/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessorImpl.java b/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessorImpl.java
new file mode 100644
index 00000000..2d08fe26
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessorImpl.java
@@ -0,0 +1,51 @@
+package io.numaproj.numaflow.info;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.AllArgsConstructor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+
+@AllArgsConstructor
+public class ServerInfoAccessorImpl implements ServerInfoAccessor {
+ private ObjectMapper objectMapper;
+
+ @Override
+ public String getSDKVersion() {
+ // This only works for Java 9 and above.
+ // Since we already use 11+ for numaflow SDK, it's safe to apply this approach.
+ return String.valueOf(Runtime.version().version().get(0));
+ }
+
+ @Override
+ public void write(ServerInfo serverInfo, String filePath) throws IOException {
+ File file = new File(filePath);
+ if (file.exists()) {
+ file.delete();
+ }
+ FileWriter fileWriter = new FileWriter(filePath, false);
+ objectMapper.writeValue(fileWriter, serverInfo);
+ FileWriter eofWriter = new FileWriter(filePath, true);
+ eofWriter.append("\n").append(ServerInfoConstants.EOF);
+ eofWriter.close();
+ fileWriter.close();
+ }
+
+ @Override
+ public ServerInfo read(String filePath) throws IOException {
+ File file = new File(filePath);
+ BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
+ StringBuilder stringBuilder = new StringBuilder();
+ String line;
+ while ((line = bufferedReader.readLine()) != null
+ && !line.equals(ServerInfoConstants.EOF)) {
+ stringBuilder.append(line);
+ }
+ ServerInfo serverInfo = objectMapper.readValue(stringBuilder.toString(), ServerInfo.class);
+ bufferedReader.close();
+ return serverInfo;
+ }
+}
diff --git a/src/main/java/io/numaproj/numaflow/info/ServerInfoConstants.java b/src/main/java/io/numaproj/numaflow/info/ServerInfoConstants.java
new file mode 100644
index 00000000..5fdfed7d
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/info/ServerInfoConstants.java
@@ -0,0 +1,11 @@
+package io.numaproj.numaflow.info;
+
+/**
+ * Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs
+ * to form a contract between server and clients.
+ */
+public class ServerInfoConstants {
+ public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/server-info";
+ public static final String EOF = "U+005C__END__";
+}
+
diff --git a/src/main/java/io/numaproj/numaflow/sink/Sink.java b/src/main/java/io/numaproj/numaflow/sink/Sink.java
deleted file mode 100644
index bb8d4e79..00000000
--- a/src/main/java/io/numaproj/numaflow/sink/Sink.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package io.numaproj.numaflow.sink;
-
-public class Sink {
- public static final String SOCKET_PATH = "/var/run/numaflow/udsink.sock";
- public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 4;
-}
diff --git a/src/main/java/io/numaproj/numaflow/sink/SinkConstants.java b/src/main/java/io/numaproj/numaflow/sink/SinkConstants.java
new file mode 100644
index 00000000..6116224e
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/sink/SinkConstants.java
@@ -0,0 +1,6 @@
+package io.numaproj.numaflow.sink;
+
+public class SinkConstants {
+ public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/udsink.sock";
+ public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 4;
+}
diff --git a/src/main/java/io/numaproj/numaflow/sink/SinkServer.java b/src/main/java/io/numaproj/numaflow/sink/SinkServer.java
index 4c039149..f779f32c 100644
--- a/src/main/java/io/numaproj/numaflow/sink/SinkServer.java
+++ b/src/main/java/io/numaproj/numaflow/sink/SinkServer.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.sink;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.NettyServerBuilder;
@@ -7,12 +8,18 @@
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import io.numaproj.numaflow.common.GRPCServerConfig;
+import io.numaproj.numaflow.info.Language;
+import io.numaproj.numaflow.info.Protocol;
+import io.numaproj.numaflow.info.ServerInfo;
+import io.numaproj.numaflow.info.ServerInfoAccessor;
+import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.concurrent.TimeUnit;
@Slf4j
@@ -21,10 +28,11 @@ public class SinkServer {
private final GRPCServerConfig grpcServerConfig;
private final ServerBuilder> serverBuilder;
private final SinkService sinkService = new SinkService();
+ private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private Server server;
public SinkServer() {
- this(new GRPCServerConfig(Sink.SOCKET_PATH, Sink.DEFAULT_MESSAGE_SIZE));
+ this(new GRPCServerConfig());
}
/**
@@ -59,16 +67,25 @@ public SinkServer registerSinker(SinkHandler sinkHandler) {
* Start serving requests.
*/
public void start() throws IOException {
+ String socketPath = grpcServerConfig.getSocketPath();
+ String infoFilePath = grpcServerConfig.getInfoFilePath();
// cleanup socket path if it exists (unit test builder doesn't use one)
- if (grpcServerConfig.getSocketPath() != null) {
- Path path = Paths.get(grpcServerConfig.getSocketPath());
+ if (socketPath != null) {
+ Path path = Paths.get(socketPath);
Files.deleteIfExists(path);
if (Files.exists(path)) {
- log.error("Failed to clean up socket path \"" + grpcServerConfig.getSocketPath()
- + "\". Exiting");
+ log.error("Failed to clean up socket path {}. Exiting", socketPath);
}
}
+ // write server info to file
+ ServerInfo serverInfo = new ServerInfo(
+ Protocol.UDS_PROTOCOL,
+ Language.JAVA,
+ serverInfoAccessor.getSDKVersion(),
+ new HashMap<>());
+ serverInfoAccessor.write(serverInfo, grpcServerConfig.getInfoFilePath());
+
// build server
server = serverBuilder
.addService(this.sinkService)
diff --git a/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java b/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java
index 20444a1d..39a396f7 100644
--- a/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java
@@ -25,8 +25,8 @@
import java.util.Arrays;
import java.util.List;
-import static io.numaproj.numaflow.function.Function.WIN_END_KEY;
-import static io.numaproj.numaflow.function.Function.WIN_START_KEY;
+import static io.numaproj.numaflow.function.FunctionConstants.WIN_END_KEY;
+import static io.numaproj.numaflow.function.FunctionConstants.WIN_START_KEY;
import static org.junit.Assert.assertEquals;
@RunWith(JUnit4.class)
@@ -45,12 +45,13 @@ public class FunctionServerTest {
public void setUp() throws Exception {
String serverName = InProcessServerBuilder.generateName();
+ GRPCServerConfig grpcServerConfig = new GRPCServerConfig();
+ grpcServerConfig.setInfoFilePath("/tmp/numaflow-test-server-info");
server = new FunctionServer(
InProcessServerBuilder.forName(serverName).directExecutor(),
- new GRPCServerConfig(Function.SOCKET_PATH, Function.DEFAULT_MESSAGE_SIZE));
+ grpcServerConfig);
- server
- .registerMapHandler(new TestMapFn())
+ server.registerMapHandler(new TestMapFn())
.registerMapTHandler(new TestMapTFn())
.registerReducerFactory(new ReduceTestFactory())
.start();
diff --git a/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java
index 247b584a..d4f0126a 100644
--- a/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java
+++ b/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java
@@ -17,7 +17,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import static io.numaproj.numaflow.function.Function.EOF;
+import static io.numaproj.numaflow.function.FunctionConstants.EOF;
import static org.junit.Assert.fail;
public class ReduceSupervisorActorTest {
diff --git a/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java b/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java
new file mode 100644
index 00000000..488a04fd
--- /dev/null
+++ b/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java
@@ -0,0 +1,61 @@
+package io.numaproj.numaflow.info;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(JUnit4.class)
+public class ServerInfoAccessorImplTest {
+ private ServerInfoAccessor underTest = new ServerInfoAccessorImpl(new ObjectMapper());
+
+ @Test
+ public void given_localEnvironment_when_getJavaSDKVersion_then_returnAValidVersion() {
+ String got = this.underTest.getSDKVersion();
+ assertNotEquals("", got);
+ try {
+ Integer.parseInt(got);
+ } catch (NumberFormatException e) {
+ fail("Expected java SDK version to be a valid integer.");
+ }
+ }
+
+ @Test
+ public void given_writeServerInfo_when_read_then_returnExactSame() {
+ ServerInfo testServerInfo = new ServerInfo(
+ Protocol.TCP_PROTOCOL,
+ Language.JAVA,
+ "11",
+ new HashMap<>() {{
+ put("key1", "value1");
+ put("key2", "value2");
+ }}
+ );
+ String testFilePath = "/var/tmp/test-path";
+ try {
+ this.underTest.write(testServerInfo, testFilePath);
+ ServerInfo got = this.underTest.read(testFilePath);
+ assertEquals(testServerInfo.getLanguage(), got.getLanguage());
+ assertEquals(testServerInfo.getProtocol(), got.getProtocol());
+ assertEquals(testServerInfo.getVersion(), got.getVersion());
+ assertEquals(testServerInfo.getMetadata(), got.getMetadata());
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println(e.getMessage());
+ fail("Expected no exception.");
+ }
+ }
+
+ @Test(expected = IOException.class)
+ public void given_fileNotExist_when_read_then_throwIOException() throws IOException {
+ String testFilePath = "/var/tmp/test-no-existing-path";
+ this.underTest.read(testFilePath);
+ }
+}
diff --git a/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java b/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java
index fab5f82d..ed7f3ab4 100644
--- a/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java
@@ -35,9 +35,11 @@ public class SinkServerTest {
@Before
public void setUp() throws Exception {
String serverName = InProcessServerBuilder.generateName();
+ GRPCServerConfig grpcServerConfig = new GRPCServerConfig();
+ grpcServerConfig.setInfoFilePath("/tmp/numaflow-test-server-info");
server = new SinkServer(
InProcessServerBuilder.forName(serverName).directExecutor(),
- new GRPCServerConfig(Sink.SOCKET_PATH, Sink.DEFAULT_MESSAGE_SIZE));
+ grpcServerConfig);
server.registerSinker(new TestSinkFn()).start();
inProcessChannel = grpcCleanup.register(InProcessChannelBuilder
.forName(serverName)