Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cinterop-c/MODULE.bazel.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 65 additions & 4 deletions cinterop-c/include/kgrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,74 @@ typedef struct {
} kgrpc_cb_tag;


/*
* Call to grpc_iomgr_run_in_background(), which is not exposed as extern "C" and therefore must be wrapped.
// This is a duplicate of the RegisteredCallAllocation, which is defined in
// https://github.com/grpc/grpc/blob/893bdadd56dbb75fb156175afdaa2b0d47e1c15b/src/core/server/server.h#L150-L157.
// This is required, as RegisteredCallAllocation is not part of the exposed C API.
typedef struct {
void *tag;
grpc_call **call;
grpc_metadata_array *initial_metadata;
gpr_timespec *deadline;
grpc_byte_buffer **optional_payload;
grpc_completion_queue *cq;
} kgrpc_registered_call_allocation;

typedef struct {
void* tag;
grpc_call** call;
grpc_metadata_array* initial_metadata;
grpc_call_details* details;
grpc_completion_queue* cq;
} kgrpc_batch_call_allocation;

typedef kgrpc_registered_call_allocation (*kgrpc_registered_call_allocator)(void* ctx);
typedef kgrpc_batch_call_allocation (*kgrpc_batch_call_allocator)(void* ctx);

/*
* Call to grpc_iomgr_run_in_background(), which is not exposed as extern "C" and therefore must be wrapped.
*/
bool kgrpc_iomgr_run_in_background();

/**
* Registers a C-style allocator callback for accepting gRPC calls to a specific method.
*
* Wraps the internal C++ API `Server::SetRegisteredMethodAllocator()` to enable
* callback-driven method dispatch via the Core C API.
* If the C++ API is exposed to the C API, this can be removed (https://github.com/grpc/grpc/issues/40632).
*
* When the gRPC Core needs to accept a new call for the specified method, it invokes:
* kgrpc_registered_call_allocation alloc = allocator();
* to retrieve the accept context, including `tag`, `grpc_call*`, metadata, deadline,
* optional payload, and the completion queue.
*
* @param server The gRPC C `grpc_server*` instance.
* @param cq A callback-style `grpc_completion_queue*` (must be registered earlier).
* @param method_tag Opaque identifier from `grpc_server_register_method()` for the RPC method.
* @param allocator_ctx The context for the callback to pass all necessary objects to the static function.
* @param allocator Function providing new accept contexts (`kgrpc_registered_call_allocation`).
*/
bool kgrpc_iomgr_run_in_background();
void kgrpc_server_set_register_method_allocator(
grpc_server *server,
grpc_completion_queue *cq,
void *method_tag,
void *allocator_ctx,
kgrpc_registered_call_allocator allocator
);

/**
* Like kgrpc_server_set_register_method_allocator but instead of registered methods,
* it sets an allocation callback for unknown method calls.
*/
void kgrpc_server_set_batch_method_allocator(
grpc_server *server,
grpc_completion_queue *cq,
void *allocator_ctx,
kgrpc_batch_call_allocator allocator
);


#ifdef __cplusplus
}
}
#endif

#endif //GRPCPP_C_H
50 changes: 47 additions & 3 deletions cinterop-c/src/kgrpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,56 @@

#include <kgrpc.h>
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/server/server.h"

extern "C" {

bool kgrpc_iomgr_run_in_background() {
return grpc_iomgr_run_in_background();
}
bool kgrpc_iomgr_run_in_background() {
return grpc_iomgr_run_in_background();
}

void kgrpc_server_set_register_method_allocator(
grpc_server *server,
grpc_completion_queue *cq,
void *method_tag,
void *allocator_ctx,
kgrpc_registered_call_allocator allocator
) {
grpc_core::Server::FromC(server)->SetRegisteredMethodAllocator(
cq,
method_tag,
[allocator_ctx, allocator] {
auto result = allocator(allocator_ctx);
return grpc_core::Server::RegisteredCallAllocation{
.tag = result.tag,
.call = result.call,
.initial_metadata = result.initial_metadata,
.deadline = result.deadline,
.optional_payload = result.optional_payload,
.cq = result.cq,
};
});
}

void kgrpc_server_set_batch_method_allocator(
grpc_server *server,
grpc_completion_queue *cq,
void *allocator_ctx,
kgrpc_batch_call_allocator allocator
) {
grpc_core::Server::FromC(server)->SetBatchMethodAllocator(
cq,
[allocator_ctx, allocator] {
auto result = allocator(allocator_ctx);
return grpc_core::Server::BatchCallAllocation{
.tag = result.tag,
.call = result.call,
.initial_metadata = result.initial_metadata,
.details = result.details,
.cq = result.cq,
};
});
}

}

