Skip to content

This changes destreaming to use serde. Also changed batch read to be … #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
pull_request:
branches:
- master
workflow_dispatch:

env:
CARGO_TERM_COLOR: always
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/target
/Cargo.lock
*.todo

aerospike-rust-client.sublime-project
aerospike-rust-client.sublime-workspace
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ harness = false
members = ["tools/benchmark", "aerospike-core", "aerospike-rt", "aerospike-sync", "aerospike-macro"]

[dev-dependencies]
env_logger = "0.9"
log = "0.4"
env_logger = "0.10.0"
hex = "0.4"
bencher = "0.1"
criterion = { version = "0.5.1", features = ["async_tokio", "async_futures", "async"]}
serde = "1.0"
serde_json = "1.0"
rand = "0.8"
lazy_static = "1.4"
Expand Down
6 changes: 4 additions & 2 deletions aerospike-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ rand = "0.8"
lazy_static = "1.4"
error-chain = { version = "0.12.4", default-features = false }
pwhash = "1.0"
serde = { version = "1.0", features = ["derive"], optional = true }
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11"
aerospike-rt = {path = "../aerospike-rt"}
futures = {version = "0.3.16" }
async-trait = "0.1.51"
num = "0.4.0"

[features]
serialization = ["serde"]
serialization = []
rt-tokio = ["aerospike-rt/rt-tokio"]
rt-async-std = ["aerospike-rt/rt-async-std"]

Expand Down
100 changes: 74 additions & 26 deletions aerospike-core/src/batch/batch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,49 +36,97 @@ impl BatchExecutor {
}


