Skip to content

Commit

Permalink
map
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Nov 6, 2024
1 parent 8f53154 commit 561dc5d
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 43 deletions.
24 changes: 12 additions & 12 deletions src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,29 @@
class MapSupervisorActor extends AbstractActor {
private final Mapper mapper;
private final StreamObserver<MapOuterClass.MapResponse> responseObserver;
private final CompletableFuture<Void> failureFuture;
private final CompletableFuture<Void> shutdownSignal;

public MapSupervisorActor(
Mapper mapper,
StreamObserver<MapOuterClass.MapResponse> responseObserver,
CompletableFuture<Void> failureFuture) {
this.mapper = mapper;
this.responseObserver = responseObserver;
this.failureFuture = failureFuture;
this.shutdownSignal = failureFuture;
}

public static Props props(
Mapper mapper,
StreamObserver<MapOuterClass.MapResponse> responseObserver,
CompletableFuture<Void> failureFuture) {
return Props.create(MapSupervisorActor.class, mapper, responseObserver, failureFuture);
CompletableFuture<Void> shutdownSignal) {
return Props.create(MapSupervisorActor.class, mapper, responseObserver, shutdownSignal);
}

@Override
public void preRestart(Throwable reason, Optional<Object> message) {
log.debug("supervisor pre restart was executed");
failureFuture.completeExceptionally(reason);
responseObserver.onError(Status.UNKNOWN
shutdownSignal.completeExceptionally(reason);
responseObserver.onError(Status.INTERNAL

Check warning on line 71 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L70-L71

Added lines #L70 - L71 were not covered by tests
.withDescription(reason.getMessage())
.withCause(reason)
.asException());
Expand All @@ -93,11 +93,11 @@ public Receive createReceive() {
}

private void handleFailure(Exception e) {
responseObserver.onError(Status.UNKNOWN
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
failureFuture.completeExceptionally(e);
}

private void sendResponse(MapOuterClass.MapResponse mapResponse) {
Expand All @@ -118,8 +118,8 @@ private void processRequest(MapOuterClass.MapRequest mapRequest) {
// to make sure no messages are lost
private void handleDeadLetters(AllDeadLetters deadLetter) {
log.debug("got a dead letter, stopping the execution");
responseObserver.onError(Status.UNKNOWN.withDescription("dead letters").asException());
failureFuture.completeExceptionally(new Throwable("dead letters"));
shutdownSignal.completeExceptionally(new Throwable("dead letters"));
responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException());

Check warning on line 122 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L121-L122

Added lines #L121 - L122 were not covered by tests
getContext().getSystem().stop(getSelf());
}

Expand All @@ -129,8 +129,8 @@ public SupervisorStrategy supervisorStrategy() {
return new AllForOneStrategy(
DeciderBuilder
.match(Exception.class, e -> {
failureFuture.completeExceptionally(e);
responseObserver.onError(Status.UNKNOWN
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL

Check warning on line 133 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L132-L133

Added lines #L132 - L133 were not covered by tests
.withDescription(e.getMessage())
.withCause(e)
.asException());
Expand Down
52 changes: 39 additions & 13 deletions src/main/java/io/numaproj/numaflow/mapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -19,8 +21,10 @@ public class Server {

private final GRPCConfig grpcConfig;
private final Service service;
private final CompletableFuture<Void> shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;
private final GrpcServerHelper grpcServerHelper;

/**
* constructor to create gRPC server.
Expand All @@ -38,8 +42,10 @@ public Server(Mapper mapper) {
* @param mapper to process the message
*/
public Server(Mapper mapper, GRPCConfig grpcConfig) {
this.service = new Service(mapper);
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(mapper, this.shutdownSignal);
this.grpcConfig = grpcConfig;
this.grpcServerHelper = new GrpcServerHelper();
}

/**
Expand All @@ -61,39 +67,57 @@ public void start() throws Exception {
}

if (this.server == null) {
ServerBuilder<?> serverBuilder;
// create server builder for domain socket server
serverBuilder = GrpcServerUtils.createServerBuilder(
this.server = this.grpcServerHelper.createServer(

Check warning on line 70 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L70

Added line #L70 was not covered by tests
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort());

// build server
this.server = serverBuilder
.addService(this.service)
.build();
grpcConfig.getPort(),

Check warning on line 74 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L74

Added line #L74 was not covered by tests
this.service);
}

// start server
server.start();

log.info(
"Server started, listening on {}",
"server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server.isTerminated()) {
return;
}
try {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 95 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L94-L95

Added lines #L94 - L95 were not covered by tests
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
}
}));

// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
if (server.isTerminated()) {
return;

Check warning on line 105 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L105

Added line #L105 was not covered by tests
}

if (e != null) {
System.err.println("*** shutting down mapper gRPC server because of an exception - " + e.getMessage());
try {
log.info("stopping server");
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);

Check warning on line 117 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L115-L117

Added lines #L115 - L117 were not covered by tests
}
}
});
}

/**
Expand All @@ -104,7 +128,9 @@ public void start() throws Exception {
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("mapper server is waiting for termination");

Check warning on line 131 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L131

Added line #L131 was not covered by tests
server.awaitTermination();
log.info("mapper server has terminated");

Check warning on line 133 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L133

Added line #L133 was not covered by tests
}

/**
Expand Down
19 changes: 2 additions & 17 deletions src/main/java/io/numaproj/numaflow/mapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,7 @@ class Service extends MapGrpc.MapImplBase {
public static final ActorSystem mapperActorSystem = ActorSystem.create("mapper");

private final Mapper mapper;

// TODO we need to propagate the exception all the way up and shutdown the server.
static void handleFailure(
CompletableFuture<Void> failureFuture) {
new Thread(() -> {
try {
failureFuture.get();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
private final CompletableFuture<Void> shutdownSignal;

@Override
public StreamObserver<MapOuterClass.MapRequest> mapFn(final StreamObserver<MapOuterClass.MapResponse> responseObserver) {
Expand All @@ -43,13 +32,9 @@ public StreamObserver<MapOuterClass.MapRequest> mapFn(final StreamObserver<MapOu
responseObserver);
}

CompletableFuture<Void> failureFuture = new CompletableFuture<>();

handleFailure(failureFuture);

// create a MapSupervisorActor that processes the map requests.
ActorRef mapSupervisorActor = mapperActorSystem
.actorOf(MapSupervisorActor.props(mapper, responseObserver, failureFuture));
.actorOf(MapSupervisorActor.props(mapper, responseObserver, shutdownSignal));

return new StreamObserver<>() {
private boolean handshakeDone = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void testMapperFailure() {
fail("Expected exception not thrown");
} catch (Exception e) {
assertEquals(
"io.grpc.StatusRuntimeException: UNKNOWN: unknown exception",
"io.grpc.StatusRuntimeException: INTERNAL: unknown exception",
e.getMessage());
}
}
Expand Down

0 comments on commit 561dc5d

Please sign in to comment.