diff --git a/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java b/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java index 975997c..9fe59cd 100644 --- a/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java +++ b/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java @@ -1,8 +1,10 @@ package io.numaproj.numaflow.shared; +import io.grpc.BindableService; import io.grpc.Context; import io.grpc.Contexts; import io.grpc.ForwardingServerCallListener; +import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; @@ -30,11 +32,12 @@ public void gracefullyShutdownEventLoopGroups() { } } - public ServerBuilder createServerBuilder( + public Server createServer( String socketPath, int maxMessageSize, boolean isLocal, - int port) { + int port, + BindableService service) { ServerInterceptor interceptor = new ServerInterceptor() { @Override public ServerCall.Listener interceptCall( @@ -84,7 +87,9 @@ private void handleException( if (isLocal) { return ServerBuilder.forPort(port) .maxInboundMessageSize(maxMessageSize) - .intercept(interceptor); + .intercept(interceptor) + .addService(service) + .build(); } this.bossEventLoopGroup = GrpcServerUtils.createEventLoopGroup(1, "netty-boss"); @@ -98,6 +103,8 @@ private void handleException( .maxInboundMessageSize(maxMessageSize) .bossEventLoopGroup(this.bossEventLoopGroup) .workerEventLoopGroup(this.workerEventLoopGroup) - .intercept(interceptor); + .intercept(interceptor) + .addService(service) + .build(); } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Server.java b/src/main/java/io/numaproj/numaflow/sinker/Server.java index e3e5a7d..9766ba3 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Server.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Server.java @@ -62,20 +62,14 @@ public void start() throws Exception { } if (this.server == null) { - // create server builder - ServerBuilder serverBuilder = this.grpcServerHelper.createServerBuilder( + this.server = this.grpcServerHelper.createServer( grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize(), grpcConfig.isLocal(), - grpcConfig.getPort()); - - // build server - this.server = serverBuilder - .addService(this.service) - .build(); + grpcConfig.getPort(), + this.service); } - // start server server.start(); log.info( @@ -111,7 +105,7 @@ public void start() throws Exception { try { log.info("stopping server"); Server.this.stop(); - log.info("gracefully shutdown event loop groups"); + log.info("gracefully shutting down event loop groups"); this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); } catch (InterruptedException ex) { Thread.interrupted();