Skip to content

Move peer discovery from "services" to "peers-clear" #122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ error-chain = "0.12"
parking_lot = "0.9"
pwhash = "0.3"
serde = { version = "1.0", features = ["derive"], optional = true }
logos = "0.12.0"

[features]
serialization = ["serde"]
Expand Down
1 change: 1 addition & 0 deletions src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod node;
pub mod node_validator;
pub mod partition;
pub mod partition_tokenizer;
pub mod peers;

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
Expand Down
36 changes: 5 additions & 31 deletions src/cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use crate::errors::{ErrorKind, Result, ResultExt};
use crate::net::{ConnectionPool, Host, PooledConnection};
use crate::policy::ClientPolicy;

use super::peers::parse_peers_info;

pub const PARTITIONS: usize = 4096;

#[derive(Debug)]
Expand Down Expand Up @@ -138,9 +140,9 @@ impl Node {

const fn services_name(&self) -> &'static str {
if self.client_policy.use_services_alternate {
"services-alternate"
"peers-clear-alt"
} else {
"services"
"peers-clear-std"
}
}

Expand Down Expand Up @@ -197,35 +199,7 @@ impl Node {
Some(friend_string) => friend_string,
};

let friend_names = friend_string.split(';');
for friend in friend_names {
let mut friend_info = friend.split(':');
if friend_info.clone().count() != 2 {
error!(
"Node info from asinfo:services is malformed. Expected HOST:PORT, but got \
'{}'",
friend
);
continue;
}

let host = friend_info.next().unwrap();
let port = u16::from_str(friend_info.next().unwrap())?;
let alias = match self.client_policy.ip_map {
Some(ref ip_map) if ip_map.contains_key(host) => {
Host::new(ip_map.get(host).unwrap(), port)
}
_ => Host::new(host, port),
};

if current_aliases.contains_key(&alias) {
self.reference_count.fetch_add(1, Ordering::Relaxed);
} else if !friends.contains(&alias) {
friends.push(alias);
}
}

Ok(friends)
parse_peers_info(friend_string)
}

fn update_partitions(&self, info_map: &HashMap<String, String>) -> Result<()> {
Expand Down
128 changes: 128 additions & 0 deletions src/cluster/peers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use crate::errors::{ErrorKind, Result};
use crate::net::{Host, ToHosts};
use logos::{Lexer, Logos};

#[derive(Logos, Debug, PartialEq)]
enum Token {
#[token("[")]
OpenBracket,

#[token("]")]
CloseBracket,

#[regex("[0-9a-zA-Z-./_: ]+")]
Text,

#[error]
#[regex(r"[,]+", logos::skip)]
Error,
}

fn parse_error(lex: &Lexer<Token>, source: &str) -> String {
format!(
"Failed to parse peers: {}, at {:?} ({})",
source,
lex.span(),
lex.slice()
)
}

pub fn parse_peers_info(info_peers: &str) -> Result<Vec<Host>> {
let mut lex = Token::lexer(info_peers);

let _peer_gen = match lex.next() {
Some(Token::Text) => lex.slice(),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
};
let default_port_str = match lex.next() {
Some(Token::Text) => lex.slice(),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
};

let default_port = match default_port_str.parse::<u16>() {
Ok(port) => port,
Err(_) => bail!(ErrorKind::BadResponse(format!(
"Invalid default port: {}",
default_port_str
))),
};

match lex.next() {
Some(Token::OpenBracket) => parse_peers(info_peers, &mut lex, default_port),
_ => Ok(Vec::new()),
}
}

fn parse_peers(info_peers: &str, lex: &mut Lexer<Token>, default_port: u16) -> Result<Vec<Host>> {
let mut peers = Vec::new();
loop {
match lex.next() {
Some(Token::OpenBracket) => peers.extend(parse_peer(info_peers, lex, default_port)?),
Some(Token::CloseBracket) => return Ok(peers),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
}
lex.next(); // Close brackets
}
}

fn parse_peer(info_peers: &str, lex: &mut Lexer<Token>, default_port: u16) -> Result<Vec<Host>> {
let _id = match lex.next() {
Some(Token::Text) => lex.slice(),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
};

let mut token = lex.next();
if Some(Token::Text) == token {
let _tls_hostname = lex.slice();
token = lex.next();
}

match token {
Some(Token::OpenBracket) => (),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
};

let hosts = match lex.next() {
Some(Token::Text) => lex.slice(),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
}
.to_hosts_with_default_port(default_port)?;

lex.next(); // Close brackets
Ok(hosts)
}

#[cfg(test)]
mod tests {
use std::vec;

use super::*;

#[test]
fn parse_peers_works() {
let work = "6,3000,[[12A0,aerospike.com,[1.2.3.4:4333]],[BB9040011AC4202,,[10.11.12.13]],[11A1,,[localhost]]]";
let fail = "6,3foobar,[[12A0,aerospike.com,[1.2.3.4:4333]],[11A1,,[10.11.12.13:4333]]]";
let empty = "6,3000,[]";
assert!(parse_peers_info(fail).is_err());
let work = parse_peers_info(work).unwrap();
println!("{:?}", work);
assert!(
work == vec![
Host {
name: "1.2.3.4".to_string(),
port: 4333
},
Host {
name: "10.11.12.13".to_string(),
port: 3000
},
Host {
name: "localhost".to_string(),
port: 3000
}
]
);
let empty = parse_peers_info(empty).unwrap();
assert!(empty == vec![]);
}
}
12 changes: 11 additions & 1 deletion src/net/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,24 @@ pub trait ToHosts {
///
/// Any errors encountered during conversion will be returned as an `Err`.
fn to_hosts(&self) -> Result<Vec<Host>>;
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>>;
}

impl ToHosts for Vec<Host> {
fn to_hosts(&self) -> Result<Vec<Host>> {
Ok(self.clone())
}
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
Ok(self.clone())
}
}

impl ToHosts for String {
fn to_hosts(&self) -> Result<Vec<Host>> {
let mut parser = Parser::new(self, 3000);
self.to_hosts_with_default_port(3000)
}
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
let mut parser = Parser::new(self, default_port);
parser
.read_hosts()
.chain_err(|| ErrorKind::InvalidArgument(format!("Invalid hosts list: '{}'", self)))
Expand All @@ -88,6 +95,9 @@ impl<'a> ToHosts for &'a str {
fn to_hosts(&self) -> Result<Vec<Host>> {
(*self).to_string().to_hosts()
}
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
(*self).to_string().to_hosts_with_default_port(default_port)
}
}

#[cfg(test)]
Expand Down