Skip to content

Commit

Permalink
akka change for mapper
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Nov 6, 2024
1 parent a675025 commit fd5a3ee
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 8 deletions.
37 changes: 30 additions & 7 deletions src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class MapSupervisorActor extends AbstractActor {
private final Mapper mapper;
private final StreamObserver<MapOuterClass.MapResponse> responseObserver;
private final CompletableFuture<Void> shutdownSignal;
private int activeMapperCount;
private Exception userException;

public MapSupervisorActor(
Mapper mapper,
Expand All @@ -55,6 +57,8 @@ public MapSupervisorActor(
this.mapper = mapper;
this.responseObserver = responseObserver;
this.shutdownSignal = failureFuture;
this.userException = null;
this.activeMapperCount = 0;
}

public static Props props(
Expand All @@ -66,13 +70,14 @@ public static Props props(

@Override
public void preRestart(Throwable reason, Optional<Object> message) {
log.debug("supervisor pre restart was executed");
getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", reason.getMessage());
shutdownSignal.completeExceptionally(reason);
responseObserver.onError(Status.INTERNAL

Check warning on line 75 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L73-L75

Added lines #L73 - L75 were not covered by tests
.withDescription(reason.getMessage())
.withCause(reason)
.asException());
Service.mapperActorSystem.stop(getSelf());
shutdownSignal.completeExceptionally(reason);

Check warning on line 80 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L80

Added line #L80 was not covered by tests
}

@Override
Expand All @@ -94,34 +99,52 @@ public Receive createReceive() {

private void handleFailure(Exception e) {
log.error("Encountered error in mapFn - {}", e.getMessage());
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
if (userException == null) {
userException = e;
// only send the very first exception to the client
// one exception should trigger a container restart
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
}
activeMapperCount--;
}

private void sendResponse(MapOuterClass.MapResponse mapResponse) {
responseObserver.onNext(mapResponse);
activeMapperCount--;
}

private void processRequest(MapOuterClass.MapRequest mapRequest) {
if (userException != null) {
log.info("a previous mapper actor failed, not processing any more requests");

Check warning on line 121 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L121

Added line #L121 was not covered by tests
if (activeMapperCount == 0) {
log.info("there is no more active mapper AKKA actors - stopping the system");
getContext().getSystem().stop(getSelf());
log.info("AKKA system stopped");
shutdownSignal.completeExceptionally(userException);

Check warning on line 126 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L123-L126

Added lines #L123 - L126 were not covered by tests
}
return;

Check warning on line 128 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L128

Added line #L128 was not covered by tests
}

// Create a MapperActor for each incoming request.
ActorRef mapperActor = getContext()
.actorOf(MapperActor.props(
mapper));

// Send the message to the MapperActor.
mapperActor.tell(mapRequest, getSelf());
activeMapperCount++;
}

// if we see dead letters, we need to stop the execution and exit
// to make sure no messages are lost
private void handleDeadLetters(AllDeadLetters deadLetter) {
log.debug("got a dead letter, stopping the execution");
shutdownSignal.completeExceptionally(new Throwable("dead letters"));
responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException());

Check warning on line 145 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L145

Added line #L145 was not covered by tests
getContext().getSystem().stop(getSelf());
shutdownSignal.completeExceptionally(new Throwable("dead letters"));

Check warning on line 147 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L147

Added line #L147 was not covered by tests
}

@Override
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public void start() throws Exception {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 95 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L94-L95

Added lines #L94 - L95 were not covered by tests
// FIXME - this is a workaround to immediately terminate the JVM process
// The correct way to do this is to stop all the actors and wait for them to terminate
System.exit(0);

Check warning on line 98 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L98

Added line #L98 was not covered by tests
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
Expand All @@ -112,6 +115,9 @@ public void start() throws Exception {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 117 in src/main/java/io/numaproj/numaflow/mapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/Server.java#L114-L117

Added lines #L114 - L117 were not covered by tests
// FIXME - this is a workaround to immediately terminate the JVM process
// The correct way to do this is to stop all the actors and wait for them to terminate
System.exit(0);
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private void processRequest(Sourcetransformer.SourceTransformRequest transformRe
* @param deadLetter The dead letter to be handled.
*/
private void handleDeadLetters(AllDeadLetters deadLetter) {
log.info("got a dead letter, stopping the execution");
log.debug("got a dead letter, stopping the execution");
responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException());

Check warning on line 195 in src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java#L195

Added line #L195 was not covered by tests
getContext().getSystem().stop(getSelf());
shutdownSignal.completeExceptionally(new Throwable("dead letters"));

Check warning on line 197 in src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java#L197

Added line #L197 was not covered by tests
Expand Down

0 comments on commit fd5a3ee

Please sign in to comment.