Skip to content

Commit

Permalink
reducers
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Nov 6, 2024
1 parent c09d304 commit 7b77b6d
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 112 deletions.
26 changes: 15 additions & 11 deletions src/main/java/io/numaproj/numaflow/reducer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -21,6 +22,7 @@ public class Server {
private final Service service;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;
private final GrpcServerHelper grpcServerHelper;

/**
* constructor to create gRPC server.
Expand All @@ -40,6 +42,7 @@ public Server(ReducerFactory<? extends Reducer> reducerFactory) {
public Server(ReducerFactory<? extends Reducer> reducerFactory, GRPCConfig grpcConfig) {
this.service = new Service(reducerFactory);
this.grpcConfig = grpcConfig;
this.grpcServerHelper = new GrpcServerHelper();
}

/**
Expand All @@ -57,33 +60,32 @@ public void start() throws Exception {
}

if (this.server == null) {
// create server builder
ServerBuilder<?> serverBuilder = GrpcServerUtils.createServerBuilder(
this.server = this.grpcServerHelper.createServer(

Check warning on line 63 in src/main/java/io/numaproj/numaflow/reducer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/reducer/Server.java#L63

Added line #L63 was not covered by tests
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort());

// build server
this.server = serverBuilder
.addService(this.service)
.build();
grpcConfig.getPort(),

Check warning on line 67 in src/main/java/io/numaproj/numaflow/reducer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/reducer/Server.java#L67

Added line #L67 was not covered by tests
this.service);
}

// start server
server.start();

log.info(
"Server started, listening on {}",
"server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server != null && server.isTerminated()) {
return;
}
try {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 88 in src/main/java/io/numaproj/numaflow/reducer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/reducer/Server.java#L87-L88

Added lines #L87 - L88 were not covered by tests
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
Expand All @@ -99,7 +101,9 @@ public void start() throws Exception {
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("reducer server is waiting for termination");

Check warning on line 104 in src/main/java/io/numaproj/numaflow/reducer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/reducer/Server.java#L104

Added line #L104 was not covered by tests
server.awaitTermination();
log.info("reducer server has terminated");

Check warning on line 106 in src/main/java/io/numaproj/numaflow/reducer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/reducer/Server.java#L106

Added line #L106 was not covered by tests
}

/**
Expand Down
26 changes: 15 additions & 11 deletions src/main/java/io/numaproj/numaflow/reducestreamer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory;
import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -22,6 +23,7 @@ public class Server {
private final Service service;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;
private final GrpcServerHelper grpcServerHelper;

/**
* constructor to create gRPC server.
Expand All @@ -43,6 +45,7 @@ public Server(
GRPCConfig grpcConfig) {
this.service = new Service(reduceStreamerFactory);
this.grpcConfig = grpcConfig;
this.grpcServerHelper = new GrpcServerHelper();
}

/**
Expand All @@ -60,33 +63,32 @@ public void start() throws Exception {
}

if (this.server == null) {
// create server builder
ServerBuilder<?> serverBuilder = GrpcServerUtils.createServerBuilder(
this.server = this.grpcServerHelper.createServer(

Check warning on line 66 in src/main/java/io/numaproj/numaflow/reducestreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/reducestreamer/Server.java#L66

Added line #L66 was not covered by tests
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort());

// build server
this.server = serverBuilder
.addService(this.service)
.build();
grpcConfig.getPort(),

Check warning on line 70 in src/main/java/io/numaproj/numaflow/reducestreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/reducestreamer/Server.java#L70

Added line #L70 was not covered by tests
this.service);
}

// start server
server.start();

log.info(
"Server started, listening on {}",
"server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server != null && server.isTerminated()) {
return;
}
try {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 91 in src/main/java/io/numaproj/numaflow/reducestreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/reducestreamer/Server.java#L90-L91

Added lines #L90 - L91 were not covered by tests
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
Expand All @@ -102,7 +104,9 @@ public void start() throws Exception {
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("reduce stream server is waiting for termination");

Check warning on line 107 in src/main/java/io/numaproj/numaflow/reducestreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/reducestreamer/Server.java#L107

Added line #L107 was not covered by tests
server.awaitTermination();
log.info("reduce stream server terminated");

Check warning on line 109 in src/main/java/io/numaproj/numaflow/reducestreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/reducestreamer/Server.java#L109

Added line #L109 was not covered by tests
}

/**
Expand Down
26 changes: 15 additions & 11 deletions src/main/java/io/numaproj/numaflow/sessionreducer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;
import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory;
import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -22,6 +23,7 @@ public class Server {
private final Service service;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;
private final GrpcServerHelper grpcServerHelper;

/**
* constructor to create gRPC server.
Expand All @@ -43,6 +45,7 @@ public Server(
GRPCConfig grpcConfig) {
this.service = new Service(sessionReducerFactory);
this.grpcConfig = grpcConfig;
this.grpcServerHelper = new GrpcServerHelper();
}

/**
Expand All @@ -60,33 +63,32 @@ public void start() throws Exception {
}

if (this.server == null) {
// create server builder
ServerBuilder<?> serverBuilder = GrpcServerUtils.createServerBuilder(
this.server = this.grpcServerHelper.createServer(

Check warning on line 66 in src/main/java/io/numaproj/numaflow/sessionreducer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sessionreducer/Server.java#L66

Added line #L66 was not covered by tests
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort());

// build server
this.server = serverBuilder
.addService(this.service)
.build();
grpcConfig.getPort(),

Check warning on line 70 in src/main/java/io/numaproj/numaflow/sessionreducer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sessionreducer/Server.java#L70

Added line #L70 was not covered by tests
this.service);
}

// start server
server.start();

log.info(
"Server started, listening on {}",
"server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server != null && server.isTerminated()) {
return;
}
try {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 91 in src/main/java/io/numaproj/numaflow/sessionreducer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sessionreducer/Server.java#L90-L91

Added lines #L90 - L91 were not covered by tests
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
Expand All @@ -102,7 +104,9 @@ public void start() throws Exception {
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("session reduce server is waiting for termination");

Check warning on line 107 in src/main/java/io/numaproj/numaflow/sessionreducer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sessionreducer/Server.java#L107

Added line #L107 was not covered by tests
server.awaitTermination();
log.info("session reduce server terminated");

Check warning on line 109 in src/main/java/io/numaproj/numaflow/sessionreducer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sessionreducer/Server.java#L109

Added line #L109 was not covered by tests
}

/**
Expand Down
69 changes: 0 additions & 69 deletions src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,73 +127,4 @@ public static void writeServerInfo(
log.info("Writing server info {} to {}", serverInfo, infoFilePath);
serverInfoAccessor.write(serverInfo, infoFilePath);
}

// TODO - remove this one
public static ServerBuilder<?> createServerBuilder(
String socketPath,
int maxMessageSize,
boolean isLocal,
int port) {
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
io.grpc.Metadata headers,
ServerCallHandler<ReqT, RespT> next) {

final var context =
Context.current().withValues(
WINDOW_START_TIME,
headers.get(DATUM_METADATA_WIN_START),
WINDOW_END_TIME,
headers.get(DATUM_METADATA_WIN_END));
ServerCall.Listener<ReqT> listener = Contexts.interceptCall(
context,
call,
headers,
next);
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(
listener) {
@Override
public void onHalfClose() {
try {
super.onHalfClose();
} catch (RuntimeException ex) {
handleException(ex, call, headers);
throw ex;
}
}

private void handleException(
RuntimeException e,
ServerCall<ReqT, RespT> serverCall,
io.grpc.Metadata headers) {
// Currently, we only have application level exceptions.
// Translate it to UNKNOWN status.
var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
var newStatus = Status.fromThrowable(status.asException());
serverCall.close(newStatus, headers);
e.printStackTrace();
System.exit(1);
}
};
}
};

if (isLocal) {
return ServerBuilder.forPort(port)
.maxInboundMessageSize(maxMessageSize)
.intercept(interceptor);
}

return NettyServerBuilder
.forAddress(new DomainSocketAddress(socketPath))
.channelType(GrpcServerUtils.getChannelTypeClass())
.maxInboundMessageSize(maxMessageSize)
.bossEventLoopGroup(GrpcServerUtils.createEventLoopGroup(1, "netty-boss"))
.workerEventLoopGroup(GrpcServerUtils.createEventLoopGroup(
ThreadUtils.INSTANCE.availableProcessors(),
"netty-worker"))
.intercept(interceptor);
}
}
10 changes: 0 additions & 10 deletions src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@ public void testWriteServerInfo() throws Exception {
.write(Mockito.any(), Mockito.eq("infoFilePath"));
}

@Test
public void testCreateServerBuilder() {
ServerBuilder<?> serverBuilder = GrpcServerUtils.createServerBuilder(
"socketPath",
1000,
false,
50051);
Assert.assertNotNull(serverBuilder);
}

@Test
public void testWindowStartTime() {
Context.Key<String> windowStartTime = GrpcServerUtils.WINDOW_START_TIME;
Expand Down

0 comments on commit 7b77b6d

Please sign in to comment.