From 01a877808eaf2b0ae69cd5e30a7097085cd42372 Mon Sep 17 00:00:00 2001 From: caihongwen <785427346@qq.com> Date: Fri, 9 Sep 2022 23:00:06 +0800 Subject: [PATCH] feat: tikv/grpc-rs replace tonic --- .gitignore | 4 + Cargo.toml | 10 +- README.md | 7 +- build.rs | 13 +- proto/nacos_grpc_service.proto | 3 - src/_.rs | 54 +++++++ src/api/constants.rs | 2 +- src/api/error.rs | 8 +- src/common/remote/conn.rs | 93 +++++------ src/common/remote/mod.rs | 5 - src/common/remote/remote_client.rs | 146 +----------------- src/common/remote/request/client_request.rs | 39 +---- src/common/remote/request/mod.rs | 6 +- src/common/remote/request/server_request.rs | 64 ++++++++ src/common/remote/response/mod.rs | 1 - src/common/remote/response/server_response.rs | 4 +- src/common/util/payload_helper.rs | 16 +- src/lib.rs | 11 +- 18 files changed, 215 insertions(+), 271 deletions(-) create mode 100644 src/_.rs create mode 100644 src/common/remote/request/server_request.rs diff --git a/.gitignore b/.gitignore index 45c6b7c..93d5772 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,7 @@ Cargo.lock # IDE .idea + +# Customize +### google.protobuf.rs build by prost-build, exclude it because no content. +**/google.protobuf.rs diff --git a/Cargo.toml b/Cargo.toml index 805aa46..eab702c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,10 +28,9 @@ thiserror = "1.0" tokio = { version = "1.21", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"] } futures = "0.3" -h2 = "0.3" -tonic = "0.8" -prost = "0.11" -prost-types = "0.11" +grpcio = { version = "0.10", default-features = false, features = ["prost-codec"] } +prost = "0.9" +prost-types = "0.9" serde = { version = "1", features = ["derive"] } serde_json = "1" lazy_static = "1.4" @@ -39,7 +38,8 @@ lazy_static = "1.4" tracing = "0.1" [build-dependencies] -tonic-build = "0.8" +grpcio-compiler = { version = "0.10", default-features = false, features = ["prost-codec"] } +prost-build = "0.9" [dev-dependencies] tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/README.md b/README.md index 587b062..2a7ecc1 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,7 @@ Nacos client in Rust 在 nacos-sdk-rust 工程里,为主要功能的实现,将会引入以下依赖包。 - serde-rs/serde 一个超高性能的通用序列化/反序列化框架,可以跟多种协议的库联合使用,实现统一编解码格式;serde-rs/json 快到上天的 JSON 库,也是 Rust 事实上的标准 JSON -- hyperium/tonic 纯 Rust 实现的 gRPC 客户端和服务器端,支持 async/await 异步调用,文档和示例较为清晰 -- tokio-rs/prost tokio 出品的 Protocol Buffers 工具,简单易用,文档详细 +- tikv/grpc-rs 一个 Rust 版的 gRPC 客户端和服务器端 - tokio-rs/tokio 最火的异步网络库,除了复杂上手难度高一些外,没有其它大的问题。同时 tokio 团队提供了多个非常优秀的 Rust 库,整个生态欣欣向荣,用户认可度很高 - tokio-rs/tracing 强大的日志框架,同时还支持 OpenTelemetry 格式,无缝打通未来的监控 @@ -20,9 +19,9 @@ Nacos client in Rust ### 简要描述 client & server 的交互 -请关注 `proto/nacos_grpc_service.proto` 并知晓经过 `tonic-build` 构建出客户端侧的 stub,实现同步调用 `service Request.request()`,流式交互 `service BiRequestStream.requestBiStream()`。 +请关注 `proto/nacos_grpc_service.proto` 并知晓构建出客户端侧的 stub,实现同步调用 `service Request.request()`,流式交互 `service BiRequestStream.requestBiStream()`。 -`tonic` 创建与 Nacos-server 的 gRPC 双工长链接,`serde/json` 适配与 server 的交互序列化; +`tikv/grpc-rs` 创建与 Nacos-server 的 gRPC 双工长链接,`serde/json` 适配与 server 的交互序列化; gRPC 交互的 Payload 和 Metadata 由 `Protocol Buffers` 序列化,具体的 Request/Response 实体 json 格式二进制数据维护于 Payload.body,类型名字符串维护于 Metadata.type (为 java 全类名)。 diff --git a/build.rs b/build.rs index 276b42f..93fa363 100644 --- a/build.rs +++ b/build.rs @@ -14,13 +14,12 @@ // limitations under the License. // -// use std::env; - fn main() -> Result<(), std::io::Error> { - // env::set_var("OUT_DIR", "src"); - tonic_build::configure() - .build_server(false) - .disable_package_emission() - .compile(&["./proto/nacos_grpc_service.proto"], &["./proto"])?; + grpcio_compiler::prost_codegen::compile_protos( + &["./proto/nacos_grpc_service.proto"], + &["./proto"], + "src", + ) + .unwrap(); Ok(()) } diff --git a/proto/nacos_grpc_service.proto b/proto/nacos_grpc_service.proto index 3d99f1c..adf2a30 100644 --- a/proto/nacos_grpc_service.proto +++ b/proto/nacos_grpc_service.proto @@ -17,9 +17,6 @@ syntax = "proto3"; -// package for rust -package nacos.v2; - import "google/protobuf/any.proto"; import "google/protobuf/timestamp.proto"; diff --git a/src/_.rs b/src/_.rs new file mode 100644 index 0000000..d3ed547 --- /dev/null +++ b/src/_.rs @@ -0,0 +1,54 @@ +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Metadata { + #[prost(string, tag="3")] + pub r#type: ::prost::alloc::string::String, + #[prost(string, tag="8")] + pub client_ip: ::prost::alloc::string::String, + #[prost(map="string, string", tag="7")] + pub headers: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Payload { + #[prost(message, optional, tag="2")] + pub metadata: ::core::option::Option, + #[prost(message, optional, tag="3")] + pub body: ::core::option::Option<::prost_types::Any>, +} +const METHOD_REQUEST_REQUEST: ::grpcio::Method = ::grpcio::Method{ty: ::grpcio::MethodType::Unary, name: "/Request/request", req_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, }; +#[derive(Clone)] +pub struct RequestClient { client: ::grpcio::Client } +impl RequestClient { +pub fn new(channel: ::grpcio::Channel) -> Self { RequestClient { client: ::grpcio::Client::new(channel) }} +pub fn request_opt(&self, req: &Payload, opt: ::grpcio::CallOption) -> ::grpcio::Result { self.client.unary_call(&METHOD_REQUEST_REQUEST, req, opt) } +pub fn request(&self, req: &Payload) -> ::grpcio::Result { self.request_opt(req, ::grpcio::CallOption::default()) } +pub fn request_async_opt(&self, req: &Payload, opt: ::grpcio::CallOption) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver,> { self.client.unary_call_async(&METHOD_REQUEST_REQUEST, req, opt) } +pub fn request_async(&self, req: &Payload) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver,> { self.request_async_opt(req, ::grpcio::CallOption::default()) } +pub fn spawn(&self, f: F) where F: ::std::future::Future + Send + 'static {self.client.spawn(f)} +} +pub trait Request { +fn request(&mut self, ctx: ::grpcio::RpcContext, _req: Payload, sink: ::grpcio::UnarySink) { grpcio::unimplemented_call!(ctx, sink) } +} +pub fn create_request(s: S) -> ::grpcio::Service { +let mut builder = ::grpcio::ServiceBuilder::new(); +let mut instance = s; +builder = builder.add_unary_handler(&METHOD_REQUEST_REQUEST, move |ctx, req, resp| instance.request(ctx, req, resp)); +builder.build() +} +const METHOD_BI_REQUEST_STREAM_REQUEST_BI_STREAM: ::grpcio::Method = ::grpcio::Method{ty: ::grpcio::MethodType::Duplex, name: "/BiRequestStream/requestBiStream", req_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, }; +#[derive(Clone)] +pub struct BiRequestStreamClient { client: ::grpcio::Client } +impl BiRequestStreamClient { +pub fn new(channel: ::grpcio::Channel) -> Self { BiRequestStreamClient { client: ::grpcio::Client::new(channel) }} +pub fn request_bi_stream_opt(&self, opt: ::grpcio::CallOption) -> ::grpcio::Result<(::grpcio::ClientDuplexSender,::grpcio::ClientDuplexReceiver,)> { self.client.duplex_streaming(&METHOD_BI_REQUEST_STREAM_REQUEST_BI_STREAM, opt) } +pub fn request_bi_stream(&self) -> ::grpcio::Result<(::grpcio::ClientDuplexSender,::grpcio::ClientDuplexReceiver,)> { self.request_bi_stream_opt(::grpcio::CallOption::default()) } +pub fn spawn(&self, f: F) where F: ::std::future::Future + Send + 'static {self.client.spawn(f)} +} +pub trait BiRequestStream { +fn request_bi_stream(&mut self, ctx: ::grpcio::RpcContext, _stream: ::grpcio::RequestStream, sink: ::grpcio::DuplexSink) { grpcio::unimplemented_call!(ctx, sink) } +} +pub fn create_bi_request_stream(s: S) -> ::grpcio::Service { +let mut builder = ::grpcio::ServiceBuilder::new(); +let mut instance = s; +builder = builder.add_duplex_streaming_handler(&METHOD_BI_REQUEST_STREAM_REQUEST_BI_STREAM, move |ctx, req, resp| instance.request_bi_stream(ctx, req, resp)); +builder.build() +} diff --git a/src/api/constants.rs b/src/api/constants.rs index 1dc322c..ea70f23 100644 --- a/src/api/constants.rs +++ b/src/api/constants.rs @@ -1,6 +1,6 @@ pub const KEY_SERVER_ADDR: &'static str = "server_addr"; -pub const DEFAULT_SERVER_ADDR: &'static str = "http://0.0.0.0:9848"; +pub const DEFAULT_SERVER_ADDR: &'static str = "0.0.0.0:9848"; // label AppName pub const KEY_LABEL_APP_NAME: &'static str = "AppName"; diff --git a/src/api/error.rs b/src/api/error.rs index b786a3e..405b520 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -14,7 +14,6 @@ // limitations under the License. // /// error learn from Skywalking Rust. -use tokio::{sync::oneshot, task::JoinError}; /// Nacos Sdk Rust Result. pub type Result = std::result::Result; @@ -31,15 +30,16 @@ pub enum Error { #[error("remote client shutdown failed: {0}")] ClientShutdown(String), + /* #[error("tonic transport failed: {0}")] TonicTransport(#[from] tonic::transport::Error), #[error("tonic status: {0}")] TonicStatus(#[from] tonic::Status), - + */ #[error("tokio task join failed: {0}")] - TokioJoin(#[from] JoinError), + TokioJoin(#[from] tokio::task::JoinError), #[error("tokio oneshot receive failed: {0}")] - TokioOneshotRecv(#[from] oneshot::error::RecvError), + TokioOneshotRecv(#[from] tokio::sync::oneshot::error::RecvError), } diff --git a/src/common/remote/conn.rs b/src/common/remote/conn.rs index 04c8d8a..f29b0d6 100644 --- a/src/common/remote/conn.rs +++ b/src/common/remote/conn.rs @@ -1,13 +1,11 @@ /** * Learn from https://github.com/tokio-rs/console/blob/main/tokio-console/src/conn.rs */ -use std::{error::Error, pin::Pin, time::Duration}; - -use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::{transport::Channel, Streaming}; - use futures::stream::StreamExt; +use futures::SinkExt; + +use std::sync::{Arc, Mutex}; +use std::{error::Error, pin::Pin, time::Duration}; use crate::api::client_config::ClientConfig; use crate::common::remote::request::client_request::{ @@ -16,11 +14,9 @@ use crate::common::remote::request::client_request::{ use crate::common::remote::request::Request; use crate::common::remote::response::Response; use crate::common::util::*; -use crate::nacos_proto::v2::bi_request_stream_client::BiRequestStreamClient; -use crate::nacos_proto::v2::request_client::RequestClient; -use crate::nacos_proto::v2::Payload; +use crate::nacos_proto::v2::{BiRequestStreamClient, Payload, RequestClient}; -#[derive(Debug)] +// #[derive(Debug)] pub struct Connection { client_config: ClientConfig, state: State, @@ -32,14 +28,14 @@ pub struct Connection { // stream just adds a heap pointer dereference, slightly penalizing polling // the stream in most cases. so, don't listen to clippy on this. #[allow(clippy::large_enum_variant)] -#[derive(Debug)] +// #[derive(Debug)] enum State { Connected { conn_id: String, - client: RequestClient, - bi_client: BiRequestStreamClient, - bi_sender: mpsc::UnboundedSender, - resp_bi_stream: Box>, + client: RequestClient, + bi_client: BiRequestStreamClient, + bi_sender: Arc>>, + bi_receiver: Arc>>, }, Disconnected(Duration), } @@ -102,36 +98,36 @@ impl Connection { let tenant = self.client_config.namespace.clone(); let labels = self.client_config.labels.clone(); - let endpoint = tonic::transport::Endpoint::new(target)?; - let channel = endpoint.connect().await?; + let env = Arc::new(grpcio::Environment::new(2)); + let channel = grpcio::ChannelBuilder::new(env).connect(target.as_str()); + + let client = RequestClient::new(channel.clone()); - let mut client = RequestClient::new(channel.clone()); let req_payload = payload_helper::build_req_grpc_payload(ServerCheckClientRequest::new()); - let resp_payload = client.request(tonic::Request::new(req_payload)).await?; + let resp_payload = client.request(&req_payload); let server_check_response = - payload_helper::build_server_response(resp_payload.into_inner()).unwrap(); + payload_helper::build_server_response(resp_payload.unwrap()).unwrap(); let conn_id = server_check_response.get_connection_id(); - let mut bi_client = BiRequestStreamClient::new(channel.clone()); - let (tx, rx) = mpsc::unbounded_channel(); + let bi_client = BiRequestStreamClient::new(channel.clone()); + let (mut client_sender, client_receiver) = bi_client.request_bi_stream().unwrap(); // send a ConnectionSetupClientRequest - tx.send(payload_helper::build_req_grpc_payload( - ConnectionSetupClientRequest::new(tenant, labels), - )) - .unwrap(); - - let resp_bi_stream = bi_client - .request_bi_stream(UnboundedReceiverStream::from(rx)) - .await? - .into_inner(); + client_sender + .send(( + payload_helper::build_req_grpc_payload(ConnectionSetupClientRequest::new( + tenant, labels, + )), + grpcio::WriteFlags::default(), + )) + .await?; Ok::>(State::Connected { conn_id: String::from(conn_id.unwrap()), client, bi_client, - bi_sender: tx, - resp_bi_stream: Box::new(resp_bi_stream), + bi_sender: Arc::new(Mutex::new(client_sender)), + bi_receiver: Arc::new(Mutex::new(client_receiver)), }) }; self.state = match try_connect.await { @@ -154,9 +150,12 @@ impl Connection { loop { match self.state { State::Connected { - ref mut resp_bi_stream, + ref mut bi_receiver, .. - } => match Pin::new(resp_bi_stream).next().await { + } => match Pin::new(bi_receiver.to_owned().lock().unwrap()) + .next() + .await + { Some(Ok(payload)) => return payload, Some(Err(status)) => { println!("error from stream {}", status); @@ -184,8 +183,8 @@ impl Connection { match self.state { State::Connected { ref mut client, .. } => { let req_payload = payload_helper::build_req_grpc_payload(req); - let resp_payload = client.request(tonic::Request::new(req_payload)).await?; - payload_helper::build_server_response(resp_payload.into_inner()) + let resp_payload = client.request(&req_payload); + payload_helper::build_server_response(resp_payload.unwrap()) } State::Disconnected(_) => { self.connect().await; @@ -196,12 +195,19 @@ impl Connection { } } - pub(crate) async fn send_resp(&mut self, resp: impl Response + serde::Serialize) -> () { + pub(crate) async fn send_resp(&mut self, resp: impl Response + serde::Serialize) { match self.state { State::Connected { ref mut bi_sender, .. } => bi_sender - .send(payload_helper::build_resp_grpc_payload(resp)) + .to_owned() + .lock() + .unwrap() + .send(( + payload_helper::build_resp_grpc_payload(resp), + grpcio::WriteFlags::default(), + )) + .await .unwrap(), State::Disconnected(_) => self.connect().await, } @@ -219,20 +225,17 @@ mod tests { tracing_subscriber::fmt::init(); println!("test_remote_connect"); let mut remote_connect = - Connection::new(ClientConfig::new().server_addr("http://0.0.0.0:9848".to_string())); + Connection::new(ClientConfig::new().server_addr("0.0.0.0:9848".to_string())); remote_connect.connect().await; - println!("{:?}", remote_connect.state) } #[tokio::test] async fn test_next_payload() { println!("test_next_payload"); let mut remote_connect = - Connection::new(ClientConfig::new().server_addr("http://0.0.0.0:9848".to_string())); - let payload = remote_connect.next_payload().await; + Connection::new(ClientConfig::new().server_addr("0.0.0.0:9848".to_string())); let payload = remote_connect.next_payload().await; - // let server_req = payload_helper::build_server_request(payload).unwrap(); - println!("{:?}", remote_connect.state); + let server_req = payload_helper::build_server_request(payload).unwrap(); } #[tokio::test] diff --git a/src/common/remote/mod.rs b/src/common/remote/mod.rs index abfccdb..b01972b 100644 --- a/src/common/remote/mod.rs +++ b/src/common/remote/mod.rs @@ -3,9 +3,4 @@ pub(crate) mod remote_client; pub(crate) mod request; pub(crate) mod response; -#[tonic::async_trait] pub(crate) trait RemoteClient {} - -pub(crate) trait PayloadConverter { - fn convert_to_grpc_payload() -> crate::nacos_proto::v2::Payload; -} diff --git a/src/common/remote/remote_client.rs b/src/common/remote/remote_client.rs index ea40e44..d49063d 100644 --- a/src/common/remote/remote_client.rs +++ b/src/common/remote/remote_client.rs @@ -1,164 +1,28 @@ -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; - -use tonic::{ - async_trait, - transport::{Channel, Endpoint}, -}; - use crate::api::client_config::ClientConfig; -use crate::api::constants::DEFAULT_SERVER_ADDR; -use crate::api::error::Error; -use crate::common::remote::request::client_request::{ - ConnectionSetupClientRequest, ServerCheckClientRequest, -}; -use crate::common::util::*; -use crate::nacos_proto::v2::bi_request_stream_client::BiRequestStreamClient; -use crate::nacos_proto::v2::request_client::RequestClient; -use crate::nacos_proto::v2::Payload; use super::RemoteClient; pub(crate) struct GrpcRemoteClient { - client_name: Option, - tenant: String, - labels: HashMap, - client: RequestClient, - bi_client: BiRequestStreamClient, - resp_bi_stream: Arc>>, + client_config: ClientConfig, } impl GrpcRemoteClient { - /// Sets the request_client against. - pub fn request_client(self, client: RequestClient) -> Self { - GrpcRemoteClient { client, ..self } - } - - /// Sets the request_client against. - pub fn bi_client(self, bi_client: BiRequestStreamClient) -> Self { - GrpcRemoteClient { bi_client, ..self } - } - - /// Sets the resp_bi_stream against. - /*pub fn resp_bi_stream(self, resp_bi_stream: Arc>) -> Self { - GrpcRemoteClient { - resp_bi_stream, - ..self - } - }*/ - - pub async fn new(client_config: ClientConfig) -> crate::api::error::Result { - let client_name = client_config.client_name; - let tenant = client_config.namespace; - let labels = client_config.labels; - let address = client_config - .server_addr - .unwrap_or(String::from(DEFAULT_SERVER_ADDR)); - - let (client, bi_client, resp_bi_stream) = - Self::connect_to_server(address.clone(), tenant.clone(), labels.clone()) - .await - .unwrap(); - - let resp_bi_stream = Arc::new(Mutex::new(resp_bi_stream)); - - let mut resp_bi_stream_clone = Arc::clone(&resp_bi_stream); - /*let thread = tokio::spawn(async move { - loop { - while let Some(received) = resp_bi_stream_clone.next().await { - let payload = received.unwrap(); - } - } - });*/ - // let threads = vec![thread]; - - let remote_client = Self { - client_name, - tenant, - labels, - client, - bi_client, - resp_bi_stream, - }; - - Ok(remote_client) - } - - async fn connect_to_server( - address: String, - tenant: String, - labels: HashMap, - ) -> crate::api::error::Result<( - RequestClient, - BiRequestStreamClient, - tonic::codec::Streaming, - )> { - let endpoint = Endpoint::new(address)?; - let channel = endpoint.connect().await?; - let mut client = RequestClient::new(channel.clone()); - - let req_payload = payload_helper::build_req_grpc_payload(ServerCheckClientRequest::new()); - let resp_future = client.request(tonic::Request::new(req_payload)); - match resp_future.await { - Ok(response) => { - let resp_payload = response.into_inner(); - let server_check_response = - payload_helper::build_server_response(resp_payload).unwrap(); - let conn_id = server_check_response.get_connection_id(); - let mut bi_client = BiRequestStreamClient::new(channel.clone()); - let mut resp_bi_stream = bi_client - .request_bi_stream(Self::stream_once_connection_setup_request( - ConnectionSetupClientRequest::new(tenant, labels), - )) - .await - .unwrap() - .into_inner(); - Ok((client, bi_client, resp_bi_stream)) - } - Err(e) => Err(Error::TonicStatus(e)), - } - } - - fn stream_once_connection_setup_request( - connection_setup_request: ConnectionSetupClientRequest, - ) -> impl tonic::codegen::futures_core::Stream { - tokio_stream::once(payload_helper::build_req_grpc_payload( - connection_setup_request, - )) + pub fn new(client_config: ClientConfig) -> Self { + Self { client_config } } } -#[async_trait] impl RemoteClient for GrpcRemoteClient {} #[cfg(test)] mod tests { - use std::collections::HashMap; - use std::thread::sleep; - use std::time::Duration; - - use tokio_stream::StreamExt; use crate::api::client_config::ClientConfig; use crate::common::remote::remote_client::GrpcRemoteClient; - use crate::common::remote::RemoteClient; - use crate::common::util::payload_helper; // #[tokio::test] async fn test_grpc_remote_client() { - let mut remote_client = GrpcRemoteClient::new( - ClientConfig::new().server_addr("http://0.0.0.0:9848".to_string()), - ) - .await - .unwrap(); - - // wait resp_bi_stream - sleep(Duration::from_secs(2)); - - let mut resp_bi_stream_clone = remote_client.resp_bi_stream.clone(); - while let Some(received) = resp_bi_stream_clone.lock().unwrap().next().await { - let payload = received.unwrap(); - let server_req = payload_helper::build_server_request(payload); - } + let remote_client = + GrpcRemoteClient::new(ClientConfig::new().server_addr("0.0.0.0:9848".to_string())); } } diff --git a/src/common/remote/request/client_request.rs b/src/common/remote/request/client_request.rs index abe697c..0c41488 100644 --- a/src/common/remote/request/client_request.rs +++ b/src/common/remote/request/client_request.rs @@ -1,8 +1,5 @@ #![allow(non_snake_case)] -use crate::common::remote::request::{ - generate_request_id, Request, TYPE_CONNECT_RESET_SERVER_REQUEST, - TYPE_CONNECT_SETUP_SERVER_REQUEST, TYPE_SERVER_CHECK_CLIENT_REQUEST, -}; +use crate::common::remote::request::*; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -53,7 +50,7 @@ impl Request for ConnectionSetupClientRequest { &self.headers } fn get_type_url(&self) -> &String { - &TYPE_CONNECT_SETUP_SERVER_REQUEST + &TYPE_CONNECT_SETUP_CLIENT_REQUEST } } @@ -73,35 +70,3 @@ impl ConnectionSetupClientRequest { ConnectionSetupClientRequest { labels, ..self } } } - -#[derive(Debug, Serialize, Deserialize)] -pub(crate) struct ConnectResetRequest { - requestId: String, - /// count be empty. - headers: HashMap, - serverIp: String, - serverPort: String, -} - -impl Request for ConnectResetRequest { - fn get_request_id(&self) -> &String { - &self.requestId - } - fn get_headers(&self) -> &HashMap { - &self.headers - } - fn get_type_url(&self) -> &String { - &TYPE_CONNECT_RESET_SERVER_REQUEST - } -} - -impl ConnectResetRequest { - pub fn new(server_ip: String, server_port: String) -> Self { - ConnectResetRequest { - requestId: generate_request_id(), - headers: HashMap::new(), - serverIp: server_ip, - serverPort: server_port, - } - } -} diff --git a/src/common/remote/request/mod.rs b/src/common/remote/request/mod.rs index ac2ca1f..425de48 100644 --- a/src/common/remote/request/mod.rs +++ b/src/common/remote/request/mod.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicI64, Ordering}; pub(crate) mod client_request; +pub(crate) mod server_request; pub(crate) trait Request { fn get_request_id(&self) -> &String; @@ -18,11 +19,14 @@ lazy_static! { pub static ref TYPE_SERVER_CHECK_CLIENT_REQUEST: String = String::from("ServerCheckRequest"); /// com.alibaba.nacos.api.remote.request.ConnectionSetupRequest - pub static ref TYPE_CONNECT_SETUP_SERVER_REQUEST: String = String::from("ConnectionSetupRequest"); + pub static ref TYPE_CONNECT_SETUP_CLIENT_REQUEST: String = String::from("ConnectionSetupRequest"); /// com.alibaba.nacos.api.remote.request.ConnectResetRequest pub static ref TYPE_CONNECT_RESET_SERVER_REQUEST: String = String::from("ConnectResetRequest"); + /// com.alibaba.nacos.api.remote.request.ClientDetectionRequest + pub static ref TYPE_CLIENT_DETECTION_SERVER_REQUEST: String = String::from("ClientDetectionRequest"); + } // odd by client request id. diff --git a/src/common/remote/request/server_request.rs b/src/common/remote/request/server_request.rs new file mode 100644 index 0000000..3e5b392 --- /dev/null +++ b/src/common/remote/request/server_request.rs @@ -0,0 +1,64 @@ +#![allow(non_snake_case)] +use crate::common::remote::request::*; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ConnectResetServerRequest { + requestId: String, + /// count be empty. + headers: HashMap, + serverIp: Option, + serverPort: Option, +} + +impl Request for ConnectResetServerRequest { + fn get_request_id(&self) -> &String { + &self.requestId + } + fn get_headers(&self) -> &HashMap { + &self.headers + } + fn get_type_url(&self) -> &String { + &TYPE_CONNECT_RESET_SERVER_REQUEST + } +} + +impl ConnectResetServerRequest { + pub fn new(server_ip: Option, server_port: Option) -> Self { + ConnectResetServerRequest { + requestId: generate_request_id(), + headers: HashMap::new(), + serverIp: server_ip, + serverPort: server_port, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ClientDetectionServerRequest { + requestId: String, + /// count be empty. + headers: HashMap, +} + +impl Request for ClientDetectionServerRequest { + fn get_request_id(&self) -> &String { + &self.requestId + } + fn get_headers(&self) -> &HashMap { + &self.headers + } + fn get_type_url(&self) -> &String { + &TYPE_CLIENT_DETECTION_SERVER_REQUEST + } +} + +impl ClientDetectionServerRequest { + pub fn new() -> Self { + ClientDetectionServerRequest { + requestId: generate_request_id(), + headers: HashMap::new(), + } + } +} diff --git a/src/common/remote/response/mod.rs b/src/common/remote/response/mod.rs index 37a62ce..624d5fb 100644 --- a/src/common/remote/response/mod.rs +++ b/src/common/remote/response/mod.rs @@ -1,5 +1,4 @@ use lazy_static::lazy_static; -use serde::{Deserialize, Serialize}; pub(crate) mod server_response; diff --git a/src/common/remote/response/server_response.rs b/src/common/remote/response/server_response.rs index fd41bb3..7f0b058 100644 --- a/src/common/remote/response/server_response.rs +++ b/src/common/remote/response/server_response.rs @@ -1,7 +1,5 @@ #![allow(non_snake_case)] -use crate::common::remote::response::{ - Response, TYPE_ERROR_SERVER_RESPONSE, TYPE_SERVER_CHECK_SERVER_RESPONSE, -}; +use crate::common::remote::response::*; use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] diff --git a/src/common/util/payload_helper.rs b/src/common/util/payload_helper.rs index 5df30fc..feaedf8 100644 --- a/src/common/util/payload_helper.rs +++ b/src/common/util/payload_helper.rs @@ -1,9 +1,7 @@ -use crate::common::remote::request::client_request::ConnectResetRequest; -use crate::common::remote::request::{Request, LOCAL_IP, TYPE_CONNECT_RESET_SERVER_REQUEST}; -use crate::common::remote::response::server_response::{ErrorResponse, ServerCheckServerResponse}; -use crate::common::remote::response::{ - Response, TYPE_ERROR_SERVER_RESPONSE, TYPE_SERVER_CHECK_SERVER_RESPONSE, -}; +use crate::common::remote::request::server_request::*; +use crate::common::remote::request::*; +use crate::common::remote::response::server_response::*; +use crate::common::remote::response::*; use crate::nacos_proto::v2::{Metadata, Payload}; use serde::Serialize; @@ -69,7 +67,11 @@ pub(crate) fn build_server_request( println!("build_server_request {} with {}", type_url, body_str); tracing::debug!("build_server_request {} with {}", type_url, body_str); if TYPE_CONNECT_RESET_SERVER_REQUEST.eq(&type_url) { - let de: ConnectResetRequest = serde_json::from_str(body_str.as_str())?; + let de: ConnectResetServerRequest = serde_json::from_str(body_str.as_str())?; + return Ok(Box::new(de)); + } + if TYPE_CLIENT_DETECTION_SERVER_REQUEST.eq(&type_url) { + let de: ClientDetectionServerRequest = serde_json::from_str(body_str.as_str())?; return Ok(Box::new(de)); } Err(crate::api::error::Error::Deserialization(type_url)) diff --git a/src/lib.rs b/src/lib.rs index be32349..5db13a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,21 +20,18 @@ mod common; mod nacos_proto { pub mod v2 { - tonic::include_proto!("nacos.v2"); + include!("_.rs"); } } #[cfg(test)] mod tests { - use prost_types::Any; + use crate::nacos_proto::v2::{Metadata, Payload}; use std::collections::HashMap; - use crate::nacos_proto::v2::Metadata; - use crate::nacos_proto::v2::Payload; - #[test] fn it_works_nacos_proto() { - let body = Any { + let body = prost_types::Any { type_url: String::new(), value: Vec::from("{\"cluster\":\"DEFAULT\",\"healthyOnly\":true}"), }; @@ -47,7 +44,7 @@ mod tests { metadata: Some(metadata), body: Some(body), }; - println!("{:?}", payload); + // println!("{:?}", payload); assert_eq!( payload.metadata.unwrap().r#type, "com.alibaba.nacos.api.naming.remote.request.ServiceQueryRequest"