Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) debug graceful shutdown #151

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions examples/simple-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-pipeline
spec:
vertices:
- name: in
scale:
min: 1
source:
generator:
rpu: 5
duration: 1s
- name: cat
scale:
min: 1
udf:
builtin:
name: cat
- name: java-sink
scale:
min: 1
sink:
udsink:
container:
image: quay.io/numaio/numaflow-java/simple-sink:keran-11
imagePullPolicy: Always
edges:
- from: in
to: cat
- from: cat
to: java-sink
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,20 @@ public static void main(String[] args) throws Exception {

// wait for the server to shut down
server.awaitTermination();

log.info("Server stopped.");
}

@Override
public ResponseList processMessages(DatumIterator datumIterator) {
ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();

if (1 == 1){
throw new RuntimeException("keran's test runtime exception.");
}

while (true) {
Datum datum = null;
Datum datum;
try {
datum = datumIterator.next();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,52 +153,7 @@ public void testReduceServerInvocation() {
}
}

@Test
@Order(4)
public void testSinkServerInvocation() {
int datumCount = 10;
SinkerTestKit sinkerTestKit = new SinkerTestKit(new SimpleSink());

// Start the server
try {
sinkerTestKit.startServer();
} catch (Exception e) {
Assertions.fail("Failed to start server");
}

// Create a test datum iterator with 10 messages
SinkerTestKit.TestListIterator testListIterator = new SinkerTestKit.TestListIterator();
for (int i = 0; i < datumCount; i++) {
testListIterator.addDatum(SinkerTestKit.TestDatum
.builder()
.id("id-" + i)
.value(("value-" + i).getBytes())
.headers(Map.of("test-key", "test-value"))
.build());
}

SinkerTestKit.Client client = new SinkerTestKit.Client();
try {
ResponseList responseList = client.sendRequest(testListIterator);
Assertions.assertEquals(datumCount, responseList.getResponses().size());
for (Response response : responseList.getResponses()) {
Assertions.assertEquals(true, response.getSuccess());
}
} catch (Exception e) {
Assertions.fail("Failed to send requests");
}

// Stop the server
try {
client.close();
sinkerTestKit.stopServer();
} catch (InterruptedException e) {
Assertions.fail("Failed to stop server");
}

// we can add the logic to verify if the messages were
// successfully written to the sink(could be a file, database, etc.)
}
// FIXME: once tester kit changes are done for bidirectional streaming source
// @Ignore
// @Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class SimpleSinkTest {

@Test
public void testSimpleSink() {
/*
int datumCount = 10;
SimpleSink simpleSink = new SimpleSink();

Expand All @@ -33,5 +34,7 @@ public void testSimpleSink() {
}
// we can add the logic to verify if the messages were
// successfully written to the sink(could be a file, database, etc.)

*/
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/batchmapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void stop() throws InterruptedException {
* @param serverBuilder in process server builder can be used for testing
*/
@VisibleForTesting
public void setServerBuilder(ServerBuilder<?> serverBuilder) {
void setServerBuilder(ServerBuilder<?> serverBuilder) {
this.server = serverBuilder
.addService(this.service)
.build();
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapper/Server.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.mapper;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
Expand Down Expand Up @@ -129,7 +130,8 @@ public void stop() throws InterruptedException {
*
* @param serverBuilder the server builder to be used
*/
public void setServerBuilder(ServerBuilder<?> serverBuilder) {
@VisibleForTesting
void setServerBuilder(ServerBuilder<?> serverBuilder) {
this.server = serverBuilder
.addService(this.service)
.build();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/mapstreamer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void stop() throws InterruptedException {
* @param serverBuilder in process server builder can be used for testing
*/
@VisibleForTesting
public void setServerBuilder(ServerBuilder<?> serverBuilder) {
void setServerBuilder(ServerBuilder<?> serverBuilder) {
this.server = serverBuilder
.addService(this.service)
.build();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/sideinput/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void stop() throws InterruptedException {
* @param serverBuilder in process server builder can be used for testing
*/
@VisibleForTesting
public void setServerBuilder(ServerBuilder<?> serverBuilder) {
void setServerBuilder(ServerBuilder<?> serverBuilder) {
this.server = serverBuilder
.addService(this.service)
.build();
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/DatumIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,12 @@ public interface DatumIterator {
* @throws InterruptedException if the thread is interrupted while waiting for the next element
*/
Datum next() throws InterruptedException;

/**
* Write a datum to the iterator
* This method blocks until the write operation is successful.
* @param datum the datum to write
* @throws InterruptedException if the thread is interrupted while waiting for the write operation
*/
void write(Datum datum) throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ public Datum next() throws InterruptedException {
return datum;
}

// blocking call, waits until the write operation is successful
public void writeMessage(Datum datum) throws InterruptedException {
public void write(Datum datum) throws InterruptedException {
blockingQueue.put(datum);
}
}
25 changes: 22 additions & 3 deletions src/main/java/io/numaproj/numaflow/sinker/Server.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.sinker;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
Expand Down Expand Up @@ -37,7 +38,7 @@
* @param sinker sink to process the message
*/
public Server(Sinker sinker, GRPCConfig grpcConfig) {
this.service = new Service(sinker);
this.service = new Service(sinker, this);
this.grpcConfig = grpcConfig;
}

Expand Down Expand Up @@ -84,10 +85,15 @@
try {
Server.this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
if (Thread.interrupted()) {
System.err.println("Thread was interrupted when trying to stop the sink gRPC server.\n"

Check warning on line 89 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L89

Added line #L89 was not covered by tests
+ "Thread interrupted status cleared");
}
System.err.println("Sink server printing stack trace for the exception to stderr");

Check warning on line 92 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L92

Added line #L92 was not covered by tests
e.printStackTrace(System.err);
}
}));
log.info("Sink server shutdown hook registered");
}

/**
Expand All @@ -97,8 +103,12 @@
*
* @throws InterruptedException if the current thread is interrupted while waiting
*/
// TODO - should we use stop instead of awaitTermination in main?
public void awaitTermination() throws InterruptedException {
log.info("Sink server is waiting for termination");

Check warning on line 108 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L108

Added line #L108 was not covered by tests
server.awaitTermination();
log.info("Sink server has terminated");
System.exit(0);

Check warning on line 111 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L110-L111

Added lines #L110 - L111 were not covered by tests
}

/**
Expand All @@ -107,15 +117,23 @@
*
* @throws InterruptedException if shutdown is interrupted
*/
// TODO - can udsink call this method?
// what the difference between this method and awaitTermination?
public void stop() throws InterruptedException {
log.info("Server.stop started. Shutting down sink service");
// TODO - should server shutdown take care of service shutdown?
this.service.shutDown();
log.info("sink service is successfully shut down");
if (server != null) {
log.info("Shutting down gRPC server");
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
// force shutdown if not terminated
if (!server.isTerminated()) {
log.info("Server did not terminate in {} seconds. Shutting down forcefully", 30);

Check warning on line 132 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L132

Added line #L132 was not covered by tests
server.shutdownNow();
}
}
log.info("Server.stop successfully completed");
}

/**
Expand All @@ -124,7 +142,8 @@
*
* @param serverBuilder the server builder to be used
*/
public void setServerBuilder(ServerBuilder<?> serverBuilder) {
@VisibleForTesting
void setServerBuilder(ServerBuilder<?> serverBuilder) {
this.server = serverBuilder
.addService(this.service)
.build();
Expand Down
Loading
Loading