
Rack Awareness, Sequence Policies, More Complex Policies, and Other Fixes #140

Status: Open. Wants to merge 26 commits into base: async
Commits (26):
a371fc4
Fixed some bugs in Msgpack encoding
CMoore-Darwinium Sep 9, 2021
e57f93f
Removed undue casts to single byte values
CMoore-Darwinium Sep 29, 2021
8c2910d
Convert list flags to use bitmasks also.
CMoore-Darwinium Mar 2, 2022
007d965
Use traits to allow single enums or groups of enums to be passed to m…
CMoore-Darwinium Mar 24, 2022
ff5c0eb
Converted maps and HLL to follow the new pattern of allowing multiple…
CMoore-Darwinium Mar 24, 2022
66d52cb
Fixed comments
CMoore-Darwinium Mar 24, 2022
f8c2558
Updated upstream dependencies to latest versions. As scoped-pool is n…
CMoore-Darwinium Sep 16, 2022
998eb2a
Fixed merge issues
CMoore-Darwinium Oct 26, 2022
fd1a882
Updated dependencies again
CMoore-Darwinium Oct 26, 2022
36189d9
Updated dependencies yet once more
CMoore-Darwinium Oct 26, 2022
8577254
Attempted to fix issues with reading/writing to the wrong node.
CMoore-Darwinium Nov 28, 2022
275da09
Simplified error handling in batches.
CMoore-Darwinium Nov 28, 2022
23ab1e5
Added debug message
CMoore-Darwinium Nov 28, 2022
37f0b4f
Fixed ordering of batch reads. Made partitions match C implementation
CMoore-Darwinium Nov 28, 2022
796a213
Removed debug statements.
CMoore-Darwinium Nov 28, 2022
fbef7ed
Added parallel and almost-sequential mode.
CMoore-Darwinium Nov 28, 2022
dd1d12e
Implemented Sequence and PreferRack replica policies.
CMoore-Darwinium Dec 6, 2022
0a5b76c
Fixed regime handling
CMoore-Darwinium Dec 6, 2022
92dbbfc
Revert private changes
CMoore-Darwinium Dec 6, 2022
5874717
Fixed warnings, fixed important bug leaking connections in pool
CMoore-Darwinium Jul 5, 2023
8e8d467
Fixed reintroduced issues in merge
CMoore-Darwinium Jul 5, 2023
3661913
Cleaned up node
CMoore-Darwinium Jul 5, 2023
4b1dc88
Pool isn't useful
CMoore-Darwinium Jul 5, 2023
0071d9b
Use older version of clap
CMoore-Darwinium Jul 5, 2023
c503b13
Fixed benchmarks
CMoore-Darwinium Jul 5, 2023
c9d6dc7
Merge branch 'async' into caleb-on-top-of-async
CMoore-Darwinium Oct 27, 2023
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -28,7 +28,7 @@ aerospike-sync = {path = "./aerospike-sync", optional = true}
aerospike-macro = {path = "./aerospike-macro", optional = true}

[features]
default = ["async", "serialization"]
default = ["async", "serialization", "rt-tokio"]
serialization = ["aerospike-core/serialization"]
async = ["aerospike-core"]
sync = ["aerospike-sync"]
@@ -50,6 +50,7 @@ bencher = "0.1"
serde_json = "1.0"
rand = "0.7"
lazy_static = "1.4"
ripemd = "0.1"
aerospike-macro = {path = "./aerospike-macro"}
aerospike-rt = {path = "./aerospike-rt"}
futures = {version = "0.3.16" }
12 changes: 6 additions & 6 deletions aerospike-core/Cargo.toml
@@ -8,13 +8,13 @@ edition = "2018"
[dependencies]
log = "0.4"
byteorder = "1.3"
ripemd160 = "0.8"
base64 = "0.11"
crossbeam-queue = "0.2"
rand = "0.7"
ripemd = "0.1"
Author comment: ripemd160 was not maintained the last time I checked, and we were getting audit warnings. (A short usage sketch of the replacement crate follows this file's diff.)

base64 = "0.13"
crossbeam-queue = "0.3"
rand = "0.8"
lazy_static = "1.4"
error-chain = "0.12"
pwhash = "0.3"
pwhash = "1.0"
serde = { version = "1.0", features = ["derive"], optional = true }
aerospike-rt = {path = "../aerospike-rt"}
futures = {version = "0.3.16" }
@@ -26,7 +26,7 @@ rt-tokio = ["aerospike-rt/rt-tokio"]
rt-async-std = ["aerospike-rt/rt-async-std"]

[dev-dependencies]
env_logger = "0.9.3"
env_logger = "0.9"
Author comment: Allowing other versions of env_logger means it can keep up to date.

hex = "0.4"
bencher = "0.1"
serde_json = "1.0"
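The Cargo change above swaps the unmaintained ripemd160 crate for RustCrypto's ripemd, which exposes the same Digest-trait API, so the hashing code only needs its import and type name updated. Below is a minimal sketch of computing a RIPEMD-160 digest with the new crate; the helper name and the exact byte layout of the real key digest are simplifications, not the crate's actual code.

use ripemd::{Digest, Ripemd160};

/// Sketch only: hash a set name plus a user key with RIPEMD-160 using the
/// new `ripemd` crate. The real Aerospike key digest has a specific byte
/// layout (set name, key type marker, key bytes); this is a simplification.
fn key_digest(set_name: &str, user_key: &[u8]) -> Vec<u8> {
    let mut hasher = Ripemd160::new();
    hasher.update(set_name.as_bytes());
    hasher.update(user_key);
    hasher.finalize().to_vec()
}

fn main() {
    let digest = key_digest("demo-set", b"my-key");
    assert_eq!(digest.len(), 20); // RIPEMD-160 always yields 20 bytes
    println!("{:02x?}", digest);
}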
75 changes: 20 additions & 55 deletions aerospike-core/src/batch/batch_executor.rs
@@ -13,18 +13,16 @@
// License for the specific language governing permissions and limitations under
// the License.

use std::cmp;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Weak};

use crate::batch::BatchRead;
use crate::cluster::partition::Partition;
use crate::cluster::{Cluster, Node};
use crate::commands::BatchReadCommand;
use crate::errors::{Error, Result};
use crate::errors::Result;
use crate::policy::{BatchPolicy, Concurrency};
use crate::Key;
use futures::lock::Mutex;

pub struct BatchExecutor {
cluster: Arc<Cluster>,
@@ -40,80 +38,47 @@ impl BatchExecutor {
policy: &BatchPolicy,
batch_reads: Vec<BatchRead>,
) -> Result<Vec<BatchRead>> {
let mut batch_nodes = self.get_batch_nodes(&batch_reads).await?;
let batch_nodes = self.get_batch_nodes(&batch_reads, policy.replica).await?;
let jobs = batch_nodes
.drain()
.into_iter()
.map(|(node, reads)| BatchReadCommand::new(policy, node, reads))
.collect();
let reads = self.execute_batch_jobs(jobs, &policy.concurrency).await?;
let mut res: Vec<BatchRead> = vec![];
for mut read in reads {
res.append(&mut read.batch_reads);
}
Ok(res)
let reads = self.execute_batch_jobs(jobs, policy.concurrency).await?;
let mut all_results: Vec<_> = reads.into_iter().flat_map(|cmd|cmd.batch_reads).collect();
all_results.sort_by_key(|(_, i)|*i);
Ok(all_results.into_iter().map(|(b, _)|b).collect())
}

async fn execute_batch_jobs(
&self,
jobs: Vec<BatchReadCommand>,
concurrency: &Concurrency,
concurrency: Concurrency,
) -> Result<Vec<BatchReadCommand>> {
let threads = match *concurrency {
Concurrency::Sequential => 1,
Concurrency::Parallel => jobs.len(),
Concurrency::MaxThreads(max) => cmp::min(max, jobs.len()),
};
let size = jobs.len() / threads;
let mut overhead = jobs.len() % threads;
let last_err: Arc<Mutex<Option<Error>>> = Arc::default();
let mut slice_index = 0;
let mut handles = vec![];
let res = Arc::new(Mutex::new(vec![]));
for _ in 0..threads {
let mut thread_size = size;
if overhead >= 1 {
thread_size += 1;
overhead -= 1;
}
let slice = Vec::from(&jobs[slice_index..slice_index + thread_size]);
slice_index = thread_size + 1;
let last_err = last_err.clone();
let res = res.clone();
let handle = aerospike_rt::spawn(async move {
//let next_job = async { jobs.lock().await.next().await};
for mut cmd in slice {
if let Err(err) = cmd.execute().await {
*last_err.lock().await = Some(err);
};
res.lock().await.push(cmd);
}
});
handles.push(handle);
}
futures::future::join_all(handles).await;
match Arc::try_unwrap(last_err).unwrap().into_inner() {
None => Ok(res.lock().await.to_vec()),
Some(err) => Err(err),
let handles = jobs.into_iter().map(|job|job.execute(self.cluster.clone()));
match concurrency {
Concurrency::Sequential => futures::future::join_all(handles).await.into_iter().collect(),
Concurrency::Parallel => futures::future::join_all(handles.map(aerospike_rt::spawn)).await.into_iter().map(|value|value.map_err(|e|e.to_string())?).collect(),
}
}

async fn get_batch_nodes(
&self,
batch_reads: &[BatchRead],
) -> Result<HashMap<Arc<Node>, Vec<BatchRead>>> {
replica: crate::policy::Replica,
) -> Result<HashMap<Arc<Node>, Vec<(BatchRead, usize)>>> {
let mut map = HashMap::new();
for (_, batch_read) in batch_reads.iter().enumerate() {
let node = self.node_for_key(&batch_read.key).await?;
for (index, batch_read) in batch_reads.iter().enumerate() {
let node = self.node_for_key(&batch_read.key, replica).await?;
map.entry(node)
.or_insert_with(Vec::new)
.push(batch_read.clone());
.push((batch_read.clone(), index));
}
Ok(map)
}

async fn node_for_key(&self, key: &Key) -> Result<Arc<Node>> {
async fn node_for_key(&self, key: &Key, replica: crate::policy::Replica) -> Result<Arc<Node>> {
let partition = Partition::new_by_key(key);
let node = self.cluster.get_node(&partition).await?;
let node = self.cluster.get_node(&partition, replica, Weak::new()).await?;
Ok(node)
}
}
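The rewritten batch executor tags each BatchRead with its original index before grouping reads by node, then flattens the per-node results and sorts by that index, so callers get records back in submission order regardless of concurrency. A self-contained sketch of just that tag-and-sort pattern, using stand-in types rather than the crate's own:

/// Sketch of the order-preserving pattern used in the new batch executor:
/// tag each item with its original index before grouping, then sort the
/// merged results by that index so callers get items back in the order
/// they submitted them. Types here are illustrative stand-ins.
fn restore_order<T>(per_node_results: Vec<Vec<(T, usize)>>) -> Vec<T> {
    let mut all: Vec<(T, usize)> = per_node_results.into_iter().flatten().collect();
    all.sort_by_key(|(_, idx)| *idx);
    all.into_iter().map(|(item, _)| item).collect()
}

fn main() {
    // Two "nodes" returned their shares of the batch out of order.
    let node_a = vec![("rec-2", 2), ("rec-0", 0)];
    let node_b = vec![("rec-1", 1), ("rec-3", 3)];
    assert_eq!(
        restore_order(vec![node_a, node_b]),
        vec!["rec-0", "rec-1", "rec-2", "rec-3"]
    );
}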
2 changes: 1 addition & 1 deletion aerospike-core/src/client.rs
@@ -184,7 +184,7 @@ impl Client {
T: Into<Bins> + Send + Sync + 'static,
{
let bins = bins.into();
let mut command = ReadCommand::new(policy, self.cluster.clone(), key, bins);
let mut command = ReadCommand::new(&policy.base_policy, self.cluster.clone(), key, bins, policy.replica);
command.execute().await?;
Ok(command.record.unwrap())
}
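Beyond threading the replica policy into ReadCommand, the point of this change is that callers can now ask for rack-aware or sequence-based reads. A hypothetical usage sketch follows, assuming the read policy gains a replica field and ClientPolicy::rack_ids takes an optional list of rack ids; both names are inferred from this diff, and the exact types, defaults, and exports are assumptions rather than confirmed API.

use aerospike::{as_key, Bins, Client, ClientPolicy, ReadPolicy, Replica};

async fn rack_aware_get() -> Result<(), Box<dyn std::error::Error>> {
    // Assumption: rack_ids lists the rack(s) this client runs in, so that
    // Replica::PreferRack can favour same-rack replicas.
    let mut client_policy = ClientPolicy::default();
    client_policy.rack_ids = Some(vec![1]);

    let hosts = std::env::var("AEROSPIKE_HOSTS")
        .unwrap_or_else(|_| "127.0.0.1:3000".to_string());
    let client = Client::new(&client_policy, &hosts).await?;

    // Assumption: the read policy now carries a Replica choice.
    let mut policy = ReadPolicy::default();
    policy.replica = Replica::PreferRack; // or Replica::Sequence / Replica::Master

    let key = as_key!("test", "demo", "key-1");
    let record = client.get(&policy, &key, Bins::All).await?;
    println!("{:?}", record);
    Ok(())
}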
132 changes: 100 additions & 32 deletions aerospike-core/src/cluster/mod.rs
@@ -21,7 +21,7 @@ pub mod partition_tokenizer
use aerospike_rt::time::{Duration, Instant};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::vec::Vec;

pub use self::node::Node;
@@ -30,6 +30,7 @@ use self::node_validator::NodeValidator;
use self::partition::Partition;
use self::partition_tokenizer::PartitionTokenizer;

use crate::commands::Message;
use crate::errors::{ErrorKind, Result};
use crate::net::Host;
use crate::policy::ClientPolicy;
@@ -38,6 +39,65 @@ use futures::channel::mpsc;
use futures::channel::mpsc::{Receiver, Sender};
use futures::lock::Mutex;

#[derive(Debug)]
pub struct PartitionForNamespace {
nodes: Vec<(u32, Option<Arc<Node>>)>,
replicas: usize,
}
type PartitionTable = HashMap<String, PartitionForNamespace>;

impl Default for PartitionForNamespace {
fn default() -> Self {
Self { nodes: Vec::default(), replicas: 0 }
}
}

impl PartitionForNamespace {
fn all_replicas(&self, index: usize) -> impl Iterator<Item = Option<Arc<Node>>> + '_ {
(0..self.replicas).map(move |i|self.nodes.get(i * node::PARTITIONS + index).and_then(|(_, item)|item.clone()))
}

async fn get_node(&self, cluster: &Cluster, partition: &Partition<'_>, replica: crate::policy::Replica, last_tried: Weak<Node>) -> Result<Arc<Node>> {
fn get_next_in_sequence<I: Iterator<Item = Arc<Node>>, F: Fn()->I>(get_sequence: F, last_tried: Weak<Node>) -> Option<Arc<Node>> {
if let Some(last_tried) = last_tried.upgrade() {
// If this isn't the first attempt, try the replica immediately after in sequence (that is actually valid)
let mut replicas = get_sequence();
while let Some(replica) = replicas.next() {
if Arc::ptr_eq(&replica, &last_tried) {
if let Some(in_sequence_after) = replicas.next() {
return Some(in_sequence_after)
}

// No more after this? Drop through to try from the beginning.
break;
}
}
}
// If we get here, we're on the first attempt, the last node is already gone, or there are no more nodes in sequence. Just find the next populated option.
get_sequence().next()
}


let node = match replica {
crate::policy::Replica::Master => self.all_replicas(partition.partition_id).next().flatten(),
crate::policy::Replica::Sequence => {
get_next_in_sequence(||self.all_replicas(partition.partition_id).flatten(), last_tried)
},
crate::policy::Replica::PreferRack => {
let rack_ids = cluster.client_policy.rack_ids.as_ref().ok_or_else(||"Attempted to use Replica::PreferRack without configuring racks in client policy".to_string())?;
get_next_in_sequence(||
self
.all_replicas(partition.partition_id)
.flatten()
.filter(|node|node.is_in_rack(partition.namespace, rack_ids)), last_tried.clone())
.or_else(||get_next_in_sequence(||self.all_replicas(partition.partition_id).flatten(), last_tried))
},
};

node.ok_or_else(||format!("Cannot get appropriate node for namespace: {} partition: {}", partition.namespace, partition.partition_id).into())
}
}

// Cluster encapsulates the aerospike cluster nodes and manages
// them.
#[derive(Debug)]
@@ -51,8 +111,8 @@ pub struct Cluster {
// Active nodes in cluster.
nodes: Arc<RwLock<Vec<Arc<Node>>>>,

// Hints for best node for a partition
partition_write_map: Arc<RwLock<HashMap<String, Vec<Arc<Node>>>>>,
// Which partition contains the key.
partition_write_map: RwLock<PartitionTable>,

// Random node index.
node_index: AtomicIsize,
@@ -73,7 +133,7 @@ impl Cluster {
aliases: Arc::new(RwLock::new(HashMap::new())),
nodes: Arc::new(RwLock::new(vec![])),

partition_write_map: Arc::new(RwLock::new(HashMap::new())),
partition_write_map: RwLock::new(HashMap::default()),
node_index: AtomicIsize::new(0),

tend_channel: Mutex::new(tx),
@@ -137,6 +197,7 @@ impl Cluster {
// Refresh all known nodes.
for node in nodes {
let old_gen = node.partition_generation();
let old_rebalance_gen = node.rebalance_generation();
if node.is_active() {
match node.refresh(self.aliases().await).await {
Ok(friends) => {
@@ -147,7 +208,11 @@ impl Cluster {
}

if old_gen != node.partition_generation() {
self.update_partitions(node.clone()).await?;
self.update_partitions(&node).await?;
}

if old_rebalance_gen != node.rebalance_generation() {
self.update_rack_ids(&node).await?;
}
}
Err(err) => {
@@ -231,23 +296,14 @@ impl Cluster {
Ok(aliases.contains_key(host))
}

async fn set_partitions(&self, partitions: HashMap<String, Vec<Arc<Node>>>) {
let mut partition_map = self.partition_write_map.write().await;
*partition_map = partitions;
}

fn partitions(&self) -> Arc<RwLock<HashMap<String, Vec<Arc<Node>>>>> {
self.partition_write_map.clone()
}

pub async fn node_partitions(&self, node: &Node, namespace: &str) -> Vec<u16> {
let mut res: Vec<u16> = vec![];
let partitions = self.partitions();
let partitions = partitions.read().await;
let partitions = self.partition_write_map.read().await;

if let Some(node_array) = partitions.get(namespace) {
for (i, tnode) in node_array.iter().enumerate() {
if node == tnode.as_ref() {
for (i, (_, tnode)) in node_array.nodes.iter().enumerate().take(node::PARTITIONS) {
if tnode.as_ref().map_or(false, |tnode|tnode.as_ref() == node) {
res.push(i as u16);
}
}
@@ -256,15 +312,29 @@
res
}

pub async fn update_partitions(&self, node: Arc<Node>) -> Result<()> {
pub async fn update_partitions(&self, node: &Arc<Node>) -> Result<()> {
let mut conn = node.get_connection().await?;
let tokens = PartitionTokenizer::new(&mut conn).await.map_err(|e| {
let tokens = PartitionTokenizer::new(&mut conn, node).await.map_err(|e| {
conn.invalidate();
e
})?;

let nmap = tokens.update_partition(self.partitions(), node).await?;
self.set_partitions(nmap).await;
let mut partitions = self.partition_write_map.write().await;
tokens.update_partition(&mut partitions, node)?;

Ok(())
}

pub async fn update_rack_ids(&self, node: &Arc<Node>) -> Result<()> {
const RACK_IDS: &str = "rack-ids";
let mut conn = node.get_connection().await?;
let info_map = Message::info(&mut conn, &[RACK_IDS, node::REBALANCE_GENERATION]).await?;
if let Some(buf) = info_map.get(RACK_IDS) {
node.parse_rack(buf.as_str())?;
}

// We re-update the rebalance generation right now (in case its changed since it was last polled)
node.update_rebalance_generation(&info_map)?;

Ok(())
}
@@ -440,10 +510,11 @@ impl Cluster {
}

async fn find_node_in_partition_map(&self, filter: Arc<Node>) -> bool {
let filter = Some(filter);
let partitions = self.partition_write_map.read().await;
(*partitions)
.values()
.any(|map| map.iter().any(|node| *node == filter))
.any(|map| map.nodes.iter().any(|(_, node)| *node == filter))
}

async fn add_nodes(&self, friend_list: &[Arc<Node>]) {
Expand Down Expand Up @@ -492,17 +563,14 @@ impl Cluster {
*nodes = new_nodes;
}

pub async fn get_node(&self, partition: &Partition<'_>) -> Result<Arc<Node>> {
let partitions = self.partitions();
let partitions = partitions.read().await;

if let Some(node_array) = partitions.get(partition.namespace) {
if let Some(node) = node_array.get(partition.partition_id) {
return Ok(node.clone());
}
}
pub async fn get_node(&self, partition: &Partition<'_>, replica: crate::policy::Replica, last_tried: Weak<Node>) -> Result<Arc<Node>> {
let partitions = self.partition_write_map.read().await;

self.get_random_node().await
let namespace = partitions
.get(partition.namespace)
.ok_or_else(||format!("Cannot get appropriate node for namespace: {}", partition.namespace))?;

namespace.get_node(self, partition, replica, last_tried).await
}

pub async fn get_random_node(&self) -> Result<Arc<Node>> {
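For readers following the Sequence and PreferRack logic in the cluster diff above, here is a simplified, standalone version of the retry behaviour in get_next_in_sequence: on a retry, pick the replica immediately after the last one tried; if that node has vanished or was the end of the sequence, wrap back to the first populated entry. Nodes are modelled as plain integers here; the real code walks Arc<Node> handles and, for PreferRack, first filters the sequence to same-rack nodes before falling back to the full list.

/// Simplified illustration of the replica-selection fallback in
/// PartitionForNamespace::get_node. `last_tried` is None on the first attempt.
fn next_in_sequence(sequence: &[u32], last_tried: Option<u32>) -> Option<u32> {
    if let Some(last) = last_tried {
        let mut iter = sequence.iter().copied();
        while let Some(node) = iter.next() {
            if node == last {
                if let Some(after) = iter.next() {
                    return Some(after); // the replica right after the one we tried
                }
                break; // last tried was at the end: wrap around below
            }
        }
    }
    // First attempt, last node no longer in the sequence, or end reached:
    // start again from the front.
    sequence.first().copied()
}

fn main() {
    let seq = [10, 20, 30];
    assert_eq!(next_in_sequence(&seq, None), Some(10));     // first attempt
    assert_eq!(next_in_sequence(&seq, Some(10)), Some(20)); // retry after 10
    assert_eq!(next_in_sequence(&seq, Some(30)), Some(10)); // wrap around
    assert_eq!(next_in_sequence(&seq, Some(99)), Some(10)); // last node vanished
}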