Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: gracefully shutdown when error encountered #152

Merged
merged 17 commits into from
Nov 6, 2024
110 changes: 110 additions & 0 deletions src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.numaproj.numaflow.shared;

import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;

import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_END;
import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_START;
import static io.numaproj.numaflow.shared.GrpcServerUtils.WINDOW_END_TIME;
import static io.numaproj.numaflow.shared.GrpcServerUtils.WINDOW_START_TIME;

public class GrpcServerHelper {
private EventLoopGroup bossEventLoopGroup;
private EventLoopGroup workerEventLoopGroup;

public void gracefullyShutdownEventLoopGroups() {
if (this.bossEventLoopGroup != null) {
this.bossEventLoopGroup.shutdownGracefully();

Check warning on line 28 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L28

Added line #L28 was not covered by tests
}
if (this.workerEventLoopGroup != null) {
this.workerEventLoopGroup.shutdownGracefully();

Check warning on line 31 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L31

Added line #L31 was not covered by tests
}
}

public Server createServer(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly copied from GrpcServerUtils.java. We need to manage the state of event loop groups hence I created this class.

String socketPath,
int maxMessageSize,
boolean isLocal,
int port,
BindableService service) {
ServerInterceptor interceptor = new ServerInterceptor() {

Check warning on line 41 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L41

Added line #L41 was not covered by tests
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
io.grpc.Metadata headers,
ServerCallHandler<ReqT, RespT> next) {

final var context =
Context.current().withValues(

Check warning on line 49 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L49

Added line #L49 was not covered by tests
WINDOW_START_TIME,
headers.get(DATUM_METADATA_WIN_START),

Check warning on line 51 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L51

Added line #L51 was not covered by tests
WINDOW_END_TIME,
headers.get(DATUM_METADATA_WIN_END));
ServerCall.Listener<ReqT> listener = Contexts.interceptCall(

Check warning on line 54 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L53-L54

Added lines #L53 - L54 were not covered by tests
context,
call,
headers,
next);
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(
listener) {

Check warning on line 60 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L59-L60

Added lines #L59 - L60 were not covered by tests
@Override
public void onHalfClose() {
try {
super.onHalfClose();
} catch (RuntimeException ex) {
handleException(ex, call, headers);
throw ex;
}
}

Check warning on line 69 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L64-L69

Added lines #L64 - L69 were not covered by tests

private void handleException(
RuntimeException e,
ServerCall<ReqT, RespT> serverCall,
io.grpc.Metadata headers) {
// Currently, we only have application level exceptions.
// Translate it to UNKNOWN status.
var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
var newStatus = Status.fromThrowable(status.asException());
serverCall.close(newStatus, headers);
e.printStackTrace();
System.exit(1);
}

Check warning on line 82 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L77-L82

Added lines #L77 - L82 were not covered by tests
};
}
};

if (isLocal) {
return ServerBuilder.forPort(port)
.maxInboundMessageSize(maxMessageSize)
.intercept(interceptor)
.addService(service)
.build();

Check warning on line 92 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L88-L92

Added lines #L88 - L92 were not covered by tests
}

this.bossEventLoopGroup = GrpcServerUtils.createEventLoopGroup(1, "netty-boss");
this.workerEventLoopGroup = GrpcServerUtils.createEventLoopGroup(
ThreadUtils.INSTANCE.availableProcessors(),

Check warning on line 97 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L95-L97

Added lines #L95 - L97 were not covered by tests
"netty-worker");

return NettyServerBuilder
.forAddress(new DomainSocketAddress(socketPath))
.channelType(GrpcServerUtils.getChannelTypeClass())
.maxInboundMessageSize(maxMessageSize)
.bossEventLoopGroup(this.bossEventLoopGroup)
.workerEventLoopGroup(this.workerEventLoopGroup)
.intercept(interceptor)
.addService(service)
.build();

Check warning on line 108 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L100-L108

Added lines #L100 - L108 were not covered by tests
}
}
56 changes: 43 additions & 13 deletions src/main/java/io/numaproj/numaflow/sinker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -18,8 +20,10 @@

private final GRPCConfig grpcConfig;
private final Service service;
public final CompletableFuture<Void> shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;
private final GrpcServerHelper grpcServerHelper;

/**
* constructor to create sink gRPC server.
Expand All @@ -37,8 +41,10 @@
* @param sinker sink to process the message
*/
public Server(Sinker sinker, GRPCConfig grpcConfig) {
this.service = new Service(sinker);
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(sinker, this.shutdownSignal);
this.grpcConfig = grpcConfig;
this.grpcServerHelper = new GrpcServerHelper();
}

/**
Expand All @@ -56,38 +62,57 @@
}

if (this.server == null) {
// create server builder
ServerBuilder<?> serverBuilder = GrpcServerUtils.createServerBuilder(
this.server = this.grpcServerHelper.createServer(

Check warning on line 65 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L65

Added line #L65 was not covered by tests
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort());

// build server
this.server = serverBuilder
.addService(this.service)
.build();
grpcConfig.getPort(),

Check warning on line 69 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L69

Added line #L69 was not covered by tests
this.service);
}

// start server
server.start();

log.info(
"Server started, listening on {}",
"server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
System.err.println("*** shutting down sink gRPC server since JVM is shutting down");
if (server.isTerminated()) {
return;
}
try {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 90 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L89-L90

Added lines #L89 - L90 were not covered by tests
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
}
}));

// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
if (server.isTerminated()) {
return;

Check warning on line 100 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L100

Added line #L100 was not covered by tests
}

if (e != null) {
System.err.println("*** shutting down sink gRPC server because of an exception - " + e.getMessage());
try {
log.info("stopping server");
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);

Check warning on line 112 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L110-L112

Added lines #L110 - L112 were not covered by tests
}
}
});
}

/**
Expand All @@ -98,7 +123,9 @@
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("sink server is waiting for termination");

Check warning on line 126 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L126

Added line #L126 was not covered by tests
server.awaitTermination();
log.info("sink server is terminated");

Check warning on line 128 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L128

Added line #L128 was not covered by tests
}

/**
Expand All @@ -108,14 +135,17 @@
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
log.info("server.stop started. Shutting down sink service");
this.service.shutDown();
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
// force shutdown if not terminated
if (!server.isTerminated()) {
log.info("server did not terminate in {} seconds. Shutting down forcefully", 30);

Check warning on line 144 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L144

Added line #L144 was not covered by tests
server.shutdownNow();
}
}
log.info("server.stop successfully completed");
}

/**
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

private final Sinker sinker;
private final CompletableFuture<Void> shutdownSignal;

public Service(Sinker sinker) {
public Service(Sinker sinker, CompletableFuture<Void> shutdownSignal) {
this.sinker = sinker;
this.shutdownSignal = shutdownSignal;
}

/**
Expand Down Expand Up @@ -96,15 +98,17 @@
datumStream.writeMessage(constructHandlerDatum(request));
}
} catch (Exception e) {
log.error("Encountered error in sinkFn - {}", e.getMessage());
responseObserver.onError(e);
log.error("Encountered error in sinkFn onNext - {}", e.getMessage());
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).asException());
}
}

@Override
public void onError(Throwable throwable) {
log.error("Encountered error in sinkFn - {}", throwable.getMessage());
responseObserver.onError(throwable);
shutdownSignal.completeExceptionally(throwable);
responseObserver.onError(Status.INTERNAL.withDescription(throwable.getMessage()).asException());

Check warning on line 111 in src/main/java/io/numaproj/numaflow/sinker/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Service.java#L110-L111

Added lines #L110 - L111 were not covered by tests
}

@Override
Expand Down
Loading