Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: gracefully shutdown when error encountered #152

Merged
merged 17 commits into from
Nov 6, 2024
50 changes: 39 additions & 11 deletions src/main/java/io/numaproj/numaflow/batchmapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -20,8 +22,10 @@

private final GRPCConfig grpcConfig;
private final Service service;
private final CompletableFuture<Void> shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;
private final GrpcServerHelper grpcServerHelper;

/**
* constructor to create sink gRPC server.
Expand All @@ -39,8 +43,10 @@
* @param batchMapper to process the message
*/
public Server(BatchMapper batchMapper, GRPCConfig grpcConfig) {
this.service = new Service(batchMapper);
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(batchMapper, this.shutdownSignal);
this.grpcConfig = grpcConfig;
this.grpcServerHelper = new GrpcServerHelper();
}

/**
Expand All @@ -57,35 +63,55 @@
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));

if (this.server == null) {
// create server builder
ServerBuilder<?> serverBuilder = GrpcServerUtils.createServerBuilder(
this.server = grpcServerHelper.createServer(

Check warning on line 66 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L66

Added line #L66 was not covered by tests
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort());
// build server
this.server = serverBuilder
.addService(this.service)
.build();
grpcConfig.getPort(),

Check warning on line 70 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L70

Added line #L70 was not covered by tests
this.service);
}

// start server
server.start();

log.info(
"Server started, listening on socket path: " + grpcConfig.getSocketPath());
"server started, listening on socket path: " + grpcConfig.getSocketPath());

// register shutdown hook
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server != null && server.isTerminated()) {
return;
}
try {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 89 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L88-L89

Added lines #L88 - L89 were not covered by tests
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
}
}));

// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
if (server != null && server.isTerminated()) {
return;

Check warning on line 99 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L99

Added line #L99 was not covered by tests
}

if (e != null) {
System.err.println("*** shutting down batch map gRPC server because of an exception - " + e.getMessage());
try {
log.info("stopping server");
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);

Check warning on line 111 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L109-L111

Added lines #L109 - L111 were not covered by tests
}
}
});
}

/**
Expand All @@ -96,7 +122,9 @@
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("batch map server is waiting for termination");

Check warning on line 125 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L125

Added line #L125 was not covered by tests
server.awaitTermination();
log.info("batch map server has terminated");

Check warning on line 127 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L127

Added line #L127 was not covered by tests
}

/**
Expand Down
17 changes: 11 additions & 6 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
// BatchMapper instance to process the messages
private final BatchMapper batchMapper;

// Signal to shut down the gRPC server
private final CompletableFuture<Void> shutdownSignal;

// Applies a map function to each datum element in the stream.
@Override
public StreamObserver<MapOuterClass.MapRequest> mapFn(StreamObserver<MapOuterClass.MapResponse> responseObserver) {
Expand Down Expand Up @@ -93,8 +96,9 @@
datumStream.writeMessage(constructHandlerDatum(mapRequest));
}
} catch (Exception e) {
log.error("Encountered an error in batch map", e);
responseObserver.onError(Status.UNKNOWN
log.error("Encountered an error in batch map onNext - {}", e.getMessage());
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
Expand All @@ -104,11 +108,12 @@
// Called when an error occurs
@Override
public void onError(Throwable throwable) {
log.error("Error Encountered in batchMap Stream", throwable);
var status = Status.UNKNOWN
log.error("Error Encountered in batchMap Stream - {}", throwable.getMessage());
shutdownSignal.completeExceptionally(throwable);
responseObserver.onError(Status.INTERNAL

Check warning on line 113 in src/main/java/io/numaproj/numaflow/batchmapper/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Service.java#L111-L113

Added lines #L111 - L113 were not covered by tests
.withDescription(throwable.getMessage())
.withCause(throwable);
responseObserver.onError(status.asException());
.withCause(throwable)
.asException());

Check warning on line 116 in src/main/java/io/numaproj/numaflow/batchmapper/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Service.java#L115-L116

Added lines #L115 - L116 were not covered by tests
}

// Called when the client has finished sending requests
Expand Down
25 changes: 13 additions & 12 deletions src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,29 @@
class MapSupervisorActor extends AbstractActor {
private final Mapper mapper;
private final StreamObserver<MapOuterClass.MapResponse> responseObserver;
private final CompletableFuture<Void> failureFuture;
private final CompletableFuture<Void> shutdownSignal;

public MapSupervisorActor(
Mapper mapper,
StreamObserver<MapOuterClass.MapResponse> responseObserver,
CompletableFuture<Void> failureFuture) {
this.mapper = mapper;
this.responseObserver = responseObserver;
this.failureFuture = failureFuture;
this.shutdownSignal = failureFuture;
}

public static Props props(
Mapper mapper,
StreamObserver<MapOuterClass.MapResponse> responseObserver,
CompletableFuture<Void> failureFuture) {
return Props.create(MapSupervisorActor.class, mapper, responseObserver, failureFuture);
CompletableFuture<Void> shutdownSignal) {
return Props.create(MapSupervisorActor.class, mapper, responseObserver, shutdownSignal);
}

@Override
public void preRestart(Throwable reason, Optional<Object> message) {
log.debug("supervisor pre restart was executed");
failureFuture.completeExceptionally(reason);
responseObserver.onError(Status.UNKNOWN
shutdownSignal.completeExceptionally(reason);
responseObserver.onError(Status.INTERNAL

Check warning on line 71 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L70-L71

Added lines #L70 - L71 were not covered by tests
.withDescription(reason.getMessage())
.withCause(reason)
.asException());
Expand All @@ -93,11 +93,12 @@
}

private void handleFailure(Exception e) {
responseObserver.onError(Status.UNKNOWN
log.error("Encountered error in mapFn - {}", e.getMessage());
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
failureFuture.completeExceptionally(e);
}

private void sendResponse(MapOuterClass.MapResponse mapResponse) {
Expand All @@ -118,8 +119,8 @@
// to make sure no messages are lost
private void handleDeadLetters(AllDeadLetters deadLetter) {
log.debug("got a dead letter, stopping the execution");
responseObserver.onError(Status.UNKNOWN.withDescription("dead letters").asException());
failureFuture.completeExceptionally(new Throwable("dead letters"));
shutdownSignal.completeExceptionally(new Throwable("dead letters"));
responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException());

Check warning on line 123 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L122-L123

Added lines #L122 - L123 were not covered by tests
getContext().getSystem().stop(getSelf());
}

Expand All @@ -129,8 +130,8 @@
return new AllForOneStrategy(
DeciderBuilder
.match(Exception.class, e -> {
failureFuture.completeExceptionally(e);
responseObserver.onError(Status.UNKNOWN
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL

Check warning on line 134 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L133-L134

Added lines #L133 - L134 were not covered by tests
.withDescription(e.getMessage())
.withCause(e)
.asException());
Expand Down
52 changes: 39 additions & 13 deletions src/main/java/io/numaproj/numaflow/mapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -19,8 +21,10 @@

private final GRPCConfig grpcConfig;
private final Service service;
private final CompletableFuture<Void> shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;
private final GrpcServerHelper grpcServerHelper;

/**
* constructor to create gRPC server.
Expand All @@ -38,8 +42,10 @@
* @param mapper to process the message
*/
public Server(Mapper mapper, GRPCConfig grpcConfig) {
this.service = new Service(mapper);
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(mapper, this.shutdownSignal);
this.grpcConfig = grpcConfig;
this.grpcServerHelper = new GrpcServerHelper();
}

