Skip to content

Commit

Permalink
feat: tikv/grpc-rs replace tonic
Browse files Browse the repository at this point in the history
  • Loading branch information
CherishCai committed Sep 9, 2022
1 parent 52c1e6f commit 01a8778
Show file tree
Hide file tree
Showing 18 changed files with 215 additions and 271 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ Cargo.lock

# IDE
.idea

# Customize
### google.protobuf.rs build by prost-build, exclude it because no content.
**/google.protobuf.rs
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ thiserror = "1.0"
tokio = { version = "1.21", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
futures = "0.3"
h2 = "0.3"
tonic = "0.8"
prost = "0.11"
prost-types = "0.11"
grpcio = { version = "0.10", default-features = false, features = ["prost-codec"] }
prost = "0.9"
prost-types = "0.9"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
lazy_static = "1.4"

tracing = "0.1"

[build-dependencies]
tonic-build = "0.8"
grpcio-compiler = { version = "0.10", default-features = false, features = ["prost-codec"] }
prost-build = "0.9"

[dev-dependencies]
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,17 @@ Nacos client in Rust
在 nacos-sdk-rust 工程里,为主要功能的实现,将会引入以下依赖包。

- serde-rs/serde 一个超高性能的通用序列化/反序列化框架,可以跟多种协议的库联合使用,实现统一编解码格式;serde-rs/json 快到上天的 JSON 库,也是 Rust 事实上的标准 JSON
- hyperium/tonic 纯 Rust 实现的 gRPC 客户端和服务器端,支持 async/await 异步调用,文档和示例较为清晰
- tokio-rs/prost tokio 出品的 Protocol Buffers 工具,简单易用,文档详细
- tikv/grpc-rs 一个 Rust 版的 gRPC 客户端和服务器端
- tokio-rs/tokio 最火的异步网络库,除了复杂上手难度高一些外,没有其它大的问题。同时 tokio 团队提供了多个非常优秀的 Rust 库,整个生态欣欣向荣,用户认可度很高
- tokio-rs/tracing 强大的日志框架,同时还支持 OpenTelemetry 格式,无缝打通未来的监控

*Tip:Rust 入门推荐 [Rust语言圣经(Rust Course)](https://course.rs/about-book.html)*

### 简要描述 client & server 的交互

请关注 `proto/nacos_grpc_service.proto` 并知晓经过 `tonic-build` 构建出客户端侧的 stub,实现同步调用 `service Request.request()`,流式交互 `service BiRequestStream.requestBiStream()`
请关注 `proto/nacos_grpc_service.proto` 并知晓构建出客户端侧的 stub,实现同步调用 `service Request.request()`,流式交互 `service BiRequestStream.requestBiStream()`

`tonic` 创建与 Nacos-server 的 gRPC 双工长链接,`serde/json` 适配与 server 的交互序列化;
`tikv/grpc-rs` 创建与 Nacos-server 的 gRPC 双工长链接,`serde/json` 适配与 server 的交互序列化;

gRPC 交互的 Payload 和 Metadata 由 `Protocol Buffers` 序列化,具体的 Request/Response 实体 json 格式二进制数据维护于 Payload.body,类型名字符串维护于 Metadata.type (为 java 全类名)。

Expand Down
13 changes: 6 additions & 7 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
// limitations under the License.
//

// use std::env;

fn main() -> Result<(), std::io::Error> {
// env::set_var("OUT_DIR", "src");
tonic_build::configure()
.build_server(false)
.disable_package_emission()
.compile(&["./proto/nacos_grpc_service.proto"], &["./proto"])?;
grpcio_compiler::prost_codegen::compile_protos(
&["./proto/nacos_grpc_service.proto"],
&["./proto"],
"src",
)
.unwrap();
Ok(())
}
3 changes: 0 additions & 3 deletions proto/nacos_grpc_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

syntax = "proto3";

// package for rust
package nacos.v2;

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

Expand Down
54 changes: 54 additions & 0 deletions src/_.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Metadata {
#[prost(string, tag="3")]
pub r#type: ::prost::alloc::string::String,
#[prost(string, tag="8")]
pub client_ip: ::prost::alloc::string::String,
#[prost(map="string, string", tag="7")]
pub headers: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Payload {
#[prost(message, optional, tag="2")]
pub metadata: ::core::option::Option<Metadata>,
#[prost(message, optional, tag="3")]
pub body: ::core::option::Option<::prost_types::Any>,
}
const METHOD_REQUEST_REQUEST: ::grpcio::Method<Payload, Payload> = ::grpcio::Method{ty: ::grpcio::MethodType::Unary, name: "/Request/request", req_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, };
#[derive(Clone)]
pub struct RequestClient { client: ::grpcio::Client }
impl RequestClient {
pub fn new(channel: ::grpcio::Channel) -> Self { RequestClient { client: ::grpcio::Client::new(channel) }}
pub fn request_opt(&self, req: &Payload, opt: ::grpcio::CallOption) -> ::grpcio::Result<Payload,> { self.client.unary_call(&METHOD_REQUEST_REQUEST, req, opt) }
pub fn request(&self, req: &Payload) -> ::grpcio::Result<Payload,> { self.request_opt(req, ::grpcio::CallOption::default()) }
pub fn request_async_opt(&self, req: &Payload, opt: ::grpcio::CallOption) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver<Payload>,> { self.client.unary_call_async(&METHOD_REQUEST_REQUEST, req, opt) }
pub fn request_async(&self, req: &Payload) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver<Payload>,> { self.request_async_opt(req, ::grpcio::CallOption::default()) }
pub fn spawn<F>(&self, f: F) where F: ::std::future::Future<Output = ()> + Send + 'static {self.client.spawn(f)}
}
pub trait Request {
fn request(&mut self, ctx: ::grpcio::RpcContext, _req: Payload, sink: ::grpcio::UnarySink<Payload>) { grpcio::unimplemented_call!(ctx, sink) }
}
pub fn create_request<S: Request + Send + Clone + 'static>(s: S) -> ::grpcio::Service {
let mut builder = ::grpcio::ServiceBuilder::new();
let mut instance = s;
builder = builder.add_unary_handler(&METHOD_REQUEST_REQUEST, move |ctx, req, resp| instance.request(ctx, req, resp));
builder.build()
}
const METHOD_BI_REQUEST_STREAM_REQUEST_BI_STREAM: ::grpcio::Method<Payload, Payload> = ::grpcio::Method{ty: ::grpcio::MethodType::Duplex, name: "/BiRequestStream/requestBiStream", req_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, };
#[derive(Clone)]
pub struct BiRequestStreamClient { client: ::grpcio::Client }
impl BiRequestStreamClient {
pub fn new(channel: ::grpcio::Channel) -> Self { BiRequestStreamClient { client: ::grpcio::Client::new(channel) }}
pub fn request_bi_stream_opt(&self, opt: ::grpcio::CallOption) -> ::grpcio::Result<(::grpcio::ClientDuplexSender<Payload>,::grpcio::ClientDuplexReceiver<Payload>,)> { self.client.duplex_streaming(&METHOD_BI_REQUEST_STREAM_REQUEST_BI_STREAM, opt) }
pub fn request_bi_stream(&self) -> ::grpcio::Result<(::grpcio::ClientDuplexSender<Payload>,::grpcio::ClientDuplexReceiver<Payload>,)> { self.request_bi_stream_opt(::grpcio::CallOption::default()) }
pub fn spawn<F>(&self, f: F) where F: ::std::future::Future<Output = ()> + Send + 'static {self.client.spawn(f)}
}
pub trait BiRequestStream {
fn request_bi_stream(&mut self, ctx: ::grpcio::RpcContext, _stream: ::grpcio::RequestStream<Payload>, sink: ::grpcio::DuplexSink<Payload>) { grpcio::unimplemented_call!(ctx, sink) }
}
pub fn create_bi_request_stream<S: BiRequestStream + Send + Clone + 'static>(s: S) -> ::grpcio::Service {
let mut builder = ::grpcio::ServiceBuilder::new();
let mut instance = s;
builder = builder.add_duplex_streaming_handler(&METHOD_BI_REQUEST_STREAM_REQUEST_BI_STREAM, move |ctx, req, resp| instance.request_bi_stream(ctx, req, resp));
builder.build()
}
2 changes: 1 addition & 1 deletion src/api/constants.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub const KEY_SERVER_ADDR: &'static str = "server_addr";

pub const DEFAULT_SERVER_ADDR: &'static str = "http://0.0.0.0:9848";
pub const DEFAULT_SERVER_ADDR: &'static str = "0.0.0.0:9848";

// label AppName
pub const KEY_LABEL_APP_NAME: &'static str = "AppName";
8 changes: 4 additions & 4 deletions src/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// limitations under the License.
//
/// error learn from Skywalking Rust.
use tokio::{sync::oneshot, task::JoinError};
/// Nacos Sdk Rust Result.
pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -31,15 +30,16 @@ pub enum Error {
#[error("remote client shutdown failed: {0}")]
ClientShutdown(String),

/*
#[error("tonic transport failed: {0}")]
TonicTransport(#[from] tonic::transport::Error),
#[error("tonic status: {0}")]
TonicStatus(#[from] tonic::Status),

*/
#[error("tokio task join failed: {0}")]
TokioJoin(#[from] JoinError),
TokioJoin(#[from] tokio::task::JoinError),

#[error("tokio oneshot receive failed: {0}")]
TokioOneshotRecv(#[from] oneshot::error::RecvError),
TokioOneshotRecv(#[from] tokio::sync::oneshot::error::RecvError),
}
93 changes: 48 additions & 45 deletions src/common/remote/conn.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
/**
* Learn from https://github.com/tokio-rs/console/blob/main/tokio-console/src/conn.rs
*/
use std::{error::Error, pin::Pin, time::Duration};

use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{transport::Channel, Streaming};

use futures::stream::StreamExt;
use futures::SinkExt;

use std::sync::{Arc, Mutex};
use std::{error::Error, pin::Pin, time::Duration};

use crate::api::client_config::ClientConfig;
use crate::common::remote::request::client_request::{
Expand All @@ -16,11 +14,9 @@ use crate::common::remote::request::client_request::{
use crate::common::remote::request::Request;
use crate::common::remote::response::Response;
use crate::common::util::*;
use crate::nacos_proto::v2::bi_request_stream_client::BiRequestStreamClient;
use crate::nacos_proto::v2::request_client::RequestClient;
use crate::nacos_proto::v2::Payload;
use crate::nacos_proto::v2::{BiRequestStreamClient, Payload, RequestClient};

#[derive(Debug)]
// #[derive(Debug)]
pub struct Connection {
client_config: ClientConfig,
state: State,
Expand All @@ -32,14 +28,14 @@ pub struct Connection {
// stream just adds a heap pointer dereference, slightly penalizing polling
// the stream in most cases. so, don't listen to clippy on this.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
// #[derive(Debug)]
enum State {
Connected {
conn_id: String,
client: RequestClient<Channel>,
bi_client: BiRequestStreamClient<Channel>,
bi_sender: mpsc::UnboundedSender<Payload>,
resp_bi_stream: Box<Streaming<Payload>>,
client: RequestClient,
bi_client: BiRequestStreamClient,
bi_sender: Arc<Mutex<grpcio::ClientDuplexSender<Payload>>>,
bi_receiver: Arc<Mutex<grpcio::ClientDuplexReceiver<Payload>>>,
},
Disconnected(Duration),
}
Expand Down Expand Up @@ -102,36 +98,36 @@ impl Connection {
let tenant = self.client_config.namespace.clone();
let labels = self.client_config.labels.clone();

let endpoint = tonic::transport::Endpoint::new(target)?;
let channel = endpoint.connect().await?;
let env = Arc::new(grpcio::Environment::new(2));
let channel = grpcio::ChannelBuilder::new(env).connect(target.as_str());

let client = RequestClient::new(channel.clone());

let mut client = RequestClient::new(channel.clone());
let req_payload =
payload_helper::build_req_grpc_payload(ServerCheckClientRequest::new());
let resp_payload = client.request(tonic::Request::new(req_payload)).await?;
let resp_payload = client.request(&req_payload);
let server_check_response =
payload_helper::build_server_response(resp_payload.into_inner()).unwrap();
payload_helper::build_server_response(resp_payload.unwrap()).unwrap();
let conn_id = server_check_response.get_connection_id();

let mut bi_client = BiRequestStreamClient::new(channel.clone());
let (tx, rx) = mpsc::unbounded_channel();
let bi_client = BiRequestStreamClient::new(channel.clone());
let (mut client_sender, client_receiver) = bi_client.request_bi_stream().unwrap();
// send a ConnectionSetupClientRequest
tx.send(payload_helper::build_req_grpc_payload(
ConnectionSetupClientRequest::new(tenant, labels),
))
.unwrap();

let resp_bi_stream = bi_client
.request_bi_stream(UnboundedReceiverStream::from(rx))
.await?
.into_inner();
client_sender
.send((
payload_helper::build_req_grpc_payload(ConnectionSetupClientRequest::new(
tenant, labels,
)),
grpcio::WriteFlags::default(),
))
.await?;

Ok::<State, Box<dyn Error + Send + Sync>>(State::Connected {
conn_id: String::from(conn_id.unwrap()),
client,
bi_client,
bi_sender: tx,
resp_bi_stream: Box::new(resp_bi_stream),
bi_sender: Arc::new(Mutex::new(client_sender)),
bi_receiver: Arc::new(Mutex::new(client_receiver)),
})
};
self.state = match try_connect.await {
Expand All @@ -154,9 +150,12 @@ impl Connection {
loop {
match self.state {
State::Connected {
ref mut resp_bi_stream,
ref mut bi_receiver,
..
} => match Pin::new(resp_bi_stream).next().await {
} => match Pin::new(bi_receiver.to_owned().lock().unwrap())
.next()
.await
{
Some(Ok(payload)) => return payload,
Some(Err(status)) => {
println!("error from stream {}", status);
Expand Down Expand Up @@ -184,8 +183,8 @@ impl Connection {
match self.state {
State::Connected { ref mut client, .. } => {
let req_payload = payload_helper::build_req_grpc_payload(req);
let resp_payload = client.request(tonic::Request::new(req_payload)).await?;
payload_helper::build_server_response(resp_payload.into_inner())
let resp_payload = client.request(&req_payload);
payload_helper::build_server_response(resp_payload.unwrap())
}
State::Disconnected(_) => {
self.connect().await;
Expand All @@ -196,12 +195,19 @@ impl Connection {
}
}

pub(crate) async fn send_resp(&mut self, resp: impl Response + serde::Serialize) -> () {
pub(crate) async fn send_resp(&mut self, resp: impl Response + serde::Serialize) {
match self.state {
State::Connected {
ref mut bi_sender, ..
} => bi_sender
.send(payload_helper::build_resp_grpc_payload(resp))
.to_owned()
.lock()
.unwrap()
.send((
payload_helper::build_resp_grpc_payload(resp),
grpcio::WriteFlags::default(),
))
.await
.unwrap(),
State::Disconnected(_) => self.connect().await,
}
Expand All @@ -219,20 +225,17 @@ mod tests {
tracing_subscriber::fmt::init();
println!("test_remote_connect");
let mut remote_connect =
Connection::new(ClientConfig::new().server_addr("http://0.0.0.0:9848".to_string()));
Connection::new(ClientConfig::new().server_addr("0.0.0.0:9848".to_string()));
remote_connect.connect().await;
println!("{:?}", remote_connect.state)
}

#[tokio::test]
async fn test_next_payload() {
println!("test_next_payload");
let mut remote_connect =
Connection::new(ClientConfig::new().server_addr("http://0.0.0.0:9848".to_string()));
let payload = remote_connect.next_payload().await;
Connection::new(ClientConfig::new().server_addr("0.0.0.0:9848".to_string()));
let payload = remote_connect.next_payload().await;
// let server_req = payload_helper::build_server_request(payload).unwrap();
println!("{:?}", remote_connect.state);
let server_req = payload_helper::build_server_request(payload).unwrap();
}

#[tokio::test]
Expand Down
5 changes: 0 additions & 5 deletions src/common/remote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,4 @@ pub(crate) mod remote_client;
pub(crate) mod request;
pub(crate) mod response;

#[tonic::async_trait]
pub(crate) trait RemoteClient {}

pub(crate) trait PayloadConverter {
fn convert_to_grpc_payload() -> crate::nacos_proto::v2::Payload;
}
Loading

0 comments on commit 01a8778

Please sign in to comment.