Expand Down
6 changes: 1 addition & 5 deletions cinterop-c/tools/collect_headers.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,8 @@ include_dir = rule(

def _cc_headers_only_impl(ctx):
dep_cc = ctx.attr.dep[CcInfo].compilation_context

# keep only source headers; this skips generated headers and their actions.
all_hdrs = dep_cc.headers.to_list()
src_hdrs = [f for f in all_hdrs if getattr(f, "is_source", False)]
cc_ctx = cc_common.create_compilation_context(
headers = depset(src_hdrs),
headers = dep_cc.headers,
includes = dep_cc.includes,
quote_includes = dep_cc.quote_includes,
system_includes = dep_cc.system_includes,
Expand Down
12 changes: 12 additions & 0 deletions grpc/grpc-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ kotlin {
extraOpts("-libraryPath", "$cLibOutDir")
}
}

// configures linkReleaseTest task to build and link the test binary in RELEASE mode.
// this can be useful for performance analysis.
targets.withType<org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTarget>().configureEach {
binaries {
test(
buildTypes = listOf(
org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.RELEASE
)
)
}
}
}

configureLocalProtocGenDevelopmentDependency()
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import kotlinx.rpc.internal.utils.InternalRpcApi
public expect class ServerServiceDefinition {
public fun getServiceDescriptor(): ServiceDescriptor
public fun getMethods(): Collection<ServerMethodDefinition<*, *>>

public fun getMethod(methodName: String): ServerMethodDefinition<*, *>?
}

@InternalRpcApi
public expect fun serverServiceDefinition(
serviceDescriptor: ServiceDescriptor,
methods: Collection<ServerMethodDefinition<*, *>>
methods: Collection<ServerMethodDefinition<*, *>>,
): ServerServiceDefinition
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,25 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeout
import kotlinx.rpc.grpc.*
import kotlinx.rpc.grpc.internal.*
import kotlinx.rpc.grpc.GrpcServer
import kotlinx.rpc.grpc.GrpcTrailers
import kotlinx.rpc.grpc.ManagedChannel
import kotlinx.rpc.grpc.ManagedChannelBuilder
import kotlinx.rpc.grpc.Status
import kotlinx.rpc.grpc.StatusCode
import kotlinx.rpc.grpc.buildChannel
import kotlinx.rpc.grpc.internal.ClientCall
import kotlinx.rpc.grpc.internal.GrpcDefaultCallOptions
import kotlinx.rpc.grpc.internal.MethodDescriptor
import kotlinx.rpc.grpc.internal.MethodType
import kotlinx.rpc.grpc.internal.clientCallListener
import kotlinx.rpc.grpc.internal.methodDescriptor
import kotlinx.rpc.grpc.statusCode
import kotlinx.rpc.registerService
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFails
import kotlin.test.assertFailsWith
import kotlin.time.Duration

private const val PORT = 50051

Expand Down Expand Up @@ -51,8 +62,12 @@ class GrpcCoreClientTest {
this.timeout = timeout
}

