diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java index 76ba8d03..db3ba1a2 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java @@ -24,16 +24,11 @@ public static void main(String[] args) throws Exception { // wait for the server to shut down server.awaitTermination(); - - log.info("Server stopped."); } @Override public ResponseList processMessages(DatumIterator datumIterator) { ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder(); - if (1 == 1){ - throw new RuntimeException("keran's test runtime exception."); - } while (true) { Datum datum = null; try { diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/server/ServerTest.java b/examples/src/test/java/io/numaproj/numaflow/examples/server/ServerTest.java new file mode 100644 index 00000000..56397421 --- /dev/null +++ b/examples/src/test/java/io/numaproj/numaflow/examples/server/ServerTest.java @@ -0,0 +1,273 @@ +package io.numaproj.numaflow.examples.server; + +import io.numaproj.numaflow.examples.map.evenodd.EvenOddFunction; +import io.numaproj.numaflow.examples.map.flatmap.FlatMapFunction; +import io.numaproj.numaflow.examples.reduce.sum.SumFactory; +import io.numaproj.numaflow.examples.sink.simple.SimpleSink; +import io.numaproj.numaflow.examples.sourcetransformer.eventtimefilter.EventTimeFilterFunction; +import io.numaproj.numaflow.mapper.MapperTestKit; +import io.numaproj.numaflow.mapper.Message; +import io.numaproj.numaflow.mapper.MessageList; +import io.numaproj.numaflow.reducer.Datum; +import io.numaproj.numaflow.reducer.ReducerTestKit; +import io.numaproj.numaflow.sinker.Response; +import io.numaproj.numaflow.sinker.ResponseList; +import io.numaproj.numaflow.sinker.SinkerTestKit; +import io.numaproj.numaflow.sourcetransformer.SourceTransformerTestKit; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@Slf4j +public class ServerTest { + + @Test + @Order(1) + public void testMapServerInvocation() { + MapperTestKit mapperTestKit = new MapperTestKit(new EvenOddFunction()); + try { + mapperTestKit.startServer(); + } catch (Exception e) { + log.error("Failed to start server", e); + Assertions.fail("Failed to start server"); + } + + // Create a client which can send requests to the server + MapperTestKit.Client client = new MapperTestKit.Client(); + MapperTestKit.TestDatum datum = MapperTestKit.TestDatum + .builder() + .value("2".getBytes()) + .build(); + MessageList result = client.sendRequest(new String[]{}, datum); + + List messages = result.getMessages(); + Assertions.assertEquals(1, messages.size()); + Assertions.assertEquals("even", messages.get(0).getKeys()[0]); + + try { + client.close(); + mapperTestKit.stopServer(); + } catch (Exception e) { + log.error("Failed to stop server", e); + Assertions.fail("Failed to stop server"); + } + } + + @Test + @Order(2) + public void testFlatMapServerInvocation() { + MapperTestKit mapperTestKit = new MapperTestKit(new FlatMapFunction()); + try { + mapperTestKit.startServer(); + } catch (Exception e) { + log.error("Failed to start server", e); + } + + MapperTestKit.Client client = new MapperTestKit.Client(); + MapperTestKit.TestDatum datum = MapperTestKit.TestDatum + .builder() + .value("apple,banana,carrot".getBytes()) + .build(); + + MessageList result = client.sendRequest(new String[]{}, datum); + + List messages = result.getMessages(); + Assertions.assertEquals(3, messages.size()); + + Assertions.assertEquals("apple", new String(messages.get(0).getValue())); + Assertions.assertEquals("banana", new String(messages.get(1).getValue())); + Assertions.assertEquals("carrot", new String(messages.get(2).getValue())); + + try { + client.close(); + mapperTestKit.stopServer(); + } catch (Exception e) { + log.error("Failed to stop server", e); + } + } + + @Test + @Order(3) + public void testReduceServerInvocation() { + SumFactory sumFactory = new SumFactory(); + + ReducerTestKit reducerTestKit = new ReducerTestKit(sumFactory); + + // Start the server + try { + reducerTestKit.startServer(); + } catch (Exception e) { + Assertions.fail("Failed to start server"); + } + + // List of datum to be sent to the server + // create 10 datum with values 1 to 10 + List datumList = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + datumList.add(ReducerTestKit.TestDatum + .builder() + .value(Integer.toString(i).getBytes()) + .build()); + } + + // create a client and send requests to the server + ReducerTestKit.Client client = new ReducerTestKit.Client(); + + ReducerTestKit.TestReduceRequest testReduceRequest = ReducerTestKit.TestReduceRequest + .builder() + .datumList(datumList) + .keys(new String[]{"test-key"}) + .startTime(Instant.ofEpochSecond(60000)) + .endTime(Instant.ofEpochSecond(60010)) + .build(); + + try { + io.numaproj.numaflow.reducer.MessageList messageList = client.sendReduceRequest( + testReduceRequest); + // check if the response is correct + if (messageList.getMessages().size() != 1) { + Assertions.fail("Expected 1 message in the response"); + } + Assertions.assertEquals("55", new String(messageList.getMessages().get(0).getValue())); + + } catch (Exception e) { + e.printStackTrace(); + Assertions.fail("Failed to send request to server - "); + } + + // Stop the server + try { + client.close(); + reducerTestKit.stopServer(); + } catch (InterruptedException e) { + Assertions.fail("Failed to stop server"); + } + } + + @Test + @Order(4) + public void testSinkServerInvocation() { + int datumCount = 10; + SinkerTestKit sinkerTestKit = new SinkerTestKit(new SimpleSink()); + + // Start the server + try { + sinkerTestKit.startServer(); + } catch (Exception e) { + Assertions.fail("Failed to start server"); + } + + // Create a test datum iterator with 10 messages + SinkerTestKit.TestListIterator testListIterator = new SinkerTestKit.TestListIterator(); + for (int i = 0; i < datumCount; i++) { + testListIterator.addDatum(SinkerTestKit.TestDatum + .builder() + .id("id-" + i) + .value(("value-" + i).getBytes()) + .headers(Map.of("test-key", "test-value")) + .build()); + } + + SinkerTestKit.Client client = new SinkerTestKit.Client(); + try { + ResponseList responseList = client.sendRequest(testListIterator); + Assertions.assertEquals(datumCount, responseList.getResponses().size()); + for (Response response : responseList.getResponses()) { + Assertions.assertEquals(true, response.getSuccess()); + } + } catch (Exception e) { + Assertions.fail("Failed to send requests"); + } + + // Stop the server + try { + client.close(); + sinkerTestKit.stopServer(); + } catch (InterruptedException e) { + Assertions.fail("Failed to stop server"); + } + + // we can add the logic to verify if the messages were + // successfully written to the sink(could be a file, database, etc.) + } +// FIXME: once tester kit changes are done for bidirectional streaming source +// @Ignore +// @Test +// @Order(5) +// public void testSourceServerInvocation() { +// SimpleSource simpleSource = new SimpleSource(); +// +// SourcerTestKit sourcerTestKit = new SourcerTestKit(simpleSource); +// try { +// sourcerTestKit.startServer(); +// } catch (Exception e) { +// Assertions.fail("Failed to start server"); +// } +// +// // create a client to send requests to the server +// SourcerTestKit.Client sourcerClient = new SourcerTestKit.Client(); +// // create a test observer to receive messages from the server +// SourcerTestKit.TestListBasedObserver testObserver = new SourcerTestKit.TestListBasedObserver(); +// // create a read request with count 10 and timeout 1 second +// SourcerTestKit.TestReadRequest testReadRequest = SourcerTestKit.TestReadRequest.builder() +// .count(10).timeout(Duration.ofSeconds(1)).build(); +// +// try { +// sourcerClient.sendReadRequest(testReadRequest, testObserver); +// Assertions.assertEquals(10, testObserver.getMessages().size()); +// } catch (Exception e) { +// Assertions.fail("Failed to send request to server"); +// } +// +// try { +// sourcerClient.close(); +// sourcerTestKit.stopServer(); +// } catch (InterruptedException e) { +// Assertions.fail("Failed to stop server"); +// } +// } + + @Test + @Order(6) + public void testSourceTransformerServerInvocation() { + SourceTransformerTestKit sourceTransformerTestKit = new SourceTransformerTestKit(new EventTimeFilterFunction()); + try { + sourceTransformerTestKit.startServer(); + } catch (Exception e) { + Assertions.fail("Failed to start server"); + } + + // Create a client which can send requests to the server + SourceTransformerTestKit.Client client = new SourceTransformerTestKit.Client(); + + SourceTransformerTestKit.TestDatum datum = SourceTransformerTestKit.TestDatum.builder() + .eventTime(Instant.ofEpochMilli(1640995200000L)) + .value("test".getBytes()) + .build(); + io.numaproj.numaflow.sourcetransformer.MessageList result = client.sendRequest( + new String[]{}, + datum); + + List messages = result.getMessages(); + Assertions.assertEquals(1, messages.size()); + + Assertions.assertEquals("test", new String(messages.get(0).getValue())); + Assertions.assertEquals("within_year_2022", messages.get(0).getTags()[0]); + + try { + client.close(); + sourceTransformerTestKit.stopServer(); + } catch (Exception e) { + Assertions.fail("Failed to stop server"); + } + } +} diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/sink/simple/SimpleSinkTest.java b/examples/src/test/java/io/numaproj/numaflow/examples/sink/simple/SimpleSinkTest.java new file mode 100644 index 00000000..3c8d34aa --- /dev/null +++ b/examples/src/test/java/io/numaproj/numaflow/examples/sink/simple/SimpleSinkTest.java @@ -0,0 +1,37 @@ +package io.numaproj.numaflow.examples.sink.simple; + +import io.numaproj.numaflow.sinker.Response; +import io.numaproj.numaflow.sinker.ResponseList; +import io.numaproj.numaflow.sinker.SinkerTestKit; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + + +@Slf4j +public class SimpleSinkTest { + + @Test + public void testSimpleSink() { + int datumCount = 10; + SimpleSink simpleSink = new SimpleSink(); + + // Create a test datum iterator with 10 messages + SinkerTestKit.TestListIterator testListIterator = new SinkerTestKit.TestListIterator(); + for (int i = 0; i < datumCount; i++) { + testListIterator.addDatum(SinkerTestKit.TestDatum + .builder() + .id("id-" + i) + .value(("value-" + i).getBytes()) + .build()); + } + + ResponseList responseList = simpleSink.processMessages(testListIterator); + Assertions.assertEquals(datumCount, responseList.getResponses().size()); + for (Response response : responseList.getResponses()) { + Assertions.assertEquals(true, response.getSuccess()); + } + // we can add the logic to verify if the messages were + // successfully written to the sink(could be a file, database, etc.) + } +} diff --git a/src/main/java/io/numaproj/numaflow/sinker/GrpcServerHelper.java b/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java similarity index 96% rename from src/main/java/io/numaproj/numaflow/sinker/GrpcServerHelper.java rename to src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java index 5abea38b..975997cc 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/GrpcServerHelper.java +++ b/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java @@ -1,4 +1,4 @@ -package io.numaproj.numaflow.sinker; +package io.numaproj.numaflow.shared; import io.grpc.Context; import io.grpc.Contexts; @@ -11,8 +11,6 @@ import io.grpc.netty.NettyServerBuilder; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; -import io.numaproj.numaflow.shared.GrpcServerUtils; -import io.numaproj.numaflow.shared.ThreadUtils; import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_END; import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_START; diff --git a/src/main/java/io/numaproj/numaflow/sinker/Server.java b/src/main/java/io/numaproj/numaflow/sinker/Server.java index 9976937a..e3e5a7d6 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Server.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Server.java @@ -5,6 +5,7 @@ import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; +import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; @@ -22,7 +23,7 @@ public class Server { public final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; - private GrpcServerHelper grpcServerHelper; + private final GrpcServerHelper grpcServerHelper; /** * constructor to create sink gRPC server. @@ -78,30 +79,26 @@ public void start() throws Exception { server.start(); log.info( - "Server started, listening on {}", + "server started, listening on {}", grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); - // register shutdown hook + // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down sink gRPC server since JVM is shutting down"); if (server.isTerminated()) { - log.info("Server already terminated"); return; } try { - log.info("stopping server"); Server.this.stop(); - log.info("gracefully shutdown event loop groups"); + log.info("gracefully shutting down event loop groups"); this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); - log.info("event loop groups are gracefully shutdown"); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); } })); - log.info("Sink server shutdown hook registered"); // if there are any exceptions, shutdown the server gracefully. shutdownSignal.whenCompleteAsync((v, e) -> { @@ -116,13 +113,11 @@ public void start() throws Exception { Server.this.stop(); log.info("gracefully shutdown event loop groups"); this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); - log.info("event loop groups are gracefully shutdown"); } catch (InterruptedException ex) { Thread.interrupted(); ex.printStackTrace(System.err); } } - // System.exit(0); }); } @@ -134,9 +129,9 @@ public void start() throws Exception { * @throws InterruptedException if the current thread is interrupted while waiting */ public void awaitTermination() throws InterruptedException { - log.info("Server is waiting for termination"); + log.info("sink server is waiting for termination"); server.awaitTermination(); - log.info("Server is terminated"); + log.info("sink server is terminated"); } /** @@ -146,21 +141,17 @@ public void awaitTermination() throws InterruptedException { * @throws InterruptedException if shutdown is interrupted */ public void stop() throws InterruptedException { - log.info("Server.stop started. Shutting down sink service"); + log.info("server.stop started. Shutting down sink service"); this.service.shutDown(); if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); // force shutdown if not terminated if (!server.isTerminated()) { - log.info("Server did not terminate in {} seconds. Shutting down forcefully", 30); - server.shutdownNow(); - } - if (!server.isTerminated()) { - log.info("Server did not terminate in {} seconds. Shutting down forcefully", 30); + log.info("server did not terminate in {} seconds. Shutting down forcefully", 30); server.shutdownNow(); } } - log.info("Server.stop successfully completed"); + log.info("server.stop successfully completed"); } /** diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index a93a8d7a..1a6e7dbb 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -98,17 +98,17 @@ public void onNext(SinkOuterClass.SinkRequest request) { datumStream.writeMessage(constructHandlerDatum(request)); } } catch (Exception e) { - log.error("Encountered error in sinkFn - {}", e.getMessage()); + log.error("Encountered error in sinkFn onNext - {}", e.getMessage()); shutdownSignal.completeExceptionally(e); responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).asException()); - // responseObserver.onError(e); } } @Override public void onError(Throwable throwable) { log.error("Encountered error in sinkFn - {}", throwable.getMessage()); - responseObserver.onError(throwable); + shutdownSignal.completeExceptionally(throwable); + responseObserver.onError(Status.INTERNAL.withDescription(throwable.getMessage()).asException()); } @Override diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java new file mode 100644 index 00000000..da88c8fb --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java @@ -0,0 +1,160 @@ +package io.numaproj.numaflow.sinker; + +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.sink.v1.SinkGrpc; +import io.numaproj.numaflow.sink.v1.SinkOuterClass; +import lombok.extern.slf4j.Slf4j; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +@Slf4j +@RunWith(JUnit4.class) +public class ServerErrTest { + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private Server server; + private ManagedChannel inProcessChannel; + + @Before + public void setUp() throws Exception { + String serverName = InProcessServerBuilder.generateName(); + + GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() + .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .socketPath(Constants.DEFAULT_SOCKET_PATH) + .infoFilePath("/tmp/numaflow-test-server-info)") + .build(); + + server = new Server( + new TestSinkFnErr(), + grpcServerConfig); + + server.setServerBuilder(InProcessServerBuilder.forName(serverName) + .directExecutor()); + + server.start(); + + inProcessChannel = grpcCleanup.register(InProcessChannelBuilder + .forName(serverName) + .directExecutor() + .build()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void sinkerException() { + //create an output stream observer + SinkOutputStreamObserver outputStreamObserver = new SinkOutputStreamObserver(); + + Thread t = new Thread(() -> { + while (outputStreamObserver.t == null) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + assertEquals( + "UNKNOWN: java.lang.RuntimeException: unknown exception", + outputStreamObserver.t.getMessage()); + }); + t.start(); + + StreamObserver inputStreamObserver = SinkGrpc + .newStub(inProcessChannel) + .sinkFn(outputStreamObserver); + String actualId = "sink_test_id"; + + // send handshake request + inputStreamObserver.onNext(SinkOuterClass.SinkRequest.newBuilder() + .setHandshake(SinkOuterClass.Handshake.newBuilder().setSot(true).build()) + .build()); + + for (int i = 1; i <= 100; i++) { + String[] keys; + if (i < 100) { + keys = new String[]{"valid-key"}; + } else { + keys = new String[]{"invalid-key"}; + } + SinkOuterClass.SinkRequest.Request request = SinkOuterClass.SinkRequest.Request + .newBuilder() + .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .setId(actualId) + .addAllKeys(List.of(keys)) + .build(); + inputStreamObserver.onNext(SinkOuterClass.SinkRequest + .newBuilder() + .setRequest(request) + .build()); + } + + // send eot message + inputStreamObserver.onNext(SinkOuterClass.SinkRequest.newBuilder() + .setStatus(SinkOuterClass.TransmissionStatus.newBuilder().setEot(true)).build()); + + inputStreamObserver.onCompleted(); + + try { + t.join(); + } catch (InterruptedException e) { + fail("Thread interrupted"); + } + } + + @Test + public void sinkerNoHandshake() { + // Create an output stream observer + SinkOutputStreamObserver outputStreamObserver = new SinkOutputStreamObserver(); + + StreamObserver inputStreamObserver = SinkGrpc + .newStub(inProcessChannel) + .sinkFn(outputStreamObserver); + + // Send a request without sending a handshake request + SinkOuterClass.SinkRequest request = SinkOuterClass.SinkRequest.newBuilder() + .setRequest(SinkOuterClass.SinkRequest.Request.newBuilder() + .setValue(ByteString.copyFromUtf8("test")) + .setId("test_id") + .addKeys("test_key") + .build()) + .build(); + inputStreamObserver.onNext(request); + + // Wait for the server to process the request + while (!outputStreamObserver.completed.get()) ; + + // Check if an error was received + assertNotNull(outputStreamObserver.t); + assertEquals( + "INVALID_ARGUMENT: Handshake request not received", + outputStreamObserver.t.getMessage()); + } + + @Slf4j + private static class TestSinkFnErr extends Sinker { + @Override + public ResponseList processMessages(DatumIterator datumIterator) { + throw new RuntimeException("unknown exception"); + } + } +}