Skip to content

Commit 2cba8b9

Browse files
yhl25KeranYang
authored andcommitted
feat: use gRPC Bidirectional Streaming for Mapper and Transformer (numaproj#143)
Signed-off-by: Yashash H L <[email protected]>
1 parent c471b24 commit 2cba8b9

30 files changed

+1250
-325
lines changed

src/main/java/io/numaproj/numaflow/batchmapper/Service.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public StreamObserver<Batchmap.BatchMapRequest> batchMapFn(StreamObserver<Batchm
4848
Future<BatchResponses> result = batchMapTaskExecutor.submit(() -> this.batchMapper.processMessage(
4949
datumStream));
5050

51-
return new StreamObserver<Batchmap.BatchMapRequest>() {
51+
return new StreamObserver<>() {
5252
@Override
5353
public void onNext(Batchmap.BatchMapRequest mapRequest) {
5454
try {

src/main/java/io/numaproj/numaflow/mapper/Constants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,6 @@ class Constants {
1414
public static final String MAP_MODE_KEY = "MAP_MODE";
1515

1616
public static final String MAP_MODE = "unary-map";
17+
18+
public static final String EOF = "EOF";
1719
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package io.numaproj.numaflow.mapper;
2+
3+
import akka.actor.AbstractActor;
4+
import akka.actor.ActorRef;
5+
import akka.actor.AllDeadLetters;
6+
import akka.actor.AllForOneStrategy;
7+
import akka.actor.Props;
8+
import akka.actor.SupervisorStrategy;
9+
import akka.japi.pf.DeciderBuilder;
10+
import akka.japi.pf.ReceiveBuilder;
11+
import io.grpc.Status;
12+
import io.grpc.stub.StreamObserver;
13+
import io.numaproj.numaflow.map.v1.MapOuterClass;
14+
import lombok.extern.slf4j.Slf4j;
15+
16+
import java.util.Optional;
17+
import java.util.concurrent.CompletableFuture;
18+
19+
/**
20+
* MapSupervisorActor actor is responsible for distributing the messages to actors and handling failure.
21+
* It creates a MapperActor for each incoming request and listens to the responses from the MapperActor.
22+
* <p>
23+
* MapSupervisorActor
24+
* │
25+
* ├── Creates MapperActor instances for each incoming MapRequest
26+
* │ │
27+
* │ ├── MapperActor 1
28+
* │ │ ├── Processes MapRequest
29+
* │ │ ├── Sends MapResponse to MapSupervisorActor
30+
* │ │ └── Stops itself after processing
31+
* │ │
32+
* │ ├── MapperActor 2
33+
* │ │ ├── Processes MapRequest
34+
* │ │ ├── Sends MapResponse to MapSupervisorActor
35+
* │ │ └── Stops itself after processing
36+
* │ │
37+
* ├── Listens to the responses from the MapperActor instances
38+
* │ ├── On receiving a MapResponse, writes the response back to the client
39+
* │
40+
* ├── If any MapperActor fails (throws an exception):
41+
* │ ├── Sends the exception back to the client
42+
* │ ├── Initiates a shutdown by completing the CompletableFuture exceptionally
43+
* │ └── Stops all child actors (AllForOneStrategy)
44+
*/
45+
@Slf4j
46+
class MapSupervisorActor extends AbstractActor {
47+
private final Mapper mapper;
48+
private final StreamObserver<MapOuterClass.MapResponse> responseObserver;
49+
private final CompletableFuture<Void> failureFuture;
50+
51+
public MapSupervisorActor(
52+
Mapper mapper,
53+
StreamObserver<MapOuterClass.MapResponse> responseObserver,
54+
CompletableFuture<Void> failureFuture) {
55+
this.mapper = mapper;
56+
this.responseObserver = responseObserver;
57+
this.failureFuture = failureFuture;
58+
}
59+
60+
public static Props props(
61+
Mapper mapper,
62+
StreamObserver<MapOuterClass.MapResponse> responseObserver,
63+
CompletableFuture<Void> failureFuture) {
64+
return Props.create(MapSupervisorActor.class, mapper, responseObserver, failureFuture);
65+
}
66+
67+
@Override
68+
public void preRestart(Throwable reason, Optional<Object> message) {
69+
log.debug("supervisor pre restart was executed");
70+
failureFuture.completeExceptionally(reason);
71+
responseObserver.onError(Status.UNKNOWN
72+
.withDescription(reason.getMessage())
73+
.withCause(reason)
74+
.asException());
75+
Service.mapperActorSystem.stop(getSelf());
76+
}
77+
78+
@Override
79+
public void postStop() {
80+
log.debug("post stop of supervisor executed - {}", getSelf().toString());
81+
}
82+
83+
@Override
84+
public Receive createReceive() {
85+
return ReceiveBuilder
86+
.create()
87+
.match(MapOuterClass.MapRequest.class, this::processRequest)
88+
.match(MapOuterClass.MapResponse.class, this::sendResponse)
89+
.match(Exception.class, this::handleFailure)
90+
.match(AllDeadLetters.class, this::handleDeadLetters)
91+
.match(String.class, eof -> responseObserver.onCompleted())
92+
.build();
93+
}
94+
95+
private void handleFailure(Exception e) {
96+
responseObserver.onError(Status.UNKNOWN
97+
.withDescription(e.getMessage())
98+
.withCause(e)
99+
.asException());
100+
failureFuture.completeExceptionally(e);
101+
}
102+
103+
private void sendResponse(MapOuterClass.MapResponse mapResponse) {
104+
responseObserver.onNext(mapResponse);
105+
}
106+
107+
private void processRequest(MapOuterClass.MapRequest mapRequest) {
108+
// Create a MapperActor for each incoming request.
109+
ActorRef mapperActor = getContext()
110+
.actorOf(MapperActor.props(
111+
mapper));
112+
113+
// Send the message to the MapperActor.
114+
mapperActor.tell(mapRequest, getSelf());
115+
}
116+
117+
// if we see dead letters, we need to stop the execution and exit
118+
// to make sure no messages are lost
119+
private void handleDeadLetters(AllDeadLetters deadLetter) {
120+
log.debug("got a dead letter, stopping the execution");
121+
responseObserver.onError(Status.UNKNOWN.withDescription("dead letters").asException());
122+
failureFuture.completeExceptionally(new Throwable("dead letters"));
123+
getContext().getSystem().stop(getSelf());
124+
}
125+
126+
@Override
127+
public SupervisorStrategy supervisorStrategy() {
128+
// we want to stop all child actors in case of any exception
129+
return new AllForOneStrategy(
130+
DeciderBuilder
131+
.match(Exception.class, e -> {
132+
failureFuture.completeExceptionally(e);
133+
responseObserver.onError(Status.UNKNOWN
134+
.withDescription(e.getMessage())
135+
.withCause(e)
136+
.asException());
137+
return SupervisorStrategy.stop();
138+
})
139+
.build()
140+
);
141+
}
142+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package io.numaproj.numaflow.mapper;
2+
3+
import akka.actor.AbstractActor;
4+
import akka.actor.Props;
5+
import akka.japi.pf.ReceiveBuilder;
6+
import com.google.protobuf.ByteString;
7+
import io.numaproj.numaflow.map.v1.MapOuterClass;
8+
9+
import java.time.Instant;
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
13+
/**
14+
* Mapper actor that processes the map request. It invokes the mapper to process the request and
15+
* sends the response back to the sender actor(MapSupervisorActor). In case of any exception, it
16+
* sends the exception back to the sender actor. It stops itself after processing the request.
17+
*/
18+
class MapperActor extends AbstractActor {
19+
private final Mapper mapper;
20+
21+
public MapperActor(Mapper mapper) {
22+
this.mapper = mapper;
23+
}
24+
25+
public static Props props(Mapper mapper) {
26+
return Props.create(MapperActor.class, mapper);
27+
}
28+
29+
@Override
30+
public Receive createReceive() {
31+
return ReceiveBuilder.create()
32+
.match(MapOuterClass.MapRequest.class, this::processRequest)
33+
.build();
34+
}
35+
36+
/**
37+
* Process the map request and send the response back to the sender actor.
38+
*
39+
* @param mapRequest map request
40+
*/
41+
private void processRequest(MapOuterClass.MapRequest mapRequest) {
42+
Datum handlerDatum = new HandlerDatum(
43+
mapRequest.getRequest().getValue().toByteArray(),
44+
Instant.ofEpochSecond(
45+
mapRequest.getRequest().getWatermark().getSeconds(),
46+
mapRequest.getRequest().getWatermark().getNanos()),
47+
Instant.ofEpochSecond(
48+
mapRequest.getRequest().getEventTime().getSeconds(),
49+
mapRequest.getRequest().getEventTime().getNanos()),
50+
mapRequest.getRequest().getHeadersMap()
51+
);
52+
String[] keys = mapRequest.getRequest().getKeysList().toArray(new String[0]);
53+
try {
54+
MessageList resultMessages = this.mapper.processMessage(keys, handlerDatum);
55+
MapOuterClass.MapResponse response = buildResponse(resultMessages, mapRequest.getId());
56+
getSender().tell(response, getSelf());
57+
} catch (Exception e) {
58+
getSender().tell(e, getSelf());
59+
}
60+
context().stop(getSelf());
61+
}
62+
63+
/**
64+
* Build the response from the message list.
65+
*
66+
* @param messageList message list
67+
*
68+
* @return map response
69+
*/
70+
private MapOuterClass.MapResponse buildResponse(MessageList messageList, String ID) {
71+
MapOuterClass.MapResponse.Builder responseBuilder = MapOuterClass
72+
.MapResponse
73+
.newBuilder();
74+
75+
messageList.getMessages().forEach(message -> {
76+
responseBuilder.addResults(MapOuterClass.MapResponse.Result.newBuilder()
77+
.setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
78+
message.getValue()))
79+
.addAllKeys(message.getKeys()
80+
== null ? new ArrayList<>() : List.of(message.getKeys()))
81+
.addAllTags(message.getTags()
82+
== null ? new ArrayList<>() : List.of(message.getTags()))
83+
.build());
84+
});
85+
return responseBuilder.setId(ID).build();
86+
}
87+
}

0 commit comments

Comments
 (0)