Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Nov 6, 2024
1 parent 757293b commit 9f9eeb8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
15 changes: 11 additions & 4 deletions src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.numaproj.numaflow.shared;

import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
Expand Down Expand Up @@ -30,11 +32,12 @@ public void gracefullyShutdownEventLoopGroups() {
}
}

public ServerBuilder<?> createServerBuilder(
public Server createServer(
String socketPath,
int maxMessageSize,
boolean isLocal,
int port) {
int port,
BindableService service) {
ServerInterceptor interceptor = new ServerInterceptor() {

Check warning on line 41 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L41

Added line #L41 was not covered by tests
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
Expand Down Expand Up @@ -84,7 +87,9 @@ private void handleException(
if (isLocal) {
return ServerBuilder.forPort(port)
.maxInboundMessageSize(maxMessageSize)
.intercept(interceptor);
.intercept(interceptor)
.addService(service)
.build();

Check warning on line 92 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L88-L92

Added lines #L88 - L92 were not covered by tests
}

this.bossEventLoopGroup = GrpcServerUtils.createEventLoopGroup(1, "netty-boss");
Expand All @@ -98,6 +103,8 @@ private void handleException(
.maxInboundMessageSize(maxMessageSize)
.bossEventLoopGroup(this.bossEventLoopGroup)
.workerEventLoopGroup(this.workerEventLoopGroup)
.intercept(interceptor);
.intercept(interceptor)
.addService(service)
.build();

Check warning on line 108 in src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java#L100-L108

Added lines #L100 - L108 were not covered by tests
}
}
14 changes: 4 additions & 10 deletions src/main/java/io/numaproj/numaflow/sinker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,14 @@ public void start() throws Exception {
}

if (this.server == null) {
// create server builder
ServerBuilder<?> serverBuilder = this.grpcServerHelper.createServerBuilder(
this.server = this.grpcServerHelper.createServer(

Check warning on line 65 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L65

Added line #L65 was not covered by tests
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort());

// build server
this.server = serverBuilder
.addService(this.service)
.build();
grpcConfig.getPort(),

Check warning on line 69 in src/main/java/io/numaproj/numaflow/sinker/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/Server.java#L69

Added line #L69 was not covered by tests
this.service);
}

// start server
server.start();

log.info(
Expand Down Expand Up @@ -111,7 +105,7 @@ public void start() throws Exception {
try {
log.info("stopping server");
Server.this.stop();
log.info("gracefully shutdown event loop groups");
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
} catch (InterruptedException ex) {
Thread.interrupted();
Expand Down

0 comments on commit 9f9eeb8

Please sign in to comment.