Skip to content

chore: Map Stream to Support Concurrent Requests #160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
// if we see dead letters, we need to stop the execution and exit
// to make sure no messages are lost
private void handleDeadLetters(AllDeadLetters deadLetter) {
log.debug("got a dead letter, stopping the execution");
log.error("got a dead letter, stopping the execution");

Check warning on line 147 in src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java#L147

Added line #L147 was not covered by tests
responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException());
getContext().getSystem().stop(getSelf());
shutdownSignal.completeExceptionally(new Throwable("dead letters"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package io.numaproj.numaflow.mapstreamer;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.AllDeadLetters;
import akka.actor.AllForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
* MapStreamSupervisorActor is responsible for managing MapStreamerActor instances and handling failures.
* It creates a MapStreamerActor for each incoming MapRequest and listens to their responses.
* <p>
* MapStreamSupervisorActor
* │
* ├── Creates MapStreamerActor instances for each incoming MapRequest
* │ │
* │ ├── MapStreamerActor 1
* │ │ ├── Processes MapRequest
* │ │ ├── Sends results/errors to MapStreamSupervisorActor
* │ │ └── Stops itself after processing
* │ │
* │ ├── MapStreamerActor 2
* │ │ ├── Processes MapRequest
* │ │ ├── Sends results/errors to MapStreamSupervisorActor
* │ │ └── Stops itself after processing
* │ │
* ├── Listens to responses and errors from the MapStreamerActor instances➝➝
* │ ├── On receiving a result, forwards it to the gRPC client via StreamObserver
* │ ├── On error, forwards the error to the gRPC client and initiates shutdown
* │
* ├── Uses AllForOneStrategy for supervising children actors.
* │ ├── On any MapStreamerActor failure, stops all child actors and resumes by restarting.
* <p>
* Note: After all the output messages are streamed to the client, we send an EOF message to
* indicate the end of the stream to the client.
*/
@Slf4j
class MapStreamSupervisorActor extends AbstractActor {

private final MapStreamer mapStreamer;
private final StreamObserver<MapOuterClass.MapResponse> responseObserver;
private final CompletableFuture<Void> shutdownSignal;
private int activeMapStreamersCount;
private Exception userException;

public MapStreamSupervisorActor(
MapStreamer mapStreamer,
StreamObserver<MapOuterClass.MapResponse> responseObserver,
CompletableFuture<Void> failureFuture) {
this.mapStreamer = mapStreamer;
this.responseObserver = responseObserver;
this.shutdownSignal = failureFuture;
this.userException = null;
this.activeMapStreamersCount = 0;
}

public static Props props(
MapStreamer mapStreamer,
StreamObserver<MapOuterClass.MapResponse> responseObserver,
CompletableFuture<Void> shutdownSignal) {
return Props.create(
MapStreamSupervisorActor.class,
() -> new MapStreamSupervisorActor(mapStreamer, responseObserver, shutdownSignal));
}

@Override
public void preRestart(Throwable reason, Optional<Object> message) {
getContext()
.getSystem()
.log()
.warning("supervisor pre restart due to: {}", reason.getMessage());
shutdownSignal.completeExceptionally(reason);
responseObserver.onError(Status.INTERNAL
.withDescription(reason.getMessage())
.withCause(reason)
.asException());
}

Check warning on line 86 in src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java#L77-L86

Added lines #L77 - L86 were not covered by tests

// if we see dead letters, we need to stop the execution and exit
// to make sure no messages are lost
private void handleDeadLetters(AllDeadLetters deadLetter) {
log.error("got a dead letter, stopping the execution");
responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException());
getContext().getSystem().stop(getSelf());
shutdownSignal.completeExceptionally(new Throwable("dead letters"));
}

Check warning on line 95 in src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java#L91-L95

Added lines #L91 - L95 were not covered by tests

@Override
public void postStop() {
getContext().getSystem().log().debug("post stop - {}", getSelf().toString());
}

Check warning on line 100 in src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java#L99-L100

Added lines #L99 - L100 were not covered by tests

@Override
public Receive createReceive() {
return receiveBuilder()
.match(MapOuterClass.MapRequest.class, this::processRequest)
.match(MapOuterClass.MapResponse.class, this::sendResponse)
.match(Exception.class, this::handleFailure)
.match(AllDeadLetters.class, this::handleDeadLetters)
.build();
}

private void handleFailure(Exception e) {
getContext().getSystem().log().error("Encountered error in mapStreamFn", e);
if (userException == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: && e != null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleFailure will only be invoked when there is an exception in the code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I don't think we shouldn't assume caller behaviour when we implement a method.

userException = e;
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
}
activeMapStreamersCount--;
}

private void sendResponse(MapOuterClass.MapResponse mapResponse) {
responseObserver.onNext(mapResponse);
activeMapStreamersCount--;
}

private void processRequest(MapOuterClass.MapRequest mapRequest) {
if (userException != null) {
getContext()
.getSystem()
.log()
.info("Previous mapStreamer actor failed, not processing further requests");

Check warning on line 134 in src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java#L131-L134

Added lines #L131 - L134 were not covered by tests
if (activeMapStreamersCount == 0) {
getContext().getSystem().log().info("No active mapStreamer actors, shutting down");
getContext().getSystem().terminate();
shutdownSignal.completeExceptionally(userException);

Check warning on line 138 in src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java#L136-L138

Added lines #L136 - L138 were not covered by tests
}
return;

Check warning on line 140 in src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java#L140

Added line #L140 was not covered by tests
}

ActorRef mapStreamerActor = getContext().actorOf(MapStreamerActor.props(
mapStreamer));
mapStreamerActor.tell(mapRequest, getSelf());
activeMapStreamersCount++;
}

@Override
public SupervisorStrategy supervisorStrategy() {
return new AllForOneStrategy(
DeciderBuilder.match(Exception.class, e -> {
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
return SupervisorStrategy.stop();

Check warning on line 158 in src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java#L153-L158

Added lines #L153 - L158 were not covered by tests
}).build()
);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.numaproj.numaflow.mapstreamer;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import io.numaproj.numaflow.map.v1.MapOuterClass;

import java.time.Instant;

/**
* MapStreamerActor processes individual requests.
* Upon processing, it sends the results or errors back to the MapStreamSupervisorActor.
* It stops itself after processing the request.
*/
class MapStreamerActor extends AbstractActor {

private final MapStreamer mapStreamer;

public MapStreamerActor(MapStreamer mapStreamer) {
this.mapStreamer = mapStreamer;
}

public static Props props(MapStreamer mapStreamer) {
return Props.create(MapStreamerActor.class, mapStreamer);
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(MapOuterClass.MapRequest.class, this::processRequest)
.build();
}

private void processRequest(MapOuterClass.MapRequest mapRequest) {
HandlerDatum handlerDatum = new HandlerDatum(
mapRequest.getRequest().getValue().toByteArray(),
Instant.ofEpochSecond(
mapRequest.getRequest().getWatermark().getSeconds(),
mapRequest.getRequest().getWatermark().getNanos()),
Instant.ofEpochSecond(
mapRequest.getRequest().getEventTime().getSeconds(),
mapRequest.getRequest().getEventTime().getNanos()),
mapRequest.getRequest().getHeadersMap()
);

String[] keys = mapRequest.getRequest().getKeysList().toArray(new String[0]);

try {
OutputObserverImpl outputObserver = new OutputObserverImpl(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
OutputObserverImpl outputObserver = new OutputObserverImpl(
OutputObserver outputObserver = new OutputObserverImpl(

We can move sendEOF() to MapStreamerActor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MapStreamerActor won't have that context, I prefer to do it here because the processMessage is a blocking call and immediately after the processing is done we are sending EOF. I don't anything wrong here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't have that context.

Do you mean requestId and supervisor actor ref? if so, I think MapStreamerActor has the context of both.

I was proposing that ObserverImpl only implements methods the interface defines but I am also ok with having an extra sendEOF.

getSender(),
mapRequest.getId());
mapStreamer.processMessage(keys, handlerDatum, outputObserver);
// send eof response
outputObserver.sendEOF();
} catch (Exception e) {
getSender().tell(e, getSelf());
}
context().stop(getSelf());
}
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
package io.numaproj.numaflow.mapstreamer;

import akka.actor.ActorRef;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import lombok.AllArgsConstructor;

import java.util.ArrayList;
import java.util.List;

/**
* OutputObserverImpl is the implementation of the OutputObserver interface.
* It is used to send messages to the gRPC client when the send method is called.
* Implementation of the OutputObserver interface.
* It sends messages to the supervisor actor when the send method is called.
* <p>
* We create a new output observer for every map stream invocation, but they
* all forward the response to a common actor (supervisor) who will send the
* responses back to the client. We cannot directly write to the gRPC stream
* from the output observer because the gRPC stream observer is not thread
* safe, whereas writing to an actor is thread safe, and only one actor will
* write the responses back to the client.
*/
@AllArgsConstructor
class OutputObserverImpl implements OutputObserver {
StreamObserver<MapOuterClass.MapResponse> responseObserver;
public class OutputObserverImpl implements OutputObserver {
private final ActorRef supervisorActor;
private final String requestID;

@Override
public void send(Message message) {
MapOuterClass.MapResponse response = buildResponse(message);
responseObserver.onNext(response);
}

private MapOuterClass.MapResponse buildResponse(Message message) {
return MapOuterClass.MapResponse.newBuilder()
MapOuterClass.MapResponse response = MapOuterClass.MapResponse.newBuilder()
.setId(requestID)
.addResults(MapOuterClass.MapResponse.Result.newBuilder()
.setValue(
message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
Expand All @@ -33,5 +37,14 @@ private MapOuterClass.MapResponse buildResponse(Message message) {
.addAllTags(message.getTags()
== null ? new ArrayList<>() : List.of(message.getTags()))
.build()).build();
supervisorActor.tell(response, ActorRef.noSender());
}

public void sendEOF() {
supervisorActor.tell(MapOuterClass.MapResponse
.newBuilder()
.setId(requestID)
.setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true).build())
.build(), ActorRef.noSender());
}
}
Loading
Loading