Skip to content
18 changes: 18 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.batchmap.flatmap.BatchFlatMap
Expand All @@ -74,6 +77,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.sourcetransformer.eventtimefilter.EventTimeFilterFunction
Expand All @@ -93,6 +99,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.mapstream.flatmapstream.FlatMapStreamFunction
Expand All @@ -110,6 +119,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.map.flatmap.FlatMapFunction
Expand Down Expand Up @@ -195,6 +207,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.map.forward.ForwardFunction
Expand Down Expand Up @@ -266,6 +281,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.source.simple.SimpleSource
Expand Down
50 changes: 39 additions & 11 deletions src/main/java/io/numaproj/numaflow/batchmapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -20,8 +22,10 @@

private final GRPCConfig grpcConfig;
private final Service service;
private final CompletableFuture<Void> shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;
private final GrpcServerHelper grpcServerHelper;

/**
* constructor to create sink gRPC server.
Expand All @@ -39,8 +43,10 @@
* @param batchMapper to process the message
*/
public Server(BatchMapper batchMapper, GRPCConfig grpcConfig) {
this.service = new Service(batchMapper);
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(batchMapper, this.shutdownSignal);
this.grpcConfig = grpcConfig;
this.grpcServerHelper = new GrpcServerHelper();
}

/**
Expand All @@ -57,35 +63,55 @@
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));

if (this.server == null) {
// create server builder
ServerBuilder<?> serverBuilder = GrpcServerUtils.createServerBuilder(
this.server = grpcServerHelper.createServer(

Check warning on line 66 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L66

Added line #L66 was not covered by tests
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort());
// build server
this.server = serverBuilder
.addService(this.service)
.build();
grpcConfig.getPort(),

Check warning on line 70 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L70

Added line #L70 was not covered by tests
this.service);
}

// start server
server.start();

log.info(
"Server started, listening on socket path: " + grpcConfig.getSocketPath());
"server started, listening on socket path: " + grpcConfig.getSocketPath());

// register shutdown hook
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server != null && server.isTerminated()) {
return;
}
try {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 89 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L88-L89

Added lines #L88 - L89 were not covered by tests
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
}
}));

// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
if (server != null && server.isTerminated()) {
return;

Check warning on line 99 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L99

Added line #L99 was not covered by tests
}

if (e != null) {
System.err.println("*** shutting down batch map gRPC server because of an exception - " + e.getMessage());
try {
log.info("stopping server");
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);

Check warning on line 111 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L109-L111

Added lines #L109 - L111 were not covered by tests
}
}
});
}

/**
Expand All @@ -96,7 +122,9 @@
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("batch map server is waiting for termination");

Check warning on line 125 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L125

Added line #L125 was not covered by tests
server.awaitTermination();
log.info("batch map server has terminated");

Check warning on line 127 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L127

Added line #L127 was not covered by tests
}

/**
Expand Down
17 changes: 11 additions & 6 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
// BatchMapper instance to process the messages
private final BatchMapper batchMapper;

// Signal to shut down the gRPC server
private final CompletableFuture<Void> shutdownSignal;

// Applies a map function to each datum element in the stream.
@Override
public StreamObserver<MapOuterClass.MapRequest> mapFn(StreamObserver<MapOuterClass.MapResponse> responseObserver) {
Expand Down Expand Up @@ -93,8 +96,9 @@
datumStream.writeMessage(constructHandlerDatum(mapRequest));
}
} catch (Exception e) {
log.error("Encountered an error in batch map", e);
responseObserver.onError(Status.UNKNOWN
log.error("Encountered an error in batch map onNext - {}", e.getMessage());
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
Expand All @@ -104,11 +108,12 @@
// Called when an error occurs
@Override
public void onError(Throwable throwable) {
log.error("Error Encountered in batchMap Stream", throwable);
var status = Status.UNKNOWN
log.error("Error Encountered in batchMap Stream - {}", throwable.getMessage());
shutdownSignal.completeExceptionally(throwable);
responseObserver.onError(Status.INTERNAL

Check warning on line 113 in src/main/java/io/numaproj/numaflow/batchmapper/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Service.java#L111-L113

Added lines #L111 - L113 were not covered by tests
.withDescription(throwable.getMessage())
.withCause(throwable);
responseObserver.onError(status.asException());
.withCause(throwable)
.asException());

Check warning on line 116 in src/main/java/io/numaproj/numaflow/batchmapper/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Service.java#L115-L116

Added lines #L115 - L116 were not covered by tests
}

// Called when the client has finished sending requests
Expand Down
56 changes: 40 additions & 16 deletions src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,38 @@
class MapSupervisorActor extends AbstractActor {
private final Mapper mapper;
private final StreamObserver<MapOuterClass.MapResponse> responseObserver;
private final CompletableFuture<Void> failureFuture;
private final CompletableFuture<Void> shutdownSignal;
private int activeMapperCount;
private Exception userException;

public MapSupervisorActor(
Mapper mapper,
StreamObserver<MapOuterClass.MapResponse> responseObserver,
CompletableFuture<Void> failureFuture) {
this.mapper = mapper;
this.responseObserver = responseObserver;
this.failureFuture = failureFuture;
this.shutdownSignal = failureFuture;
this.userException = null;
this.activeMapperCount = 0;
}

public static Props props(
Mapper mapper,
StreamObserver<MapOuterClass.MapResponse> responseObserver,
CompletableFuture<Void> failureFuture) {
return Props.create(MapSupervisorActor.class, mapper, responseObserver, failureFuture);
CompletableFuture<Void> shutdownSignal) {
return Props.create(MapSupervisorActor.class, mapper, responseObserver, shutdownSignal);
}

@Override
public void preRestart(Throwable reason, Optional<Object> message) {
log.debug("supervisor pre restart was executed");
failureFuture.completeExceptionally(reason);
responseObserver.onError(Status.UNKNOWN
getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", reason.getMessage());
shutdownSignal.completeExceptionally(reason);
responseObserver.onError(Status.INTERNAL

Check warning on line 75 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L73-L75

Added lines #L73 - L75 were not covered by tests
.withDescription(reason.getMessage())
.withCause(reason)
.asException());
Service.mapperActorSystem.stop(getSelf());
shutdownSignal.completeExceptionally(reason);

Check warning on line 80 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L80

Added line #L80 was not covered by tests
}

@Override
Expand All @@ -93,34 +98,53 @@
}

private void handleFailure(Exception e) {
responseObserver.onError(Status.UNKNOWN
.withDescription(e.getMessage())
.withCause(e)
.asException());
failureFuture.completeExceptionally(e);
log.error("Encountered error in mapFn - {}", e.getMessage());
if (userException == null) {
userException = e;
// only send the very first exception to the client
// one exception should trigger a container restart
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
}
activeMapperCount--;
}

private void sendResponse(MapOuterClass.MapResponse mapResponse) {
responseObserver.onNext(mapResponse);
activeMapperCount--;
}

private void processRequest(MapOuterClass.MapRequest mapRequest) {
if (userException != null) {
log.info("a previous mapper actor failed, not processing any more requests");

Check warning on line 121 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L121

Added line #L121 was not covered by tests
if (activeMapperCount == 0) {
log.info("there is no more active mapper AKKA actors - stopping the system");
getContext().getSystem().stop(getSelf());
log.info("AKKA system stopped");
shutdownSignal.completeExceptionally(userException);

Check warning on line 126 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L123-L126

Added lines #L123 - L126 were not covered by tests
}
return;

Check warning on line 128 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L128

Added line #L128 was not covered by tests
}

// Create a MapperActor for each incoming request.
ActorRef mapperActor = getContext()
.actorOf(MapperActor.props(
mapper));

// Send the message to the MapperActor.
mapperActor.tell(mapRequest, getSelf());
activeMapperCount++;
}

// if we see dead letters, we need to stop the execution and exit
// to make sure no messages are lost
private void handleDeadLetters(AllDeadLetters deadLetter) {
log.debug("got a dead letter, stopping the execution");
responseObserver.onError(Status.UNKNOWN.withDescription("dead letters").asException());
failureFuture.completeExceptionally(new Throwable("dead letters"));
responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException());

Check warning on line 145 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L145

Added line #L145 was not covered by tests
getContext().getSystem().stop(getSelf());
shutdownSignal.completeExceptionally(new Throwable("dead letters"));

Check warning on line 147 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L147

Added line #L147 was not covered by tests
}

@Override
Expand All @@ -129,8 +153,8 @@
return new AllForOneStrategy(
DeciderBuilder
.match(Exception.class, e -> {
failureFuture.completeExceptionally(e);
responseObserver.onError(Status.UNKNOWN
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL

Check warning on line 157 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L156-L157

Added lines #L156 - L157 were not covered by tests
.withDescription(e.getMessage())
.withCause(e)
.asException());
Expand Down
Loading
Loading