private fun shutdownAndWait(channel: ManagedChannel) {
channel.shutdown()
private fun shutdownAndWait(channel: ManagedChannel, now: Boolean = false) {
if (now) {
channel.shutdownNow()
} else {
channel.shutdown()
}
runBlocking { channel.awaitTermination() }
}

Expand Down Expand Up @@ -180,10 +195,7 @@ class GrpcCoreClientTest {
fun halfCloseBeforeSendingMessage_errorWithoutCrashing() {
val channel = createChannel()
val call = channel.newHelloCall()
val statusDeferred = CompletableDeferred<Status>()
val listener = createClientCallListener<HelloReply>(
onClose = { status, _ -> statusDeferred.complete(status) }
)
val listener = createClientCallListener<HelloReply>()
assertFailsWith<IllegalStateException> {
try {
call.start(listener, GrpcTrailers())
Expand Down Expand Up @@ -259,7 +271,7 @@ class GreeterServiceImpl : GreeterService {
* Run this on JVM before executing tests.
*/
@Test
fun runServer() = runTest(timeout = Duration.INFINITE) {
fun runServer() = runTest {
val server = GrpcServer(
port = PORT,
builder = { registerService<GreeterService> { GreeterServiceImpl() } }
Expand All @@ -272,6 +284,7 @@ class GreeterServiceImpl : GreeterService {
} finally {
server.shutdown()
server.awaitTermination()

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

package kotlinx.rpc.grpc

import kotlinx.rpc.grpc.internal.*
import kotlinx.rpc.grpc.internal.GrpcChannel
import kotlinx.rpc.grpc.internal.GrpcChannelCredentials
import kotlinx.rpc.grpc.internal.GrpcInsecureChannelCredentials
import kotlinx.rpc.grpc.internal.NativeManagedChannel
import kotlinx.rpc.grpc.internal.internalError

/**
* Same as [ManagedChannel], but is platform-exposed.
Expand All @@ -25,10 +29,10 @@ public actual abstract class ManagedChannelBuilder<T : ManagedChannelBuilder<T>>
internal class NativeManagedChannelBuilder(
private val target: String,
) : ManagedChannelBuilder<NativeManagedChannelBuilder>() {
private var credentials: GrpcCredentials? = null
private var credentials: GrpcChannelCredentials? = null

override fun usePlaintext(): NativeManagedChannelBuilder {
credentials = GrpcInsecureCredentials()
credentials = GrpcInsecureChannelCredentials()
return this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,66 @@

package kotlinx.rpc.grpc

import kotlinx.rpc.grpc.internal.MethodDescriptor
import kotlinx.rpc.grpc.internal.ServerMethodDefinition
import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap

/**
* Registry of services and their methods used by servers to dispatching incoming calls.
*/
public actual abstract class HandlerRegistry
public actual abstract class HandlerRegistry {
/**
* Returns the [ServerServiceDefinition]s provided by the registry, or an empty list if not
* supported by the implementation.
*/
public abstract fun getServices(): List<ServerServiceDefinition>

/**
* Lookup a [ServerMethodDefinition] by its fully-qualified name.
*
* @param methodName to lookup [ServerMethodDefinition] for.
* @param authority the authority for the desired method (to do virtual hosting). If `null`
* the first matching method will be returned.
* @return the resolved method or `null` if no method for that name exists.
*/
public abstract fun lookupMethod(
methodName: String, authority: String?,
): ServerMethodDefinition<*, *>?

/**
* Lookup a [ServerMethodDefinition] by its fully-qualified name.
*
* @param methodName to lookup [ServerMethodDefinition] for.
* @return the resolved method or `null` if no method for that name exists.
*/
public fun lookupMethod(methodName: String): ServerMethodDefinition<*, *>? {
return lookupMethod(methodName, null)
}

}

internal actual class MutableHandlerRegistry : HandlerRegistry() {

private val services = RpcInternalConcurrentHashMap<String, ServerServiceDefinition>()

actual fun addService(service: ServerServiceDefinition): ServerServiceDefinition? {
error("Native target is not supported in gRPC")
return services.put(service.getServiceDescriptor().getName(), service)
}

actual fun removeService(service: ServerServiceDefinition): Boolean {
error("Native target is not supported in gRPC")
return services.remove(service.getServiceDescriptor().getName()) != null
}

override fun getServices(): List<ServerServiceDefinition> {
return services.values.toList()
}

override fun lookupMethod(
methodName: String,
authority: String?,
): ServerMethodDefinition<*, *>? {
val serviceName = MethodDescriptor.extractFullServiceName(methodName) ?: return null
val service = services[serviceName] ?: return null
return service.getMethod(methodName)
}
}
Loading
Loading