/**
Expand All @@ -61,39 +67,57 @@
}

if (this.server == null) {
ServerBuilder<?> serverBuilder;
// create server builder for domain socket server
serverBuilder = GrpcServerUtils.createServerBuilder(
this.server = this.grpcServerHelper.createServer(

Check warning on line 70 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L70

Added line #L70 was not covered by tests
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort());

// build server
this.server = serverBuilder
.addService(this.service)
.build();
grpcConfig.getPort(),

Check warning on line 74 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L74

Added line #L74 was not covered by tests
this.service);
}

// start server
server.start();

log.info(
"Server started, listening on {}",
"server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server != null && server.isTerminated()) {
return;
}
try {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 95 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L94-L95

Added lines #L94 - L95 were not covered by tests
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
}
}));

// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
if (server.isTerminated()) {
return;

Check warning on line 105 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L105

Added line #L105 was not covered by tests
}

if (e != null) {
System.err.println("*** shutting down mapper gRPC server because of an exception - " + e.getMessage());
try {
log.info("stopping server");
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);

Check warning on line 117 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L115-L117

Added lines #L115 - L117 were not covered by tests
}
}
});
}

/**
Expand All @@ -104,7 +128,9 @@
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("mapper server is waiting for termination");

Check warning on line 131 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L131

Added line #L131 was not covered by tests
server.awaitTermination();
log.info("mapper server has terminated");

Check warning on line 133 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L133

Added line #L133 was not covered by tests
}

/**
Expand Down
19 changes: 2 additions & 17 deletions src/main/java/io/numaproj/numaflow/mapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,7 @@ class Service extends MapGrpc.MapImplBase {
public static final ActorSystem mapperActorSystem = ActorSystem.create("mapper");

private final Mapper mapper;

// TODO we need to propagate the exception all the way up and shutdown the server.
static void handleFailure(
CompletableFuture<Void> failureFuture) {
new Thread(() -> {
try {
failureFuture.get();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
private final CompletableFuture<Void> shutdownSignal;

@Override
public StreamObserver<MapOuterClass.MapRequest> mapFn(final StreamObserver<MapOuterClass.MapResponse> responseObserver) {
Expand All @@ -43,13 +32,9 @@ public StreamObserver<MapOuterClass.MapRequest> mapFn(final StreamObserver<MapOu
responseObserver);
}

CompletableFuture<Void> failureFuture = new CompletableFuture<>();

handleFailure(failureFuture);

// create a MapSupervisorActor that processes the map requests.
ActorRef mapSupervisorActor = mapperActorSystem
.actorOf(MapSupervisorActor.props(mapper, responseObserver, failureFuture));
.actorOf(MapSupervisorActor.props(mapper, responseObserver, shutdownSignal));

return new StreamObserver<>() {
private boolean handshakeDone = false;
Expand Down
Loading
Loading