I am writing async code.
The proto file looks like this:
service Continuum {
  rpc Tick(TickRequest) returns (HistoryResult);
}
The relevant part of the generated pyi file looks like this:
class ContinuumStub:
    def __init__(self, channel: typing.Union[grpc.Channel, grpc.aio.Channel]) -> None: ...
    Tick: grpc.UnaryUnaryMultiCallable[
        essence.service_pb2.TickRequest,
        essence.service_pb2.HistoryResult,
    ]

class ContinuumAsyncStub:
    Tick: grpc.aio.UnaryUnaryMultiCallable[
        essence.service_pb2.TickRequest,
        essence.service_pb2.HistoryResult,
    ]
The init function of ContinuumStub accepts two different channel types: grpc.Channel and grpc.aio.Channel. When it is given an aio.Channel, the stub instance needs a type: ignore comment or a typing.cast to be treated as a ContinuumAsyncStub, like:
self._channel = grpc.aio.insecure_channel(
    target=self.target,
)
stub: service_grpc.ContinuumAsyncStub = service_grpc.ContinuumStub(
    self._channel
)  # type: ignore
Passing the class to typing.cast does not work here because there is no real AsyncStub at runtime; ContinuumAsyncStub exists only in the pyi file.
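A possible workaround, as a minimal sketch: typing.cast also accepts the target type as a string, and a string is never evaluated at runtime, so the name only has to exist for the type checker. The classes below are stand-ins I made up to keep the example runnable without grpc; the `TYPE_CHECKING` block imitates a class that is declared only in a pyi file, and `make_async_stub` is a hypothetical helper.

```python
import typing


class ContinuumStub:
    """Stand-in for the generated runtime stub class (not the real one)."""

    def __init__(self, channel: object) -> None:
        self.channel = channel


if typing.TYPE_CHECKING:
    # Visible to the type checker only, like a class declared only in a .pyi file.
    class ContinuumAsyncStub:
        channel: object


def make_async_stub(channel: object) -> "ContinuumAsyncStub":
    # The string target is not looked up at runtime; cast returns its argument unchanged.
    return typing.cast("ContinuumAsyncStub", ContinuumStub(channel))


stub = make_async_stub("fake-channel")
```

At runtime `stub` is still a plain ContinuumStub; only the static type changes. This avoids the type: ignore but still needs a cast at every call site.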
To solve this problem, could the stub in the pyi file be made Generic, or could Tick be overloaded?
Any ideas?
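For concreteness, here is a rough sketch of what I mean by Generic plus overload. It is only an illustration of the typing pattern, not what the generator produces today. `SyncChannel`, `AsyncChannel`, `SyncCall` and `AsyncCall` are hypothetical stand-ins for `grpc.Channel`, `grpc.aio.Channel` and the two `UnaryUnaryMultiCallable` types, so the example runs without grpc.

```python
import typing


class SyncChannel:
    """Hypothetical stand-in for grpc.Channel."""


class AsyncChannel:
    """Hypothetical stand-in for grpc.aio.Channel."""


class SyncCall:
    """Hypothetical stand-in for the sync multi-callable."""

    def __call__(self, request: str) -> str:
        return f"sync:{request}"


class AsyncCall:
    """Hypothetical stand-in for the aio multi-callable."""

    async def __call__(self, request: str) -> str:
        return f"async:{request}"


CallT = typing.TypeVar("CallT", SyncCall, AsyncCall)


class ContinuumStub(typing.Generic[CallT]):
    # The type of Tick follows from the channel type given to __init__.
    Tick: CallT

    @typing.overload
    def __init__(self: "ContinuumStub[SyncCall]", channel: SyncChannel) -> None: ...

    @typing.overload
    def __init__(self: "ContinuumStub[AsyncCall]", channel: AsyncChannel) -> None: ...

    def __init__(self, channel):
        # Runtime implementation left unannotated; the overloads carry the types.
        self.Tick = AsyncCall() if isinstance(channel, AsyncChannel) else SyncCall()


sync_stub = ContinuumStub(SyncChannel())
async_stub = ContinuumStub(AsyncChannel())
```

With this shape a type checker should infer `ContinuumStub[AsyncCall]` for `async_stub`, so `await async_stub.Tick(...)` would need neither a cast nor a type: ignore. In a pyi file only the two overloads would appear, without the implementation.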