pub async fn execute_batch_read(
pub async fn execute_batch_read<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
policy: &BatchPolicy,
batch_reads: Vec<BatchRead>,
) -> Result<Vec<BatchRead>> {
let batch_nodes = self.get_batch_nodes(&batch_reads, policy.replica)?;
let mut jobs = Vec::<BatchReadCommand>::new();
for (node, node_jobs) in batch_nodes {
for node_chunk in node_jobs.chunks(MAX_BATCH_REQUEST_SIZE) {
jobs.push( BatchReadCommand::new(policy, node.clone(), node_chunk.to_vec()) );
batch_reads: Vec<BatchRead<T>>,
) -> Result<Vec<BatchRead<T>>> {
let total = batch_reads.len();
let jobs = self.get_batch_nodes(policy, batch_reads)?;
let reads = self.execute_batch_jobs::<T>(jobs, policy.concurrency).await?;

let mut as_iter = reads.into_iter();
if let Some(BatchReadCommand { mut batch_reads, mut original_indexes, .. }) = as_iter.next() {
// Reserve enough to make the first element the return value
batch_reads.reserve_exact(total - batch_reads.len());
original_indexes.reserve_exact(total - original_indexes.len());
// Shove everything into the same list
for another_job in as_iter {
batch_reads.extend(another_job.batch_reads);
original_indexes.extend(another_job.original_indexes);
}

// Put each record back where it belongs... this is O(n) because every element is swapped directly into its correct position
for i in 0..batch_reads.len() {
while original_indexes[i] != i {
let to = original_indexes[i];
batch_reads.swap(i, to);
original_indexes.swap(i, to);
}
}
Ok(batch_reads)
} else {
Ok(Default::default())
}
let reads = self.execute_batch_jobs(jobs, policy.concurrency).await?;
let mut all_results: Vec<_> = reads.into_iter().flat_map(|cmd|cmd.batch_reads).collect();
all_results.sort_by_key(|(_, i)|*i);
Ok(all_results.into_iter().map(|(b, _)|b).collect())
}

async fn execute_batch_jobs(
async fn execute_batch_jobs<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
jobs: Vec<BatchReadCommand>,
jobs: Vec<BatchReadCommand<T>>,
concurrency: Concurrency,
) -> Result<Vec<BatchReadCommand>> {
) -> Result<Vec<BatchReadCommand<T>>> {
let handles = jobs.into_iter().map(|job|job.execute(self.cluster.clone()));
match concurrency {
Concurrency::Sequential => futures::future::join_all(handles).await.into_iter().collect(),
Concurrency::Parallel => futures::future::join_all(handles.map(aerospike_rt::spawn)).await.into_iter().map(|value|value.map_err(|e|e.to_string())?).collect(),
}
}

fn get_batch_nodes(
fn get_batch_nodes<'l, T: serde::de::DeserializeOwned + Send>(
&self,
batch_reads: &[BatchRead],
replica: crate::policy::Replica,
) -> Result<HashMap<Arc<Node>, Vec<(BatchRead, usize)>>> {
let mut map = HashMap::new();
for (index, batch_read) in batch_reads.iter().enumerate() {
let node = self.node_for_key(&batch_read.key, replica)?;
map.entry(node)
.or_insert_with(Vec::new)
.push((batch_read.clone(), index));
policy: &BatchPolicy,
batch_reads: Vec<BatchRead<T>>,
) -> Result<Vec<BatchReadCommand<T>>> {
let mut map: HashMap<Arc<Node>, (Vec<BatchRead<T>>, Vec<usize>)> = HashMap::new();
let mut vec = Vec::new();
let choices = batch_reads.first().map(|read|self.cluster.n_nodes_for_policy(&read.key.namespace, policy.replica)).unwrap_or_default();
vec.reserve(choices);
let estimate = batch_reads.len() / (choices.max(2) - 1);

for (index, batch_read) in batch_reads.into_iter().enumerate() {
let node = self.node_for_key(&batch_read.key, policy.replica)?;
let (reads, indexes) = map.entry(node)
.or_insert_with(||{
let mut reads = Vec::new();
let mut indexes = Vec::new();
if estimate > MAX_BATCH_REQUEST_SIZE {
reads.reserve_exact(MAX_BATCH_REQUEST_SIZE);
indexes.reserve_exact(MAX_BATCH_REQUEST_SIZE);
} else {
reads.reserve(estimate);
indexes.reserve(estimate);
}
(reads, indexes)
});

// Enough reads, make a new one.
if reads.len() >= MAX_BATCH_REQUEST_SIZE {
// To avoid cloning the node Arc above, we just recompute the node from the key when it's needed.
let node = self.node_for_key(&batch_read.key, policy.replica)?;
vec.push(BatchReadCommand::new(policy, node, std::mem::take(reads), std::mem::take(indexes)));
// If we're blowing out buffers, we'll probably do it again.
reads.reserve_exact(MAX_BATCH_REQUEST_SIZE);
indexes.reserve_exact(MAX_BATCH_REQUEST_SIZE);
}
reads.push(batch_read);
indexes.push(index);
}

vec.reserve_exact(map.len());
for (node, (reads, indexes)) in map {
vec.push(BatchReadCommand::new(policy, node, reads, indexes));
}
Ok(map)
Ok(vec)
}

fn node_for_key(&self, key: &Key, replica: crate::policy::Replica) -> Result<Arc<Node>> {
Expand Down
10 changes: 5 additions & 5 deletions aerospike-core/src/batch/batch_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ use serde::Serialize;
/// Key and bin names used in batch read commands where variable bins are needed for each key.
#[cfg_attr(feature = "serialization", derive(Serialize))]
#[derive(Debug, Clone)]
pub struct BatchRead {
pub struct BatchRead<T: serde::de::DeserializeOwned> {
/// Key.
pub key: Key,

/// Bins to retrieve for this key.
pub bins: Bins,

/// Will contain the record after the batch read operation.
pub record: Option<Record>,
pub record: Option<Record<T>>,
}

impl BatchRead {
impl<T: serde::de::DeserializeOwned> BatchRead<T> {
/// Create a new `BatchRead` instance for the given key and bin selector.
pub const fn new(key: Key, bins: Bins) -> Self {
BatchRead {
Expand All @@ -44,11 +44,11 @@ impl BatchRead {
}

#[doc(hidden)]
pub fn match_header(&self, other: &BatchRead, match_set: bool) -> bool {
pub fn match_header(&self, other: &BatchRead<T>, match_set: bool) -> bool {
let key = &self.key;
let other_key = &other.key;
(key.namespace == other_key.namespace)
&& (match_set && (key.set_name == other_key.set_name))
&& (self.bins == other.bins)
}
}
}
57 changes: 11 additions & 46 deletions aerospike-core/src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ use std::convert::From;

/// Container object for a record bin, comprising a name and a value.
#[derive(Clone)]
pub struct Bin<'a> {
pub struct Bin {
/// Bin name
pub name: &'a str,
pub name: String,

/// Bin value
pub value: Value,
}

impl<'a> Bin<'a> {
impl Bin {
/// Construct a new bin given a name and a value.
pub const fn new(name: &'a str, val: Value) -> Self {
pub const fn new(name: String, val: Value) -> Self {
Bin { name, value: val }
}
}

impl<'a> AsRef<Bin<'a>> for Bin<'a> {
impl AsRef<Bin> for Bin {
fn as_ref(&self) -> &Self {
self
}
Expand All @@ -45,10 +45,10 @@ impl<'a> AsRef<Bin<'a>> for Bin<'a> {
#[macro_export]
macro_rules! as_bin {
($bin_name:expr, None) => {{
$crate::Bin::new($bin_name, $crate::Value::Nil)
$crate::Bin::new($bin_name.into(), $crate::Value::Nil)
}};
($bin_name:expr, $val:expr) => {{
$crate::Bin::new($bin_name, $crate::Value::from($val))
$crate::Bin::new($bin_name.into(), $crate::Value::from($val))
}};
}

Expand Down Expand Up @@ -78,50 +78,15 @@ impl Bins {
}
}

impl<'a> From<&'a [&'a str]> for Bins {
fn from(bins: &'a [&'a str]) -> Self {
impl From<&[&str]> for Bins {
fn from(bins: &[&str]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 1]> for Bins {
fn from(bins: [&'a str; 1]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 2]> for Bins {
fn from(bins: [&'a str; 2]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 3]> for Bins {
fn from(bins: [&'a str; 3]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 4]> for Bins {
fn from(bins: [&'a str; 4]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 5]> for Bins {
fn from(bins: [&'a str; 5]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 6]> for Bins {
fn from(bins: [&'a str; 6]) -> Self {
impl<const COUNT: usize> From<[&str; COUNT]> for Bins {
fn from(bins: [&str; COUNT]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
Expand Down
Loading
Loading