Skip to content

Commit

Permalink
fix akka for transformer
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Nov 6, 2024
1 parent 7b77b6d commit a675025
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 14 deletions.
18 changes: 18 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.batchmap.flatmap.BatchFlatMap
Expand All @@ -74,6 +77,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.sourcetransformer.eventtimefilter.EventTimeFilterFunction
Expand All @@ -93,6 +99,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.mapstream.flatmapstream.FlatMapStreamFunction
Expand All @@ -110,6 +119,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.map.flatmap.FlatMapFunction
Expand Down Expand Up @@ -195,6 +207,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.map.forward.ForwardFunction
Expand Down Expand Up @@ -266,6 +281,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.source.simple.SimpleSource
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/numaproj/numaflow/mapstreamer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ public void start() throws Exception {
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("sink server is waiting for termination");
log.info("map stream server is waiting for termination");

Check warning on line 125 in src/main/java/io/numaproj/numaflow/mapstreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Server.java#L125

Added line #L125 was not covered by tests
server.awaitTermination();
log.info("sink server has terminated");
log.info("map stream server has terminated");

Check warning on line 127 in src/main/java/io/numaproj/numaflow/mapstreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Server.java#L127

Added line #L127 was not covered by tests
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public void start() throws Exception {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 91 in src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java#L90-L91

Added lines #L90 - L91 were not covered by tests
// FIXME - this is a workaround to immediately terminate the JVM process
// The correct way to do this is to stop all the actors and wait for them to terminate
System.exit(0);

Check warning on line 94 in src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java#L94

Added line #L94 was not covered by tests
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
Expand All @@ -108,6 +111,9 @@ public void start() throws Exception {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 113 in src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java#L110-L113

Added lines #L110 - L113 were not covered by tests
// FIXME - this is a workaround to immediately terminate the JVM process
// The correct way to do this is to stop all the actors and wait for them to terminate
System.exit(0);
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class TransformSupervisorActor extends AbstractActor {
private final SourceTransformer transformer;
private final StreamObserver<Sourcetransformer.SourceTransformResponse> responseObserver;
private final CompletableFuture<Void> shutdownSignal;
private int activeTransformersCount;
private Exception userException;

/**
* Constructor for TransformSupervisorActor.
Expand All @@ -62,6 +64,8 @@ public TransformSupervisorActor(
this.transformer = transformer;
this.responseObserver = responseObserver;
this.shutdownSignal = shutdownSignal;
this.userException = null;
this.activeTransformersCount = 0;
}

/**
Expand Down Expand Up @@ -92,13 +96,13 @@ public static Props props(
*/
@Override
public void preRestart(Throwable reason, Optional<Object> message) {
log.debug("supervisor pre restart was executed");
shutdownSignal.completeExceptionally(reason);
getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", reason.getMessage());
responseObserver.onError(Status.INTERNAL

Check warning on line 100 in src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java#L99-L100

Added lines #L99 - L100 were not covered by tests
.withDescription(reason.getMessage())
.withCause(reason)
.asException());
Service.transformerActorSystem.stop(getSelf());
shutdownSignal.completeExceptionally(reason);

Check warning on line 105 in src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java#L105

Added line #L105 was not covered by tests
}

/**
Expand Down Expand Up @@ -133,11 +137,16 @@ public Receive createReceive() {
*/
private void handleFailure(Exception e) {
log.error("Encountered error in sourceTransformFn - {}", e.getMessage());
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
if (userException == null) {
userException = e;
// only send the very first exception to the client
// one exception should trigger a container restart
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
}
activeTransformersCount--;
}

/**
Expand All @@ -147,6 +156,7 @@ private void handleFailure(Exception e) {
*/
private void sendResponse(Sourcetransformer.SourceTransformResponse transformResponse) {
responseObserver.onNext(transformResponse);
activeTransformersCount--;
}

/**
Expand All @@ -155,13 +165,24 @@ private void sendResponse(Sourcetransformer.SourceTransformResponse transformRes
* @param transformRequest The SourceTransformRequest to be processed.
*/
private void processRequest(Sourcetransformer.SourceTransformRequest transformRequest) {
if (userException != null) {
log.info("a previous transformer actor failed, not processing any more requests");

Check warning on line 169 in src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java#L169

Added line #L169 was not covered by tests
if (activeTransformersCount == 0) {
log.info("there is no more active transformer AKKA actors - stopping the system");
getContext().getSystem().stop(getSelf());
log.info("AKKA system stopped");
shutdownSignal.completeExceptionally(userException);

Check warning on line 174 in src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java#L171-L174

Added lines #L171 - L174 were not covered by tests
}
return;

Check warning on line 176 in src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java#L176

Added line #L176 was not covered by tests
}
// Create a TransformerActor for each incoming request.
ActorRef transformerActor = getContext()
.actorOf(TransformerActor.props(
transformer));

// Send the message to the TransformerActor.
transformerActor.tell(transformRequest, getSelf());
activeTransformersCount++;
}

/**
Expand All @@ -170,10 +191,10 @@ private void processRequest(Sourcetransformer.SourceTransformRequest transformRe
* @param deadLetter The dead letter to be handled.
*/
private void handleDeadLetters(AllDeadLetters deadLetter) {
log.debug("got a dead letter, stopping the execution");
shutdownSignal.completeExceptionally(new Throwable("dead letters"));
responseObserver.onError(Status.UNKNOWN.withDescription("dead letters").asException());
log.info("got a dead letter, stopping the execution");
responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException());

Check warning on line 195 in src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java#L194-L195

Added lines #L194 - L195 were not covered by tests
getContext().getSystem().stop(getSelf());
shutdownSignal.completeExceptionally(new Throwable("dead letters"));

Check warning on line 197 in src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java#L197

Added line #L197 was not covered by tests
}

/**
Expand All @@ -187,8 +208,7 @@ public SupervisorStrategy supervisorStrategy() {
return new AllForOneStrategy(
DeciderBuilder
.match(Exception.class, e -> {
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.UNKNOWN
responseObserver.onError(Status.INTERNAL

Check warning on line 211 in src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java#L211

Added line #L211 was not covered by tests
.withDescription(e.getMessage())
.withCause(e)
.asException());
Expand Down

0 comments on commit a675025

Please sign in to comment.