diff --git a/pom.xml b/pom.xml index 6e7115b4..3fe50d0b 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,13 @@ test + + + com.fasterxml.jackson.core + jackson-databind + 2.14.2 + + diff --git a/src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java b/src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java index cb8f3340..ad391f8f 100644 --- a/src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java +++ b/src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java @@ -1,6 +1,7 @@ package io.numaproj.numaflow.common; -import io.numaproj.numaflow.function.Function; +import io.numaproj.numaflow.function.FunctionConstants; +import io.numaproj.numaflow.info.ServerInfoConstants; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; @@ -11,12 +12,11 @@ public class GRPCServerConfig { private String socketPath; private int maxMessageSize; + private String infoFilePath; - public GRPCServerConfig(int maxMessageSize) { - this(Function.SOCKET_PATH, maxMessageSize); - } - - public GRPCServerConfig(String socketPath) { - this(socketPath, Function.DEFAULT_MESSAGE_SIZE); + public GRPCServerConfig() { + this.socketPath = FunctionConstants.DEFAULT_SOCKET_PATH; + this.maxMessageSize = FunctionConstants.DEFAULT_MESSAGE_SIZE; + this.infoFilePath = ServerInfoConstants.DEFAULT_SERVER_INFO_FILE_PATH; } } diff --git a/src/main/java/io/numaproj/numaflow/function/Function.java b/src/main/java/io/numaproj/numaflow/function/FunctionConstants.java similarity index 75% rename from src/main/java/io/numaproj/numaflow/function/Function.java rename to src/main/java/io/numaproj/numaflow/function/FunctionConstants.java index 03a85035..e4493557 100644 --- a/src/main/java/io/numaproj/numaflow/function/Function.java +++ b/src/main/java/io/numaproj/numaflow/function/FunctionConstants.java @@ -3,8 +3,8 @@ import io.grpc.Context; import io.grpc.Metadata; -public class Function { - public static final String SOCKET_PATH = "/var/run/numaflow/function.sock"; +public class FunctionConstants { + public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/function.sock"; public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 4; @@ -19,18 +19,17 @@ public class Function { public static final String DELIMITTER = ":"; public static final Metadata.Key DATUM_METADATA_WIN_START = Metadata.Key.of( - Function.WIN_START_KEY, + FunctionConstants.WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER); public static final Metadata.Key DATUM_METADATA_WIN_END = Metadata.Key.of( - Function.WIN_END_KEY, + FunctionConstants.WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER); - public static final Context.Key WINDOW_START_TIME = Context.keyWithDefault( - Function.WIN_START_KEY, + FunctionConstants.WIN_START_KEY, ""); public static final Context.Key WINDOW_END_TIME = Context.keyWithDefault( - Function.WIN_END_KEY, + FunctionConstants.WIN_END_KEY, ""); } diff --git a/src/main/java/io/numaproj/numaflow/function/FunctionServer.java b/src/main/java/io/numaproj/numaflow/function/FunctionServer.java index 8e6e8aea..d6a3dff4 100644 --- a/src/main/java/io/numaproj/numaflow/function/FunctionServer.java +++ b/src/main/java/io/numaproj/numaflow/function/FunctionServer.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.function; +import com.fasterxml.jackson.databind.ObjectMapper; import io.grpc.Context; import io.grpc.Contexts; import io.grpc.Metadata; @@ -17,12 +18,17 @@ import io.numaproj.numaflow.function.mapt.MapTHandler; import io.numaproj.numaflow.function.reduce.ReduceHandler; import io.numaproj.numaflow.function.reduce.ReducerFactory; +import io.numaproj.numaflow.info.Language; +import io.numaproj.numaflow.info.Protocol; +import io.numaproj.numaflow.info.ServerInfo; +import io.numaproj.numaflow.info.ServerInfoAccessor; +import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import lombok.extern.slf4j.Slf4j; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.HashMap; import java.util.concurrent.TimeUnit; @Slf4j @@ -31,10 +37,11 @@ public class FunctionServer { private final GRPCServerConfig grpcServerConfig; private final ServerBuilder serverBuilder; private final FunctionService functionService = new FunctionService(); + private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private Server server; public FunctionServer() { - this(new GRPCServerConfig(Function.SOCKET_PATH, Function.DEFAULT_MESSAGE_SIZE)); + this(new GRPCServerConfig()); } /** @@ -78,17 +85,26 @@ public FunctionServer registerReducerFactory(ReducerFactory()); + serverInfoAccessor.write(serverInfo, infoFilePath); + // build server ServerInterceptor interceptor = new ServerInterceptor() { @Override @@ -99,10 +115,10 @@ public ServerCall.Listener interceptCall( final var context = Context.current().withValues( - Function.WINDOW_START_TIME, - headers.get(Function.DATUM_METADATA_WIN_START), - Function.WINDOW_END_TIME, - headers.get(Function.DATUM_METADATA_WIN_END)); + FunctionConstants.WINDOW_START_TIME, + headers.get(FunctionConstants.DATUM_METADATA_WIN_START), + FunctionConstants.WINDOW_END_TIME, + headers.get(FunctionConstants.DATUM_METADATA_WIN_END)); return Contexts.interceptCall(context, call, headers, next); } }; diff --git a/src/main/java/io/numaproj/numaflow/function/FunctionService.java b/src/main/java/io/numaproj/numaflow/function/FunctionService.java index 27ee88a1..f1b779f4 100644 --- a/src/main/java/io/numaproj/numaflow/function/FunctionService.java +++ b/src/main/java/io/numaproj/numaflow/function/FunctionService.java @@ -28,7 +28,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import static io.numaproj.numaflow.function.Function.EOF; +import static io.numaproj.numaflow.function.FunctionConstants.EOF; import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getMapFnMethod; import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getReduceFnMethod; @@ -145,8 +145,8 @@ public StreamObserver reduceFn(final StreamObserver() : List.of(message.getKeys())) + == null ? new ArrayList<>():List.of(message.getKeys())) .addAllTags(message.getTags() - == null ? new ArrayList<>() : List.of(message.getTags())) + == null ? new ArrayList<>():List.of(message.getTags())) .build()); }); return datumListBuilder.build(); diff --git a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java index dcc3a90d..e33bee5c 100644 --- a/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java @@ -9,7 +9,7 @@ import akka.japi.pf.ReceiveBuilder; import com.google.common.base.Preconditions; import io.grpc.stub.StreamObserver; -import io.numaproj.numaflow.function.Function; +import io.numaproj.numaflow.function.FunctionConstants; import io.numaproj.numaflow.function.FunctionService; import io.numaproj.numaflow.function.HandlerDatum; import io.numaproj.numaflow.function.HandlerDatumMetadata; @@ -77,7 +77,7 @@ public SupervisorStrategy supervisorStrategy() { @Override public void postStop() { log.debug("post stop of supervisor executed - {}", getSelf().toString()); - shutdownActor.tell(Function.SUCCESS, ActorRef.noSender()); + shutdownActor.tell(FunctionConstants.SUCCESS, ActorRef.noSender()); } @Override @@ -97,7 +97,7 @@ public Receive createReceive() { */ private void invokeActors(Udfunction.DatumRequest datumRequest) { String[] keys = datumRequest.getKeysList().toArray(new String[0]); - String keyStr = String.join(Function.DELIMITTER, keys); + String keyStr = String.join(FunctionConstants.DELIMITTER, keys); if (!actorsMap.containsKey(keyStr)) { ReduceHandler reduceHandler = reducerFactory.createReducer(); ActorRef actorRef = getContext() @@ -125,7 +125,7 @@ private void responseListener(ActorResponse actorResponse) { */ responseObserver.onNext(actorResponse.getDatumList()); - actorsMap.remove(String.join(Function.DELIMITTER, actorResponse.getKeys())); + actorsMap.remove(String.join(FunctionConstants.DELIMITTER, actorResponse.getKeys())); if (actorsMap.isEmpty()) { responseObserver.onCompleted(); getContext().getSystem().stop(getSelf()); diff --git a/src/main/java/io/numaproj/numaflow/info/Language.java b/src/main/java/io/numaproj/numaflow/info/Language.java new file mode 100644 index 00000000..0e6f4e7b --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/info/Language.java @@ -0,0 +1,24 @@ +package io.numaproj.numaflow.info; + +import com.fasterxml.jackson.annotation.JsonValue; + +/** + * Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs + * to form a contract between server and clients. + */ +public enum Language { + GO("go"), + PYTHON("python"), + JAVA("java"); + + private final String name; + + Language(String name) { + this.name = name; + } + + @JsonValue + public String getName() { + return name; + } +} diff --git a/src/main/java/io/numaproj/numaflow/info/Protocol.java b/src/main/java/io/numaproj/numaflow/info/Protocol.java new file mode 100644 index 00000000..2ef872e7 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/info/Protocol.java @@ -0,0 +1,23 @@ +package io.numaproj.numaflow.info; + +import com.fasterxml.jackson.annotation.JsonValue; + +/** + * Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs + * to form a contract between server and clients. + */ +public enum Protocol { + UDS_PROTOCOL("uds"), + TCP_PROTOCOL("tcp"); + + private final String name; + + Protocol(String name) { + this.name = name; + } + + @JsonValue + public String getName() { + return name; + } +} diff --git a/src/main/java/io/numaproj/numaflow/info/ServerInfo.java b/src/main/java/io/numaproj/numaflow/info/ServerInfo.java new file mode 100644 index 00000000..7788941b --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/info/ServerInfo.java @@ -0,0 +1,31 @@ +package io.numaproj.numaflow.info; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.util.Map; + +/** + * Server Information to be used by client to determine: + * - protocol: what is right protocol to use (UDS or TCP) + * - language: what is language used by the server + * - version: what is the numaflow sdk version used by the server + * - metadata: other information + */ +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class ServerInfo { + @JsonProperty("protocol") + private Protocol protocol; + @JsonProperty("language") + private Language language; + @JsonProperty("version") + private String version; + @JsonProperty("metadata") + private Map metadata; +} diff --git a/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java b/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java new file mode 100644 index 00000000..e218de79 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java @@ -0,0 +1,32 @@ +package io.numaproj.numaflow.info; + +public interface ServerInfoAccessor { + /** + * Get runtime Java SDK version. + */ + String getSDKVersion(); + + /** + * Delete filePath if it exists. + * Write serverInfo to filePath in Json format. + * Append {@link ServerInfoConstants#EOF} as a new line to indicate end of file. + * + * @param serverInfo server information POJO + * @param filePath file path to write to + * + * @throws Exception any exceptions are thrown to the caller. + */ + void write(ServerInfo serverInfo, String filePath) throws Exception; + + /** + * Read from filePath to retrieve server information POJO. + * This API is only used for unit tests. + * + * @param filePath file path to read from + * + * @return server information POJO + * + * @throws Exception any exceptions are thrown to the caller. + */ + ServerInfo read(String filePath) throws Exception; +} diff --git a/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessorImpl.java b/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessorImpl.java new file mode 100644 index 00000000..a37526ff --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/info/ServerInfoAccessorImpl.java @@ -0,0 +1,57 @@ +package io.numaproj.numaflow.info; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.AllArgsConstructor; + +import java.io.File; +import java.io.FileWriter; +import java.nio.file.Files; +import java.nio.file.Path; + +@AllArgsConstructor +public class ServerInfoAccessorImpl implements ServerInfoAccessor { + private ObjectMapper objectMapper; + + @Override + public String getSDKVersion() { + // This only works for Java 9 and above. + // Since we already use 11+ for numaflow SDK, it's safe to apply this approach. + return String.valueOf(Runtime.version().version().get(0)); + } + + @Override + public void write(ServerInfo serverInfo, String filePath) throws Exception { + File file = new File(filePath); + if (file.exists()) { + file.delete(); + } + FileWriter fileWriter = new FileWriter(filePath, false); + FileWriter eofWriter = new FileWriter(filePath, true); + try { + objectMapper.writeValue(fileWriter, serverInfo); + eofWriter.append(ServerInfoConstants.EOF); + } finally { + eofWriter.close(); + fileWriter.close(); + } + } + + @Override + public ServerInfo read(String filePath) throws Exception { + String content = Files.readString(Path.of(filePath)); + String trimmedContent = verifyEOFAtEndAndTrim(content); + ServerInfo serverInfo = objectMapper.readValue(trimmedContent, ServerInfo.class); + return serverInfo; + } + + private String verifyEOFAtEndAndTrim(String content) throws Exception { + int eofIndex = content.lastIndexOf(ServerInfoConstants.EOF); + if (eofIndex == -1) { + throw new Exception("EOF marker not found in the file content"); + } + if (eofIndex != content.length() - ServerInfoConstants.EOF.length()) { + throw new Exception("EOF marker is not at the end of the file content"); + } + return content.substring(0, eofIndex); + } +} diff --git a/src/main/java/io/numaproj/numaflow/info/ServerInfoConstants.java b/src/main/java/io/numaproj/numaflow/info/ServerInfoConstants.java new file mode 100644 index 00000000..5fdfed7d --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/info/ServerInfoConstants.java @@ -0,0 +1,11 @@ +package io.numaproj.numaflow.info; + +/** + * Please exercise cautions when updating the values below because the exact same values are defined in other Numaflow SDKs + * to form a contract between server and clients. + */ +public class ServerInfoConstants { + public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/server-info"; + public static final String EOF = "U+005C__END__"; +} + diff --git a/src/main/java/io/numaproj/numaflow/sink/Sink.java b/src/main/java/io/numaproj/numaflow/sink/Sink.java deleted file mode 100644 index bb8d4e79..00000000 --- a/src/main/java/io/numaproj/numaflow/sink/Sink.java +++ /dev/null @@ -1,6 +0,0 @@ -package io.numaproj.numaflow.sink; - -public class Sink { - public static final String SOCKET_PATH = "/var/run/numaflow/udsink.sock"; - public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 4; -} diff --git a/src/main/java/io/numaproj/numaflow/sink/SinkConstants.java b/src/main/java/io/numaproj/numaflow/sink/SinkConstants.java new file mode 100644 index 00000000..6116224e --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/sink/SinkConstants.java @@ -0,0 +1,6 @@ +package io.numaproj.numaflow.sink; + +public class SinkConstants { + public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/udsink.sock"; + public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 4; +} diff --git a/src/main/java/io/numaproj/numaflow/sink/SinkServer.java b/src/main/java/io/numaproj/numaflow/sink/SinkServer.java index 4c039149..ebd6e3bd 100644 --- a/src/main/java/io/numaproj/numaflow/sink/SinkServer.java +++ b/src/main/java/io/numaproj/numaflow/sink/SinkServer.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.sink; +import com.fasterxml.jackson.databind.ObjectMapper; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.netty.NettyServerBuilder; @@ -7,12 +8,17 @@ import io.netty.channel.epoll.EpollServerDomainSocketChannel; import io.netty.channel.unix.DomainSocketAddress; import io.numaproj.numaflow.common.GRPCServerConfig; +import io.numaproj.numaflow.info.Language; +import io.numaproj.numaflow.info.Protocol; +import io.numaproj.numaflow.info.ServerInfo; +import io.numaproj.numaflow.info.ServerInfoAccessor; +import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import lombok.extern.slf4j.Slf4j; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.HashMap; import java.util.concurrent.TimeUnit; @Slf4j @@ -21,10 +27,11 @@ public class SinkServer { private final GRPCServerConfig grpcServerConfig; private final ServerBuilder serverBuilder; private final SinkService sinkService = new SinkService(); + private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private Server server; public SinkServer() { - this(new GRPCServerConfig(Sink.SOCKET_PATH, Sink.DEFAULT_MESSAGE_SIZE)); + this(new GRPCServerConfig()); } /** @@ -58,17 +65,26 @@ public SinkServer registerSinker(SinkHandler sinkHandler) { /** * Start serving requests. */ - public void start() throws IOException { + public void start() throws Exception { + String socketPath = grpcServerConfig.getSocketPath(); + String infoFilePath = grpcServerConfig.getInfoFilePath(); // cleanup socket path if it exists (unit test builder doesn't use one) - if (grpcServerConfig.getSocketPath() != null) { - Path path = Paths.get(grpcServerConfig.getSocketPath()); + if (socketPath != null) { + Path path = Paths.get(socketPath); Files.deleteIfExists(path); if (Files.exists(path)) { - log.error("Failed to clean up socket path \"" + grpcServerConfig.getSocketPath() - + "\". Exiting"); + log.error("Failed to clean up socket path {}. Exiting", socketPath); } } + // write server info to file + ServerInfo serverInfo = new ServerInfo( + Protocol.UDS_PROTOCOL, + Language.JAVA, + serverInfoAccessor.getSDKVersion(), + new HashMap<>()); + serverInfoAccessor.write(serverInfo, infoFilePath); + // build server server = serverBuilder .addService(this.sinkService) diff --git a/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java b/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java index 20444a1d..39a396f7 100644 --- a/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java +++ b/src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java @@ -25,8 +25,8 @@ import java.util.Arrays; import java.util.List; -import static io.numaproj.numaflow.function.Function.WIN_END_KEY; -import static io.numaproj.numaflow.function.Function.WIN_START_KEY; +import static io.numaproj.numaflow.function.FunctionConstants.WIN_END_KEY; +import static io.numaproj.numaflow.function.FunctionConstants.WIN_START_KEY; import static org.junit.Assert.assertEquals; @RunWith(JUnit4.class) @@ -45,12 +45,13 @@ public class FunctionServerTest { public void setUp() throws Exception { String serverName = InProcessServerBuilder.generateName(); + GRPCServerConfig grpcServerConfig = new GRPCServerConfig(); + grpcServerConfig.setInfoFilePath("/tmp/numaflow-test-server-info"); server = new FunctionServer( InProcessServerBuilder.forName(serverName).directExecutor(), - new GRPCServerConfig(Function.SOCKET_PATH, Function.DEFAULT_MESSAGE_SIZE)); + grpcServerConfig); - server - .registerMapHandler(new TestMapFn()) + server.registerMapHandler(new TestMapFn()) .registerMapTHandler(new TestMapTFn()) .registerReducerFactory(new ReduceTestFactory()) .start(); diff --git a/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java index 247b584a..d4f0126a 100644 --- a/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java +++ b/src/test/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActorTest.java @@ -17,7 +17,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import static io.numaproj.numaflow.function.Function.EOF; +import static io.numaproj.numaflow.function.FunctionConstants.EOF; import static org.junit.Assert.fail; public class ReduceSupervisorActorTest { diff --git a/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java b/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java new file mode 100644 index 00000000..47575b52 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java @@ -0,0 +1,52 @@ +package io.numaproj.numaflow.info; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; + +@RunWith(JUnit4.class) +public class ServerInfoAccessorImplTest { + private ServerInfoAccessor underTest = new ServerInfoAccessorImpl(new ObjectMapper()); + + @Test + public void given_localEnvironment_when_getJavaSDKVersion_then_returnAValidVersion() { + String got = this.underTest.getSDKVersion(); + assertNotEquals("", got); + try { + Integer.parseInt(got); + } catch (NumberFormatException e) { + fail("Expected java SDK version to be a valid integer."); + } + } + + @Test + public void given_writeServerInfo_when_read_then_returnExactSame() { + ServerInfo testServerInfo = new ServerInfo( + Protocol.TCP_PROTOCOL, + Language.JAVA, + "11", + new HashMap<>() {{ + put("key1", "value1"); + put("key2", "value2"); + }} + ); + String testFilePath = "/var/tmp/test-path"; + try { + this.underTest.write(testServerInfo, testFilePath); + ServerInfo got = this.underTest.read(testFilePath); + assertEquals(testServerInfo.getLanguage(), got.getLanguage()); + assertEquals(testServerInfo.getProtocol(), got.getProtocol()); + assertEquals(testServerInfo.getVersion(), got.getVersion()); + assertEquals(testServerInfo.getMetadata(), got.getMetadata()); + } catch (Exception e) { + fail("Expected no exception."); + } + } +} diff --git a/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java b/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java index fab5f82d..ed7f3ab4 100644 --- a/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java @@ -35,9 +35,11 @@ public class SinkServerTest { @Before public void setUp() throws Exception { String serverName = InProcessServerBuilder.generateName(); + GRPCServerConfig grpcServerConfig = new GRPCServerConfig(); + grpcServerConfig.setInfoFilePath("/tmp/numaflow-test-server-info"); server = new SinkServer( InProcessServerBuilder.forName(serverName).directExecutor(), - new GRPCServerConfig(Sink.SOCKET_PATH, Sink.DEFAULT_MESSAGE_SIZE)); + grpcServerConfig); server.registerSinker(new TestSinkFn()).start(); inProcessChannel = grpcCleanup.register(InProcessChannelBuilder .forName(serverName)