Skip to content

Commit

Permalink
fix: customize grpc_port (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
CherishCai authored May 13, 2023
1 parent 64dfca0 commit 615f5de
Show file tree
Hide file tree
Showing 6 changed files with 5 additions and 120 deletions.
1 change: 0 additions & 1 deletion src/api/naming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ pub trait NamingEventListener: Send + Sync + 'static {
#[doc(alias("naming", "sdk", "api"))]
#[cfg(not(feature = "async"))]
pub trait NamingService {

fn register_instance(
&self,
service_name: String,
Expand Down
4 changes: 2 additions & 2 deletions src/common/remote/grpc/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ impl GrpcConfiguration {
self
}

pub(crate) fn with_port(mut self, port: u32) -> Self {
self.port = Some(port);
pub(crate) fn with_port(mut self, port: Option<u32>) -> Self {
self.port = port;
self
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/remote/grpc/nacos_grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl NacosGrpcClientBuilder {
}
}

pub(crate) fn port(self, port: u32) -> Self {
pub(crate) fn port(self, port: Option<u32>) -> Self {
let grpc_config = self.grpc_config.with_port(port);
Self {
grpc_config,
Expand Down
115 changes: 0 additions & 115 deletions src/common/remote/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
pub mod grpc;

use crate::api::error::Error::WrongServerAddress;
use crate::api::error::Result;
use rand::prelude::SliceRandom;
use std::sync::atomic::{AtomicI64, Ordering};

// odd by client request id.
Expand All @@ -17,115 +14,3 @@ pub(crate) fn generate_request_id() -> String {
}
seq.to_string()
}

/// make address's port plus 1000
#[allow(clippy::get_first)]
pub(crate) fn into_grpc_server_addr(
address: &str,
shuffle: bool,
grpc_port: Option<u32>,
) -> Result<String> {
let mut hosts = address.split(',').collect::<Vec<&str>>();
if hosts.is_empty() {
return Err(WrongServerAddress(address.into()));
}

if shuffle && hosts.len() > 1 {
// shuffle for grpcio LbPolicy::PickFirst, It is a sequential attempt to link, so reorder to balance the load as much as possible.
hosts.shuffle(&mut rand::thread_rng());
}

let mut result = vec![];
for host in hosts {
let host_port_pair = host.split(':').collect::<Vec<&str>>();
if host_port_pair.len() != 2 {
return Err(WrongServerAddress(address.into()));
}

let host = host_port_pair.get(0);
let port = host_port_pair.get(1);
if host.is_none() || (port.is_none() && grpc_port.is_none()) {
return Err(WrongServerAddress(address.into()));
}

let port = if let Some(gport) = grpc_port {
gport
} else {
port.unwrap()
.parse::<u32>()
.map(|port| port + 1000)
.map_err(|_| WrongServerAddress(address.into()))?
};

result.push(format!("{}:{}", host.unwrap(), port));
}

match result.len() {
0 => Err(WrongServerAddress(address.into())),
1 => Ok(result.get(0).unwrap().to_string()),
_ => Ok(format!("ipv4:{}", result.join(","))),
}
}

#[cfg(test)]
mod tests {
use crate::common::remote::into_grpc_server_addr;

#[test]
fn test_empty_address() {
match into_grpc_server_addr("", false, None) {
Ok(_) => assert!(false),
Err(_) => assert!(true),
}
}

#[test]
fn test_host_address_without_port() {
match into_grpc_server_addr("127.0.0.1", false, None) {
Ok(_) => assert!(false),
Err(_) => assert!(true),
}
}

#[test]
fn test_host_addresses_without_one_port() {
match into_grpc_server_addr("127.0.0.1:8848,127.0.0.1", false, None) {
Ok(_) => assert!(false),
Err(_) => assert!(true),
}
}

#[test]
fn test_single_host_address() {
let addr = "127.0.0.1:8848";
let expected = "127.0.0.1:9848";
let result = into_grpc_server_addr(addr, false, None).unwrap();
assert_eq!(expected, result);
}

#[test]
fn test_single_host_address_with_grpc_port() {
let grpc_port = Some(9838);
let addr = "127.0.0.1:8848";
let expected = "127.0.0.1:9838";
let result = into_grpc_server_addr(addr, false, grpc_port).unwrap();
assert_eq!(expected, result);
}

#[test]
fn test_multiple_ipv4_address() {
let addr = "127.0.0.1:8848,127.0.0.1:8849,127.0.0.1:8850";
let expected = "ipv4:127.0.0.1:9848,127.0.0.1:9849,127.0.0.1:9850";
let result = into_grpc_server_addr(addr, false, None).unwrap();
assert_eq!(expected, result);
}

#[test]
fn test_multiple_ipv4_address_with_grpc_port() {
let grpc_port = Some(9838);
let addr = "127.0.0.1:8848,127.0.0.1:8849,127.0.0.1:8850";
let expected = "ipv4:127.0.0.1:9838,127.0.0.1:9838,127.0.0.1:9838";
let result = into_grpc_server_addr(addr, false, grpc_port).unwrap();
assert_eq!(expected, result);
}
}
1 change: 1 addition & 0 deletions src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl ConfigWorker {
let notify_change_tx_clone = notify_change_tx.clone();

let remote_client = NacosGrpcClientBuilder::new(client_props.get_server_list()?)
.port(client_props.grpc_port)
.namespace(client_props.namespace.clone())
.app_name(client_props.app_name.clone())
.client_version(client_props.client_version.clone())
Expand Down
2 changes: 1 addition & 1 deletion src/naming/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl NacosNamingService {
));

let nacos_grpc_client = NacosGrpcClientBuilder::new(server_list.to_vec())
.port(client_props.grpc_port)
.namespace(namespace.clone())
.client_version(client_props.client_version)
.support_remote_connection(true)
Expand Down Expand Up @@ -572,7 +573,6 @@ impl NacosNamingService {

#[cfg(not(feature = "async"))]
impl NamingService for NacosNamingService {

#[instrument(fields(client_id = &self.client_id, group = group_name, service_name = service_name), skip_all)]
fn deregister_instance(
&self,
Expand Down

0 comments on commit 615f5de

Please sign in to comment.