Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions benches/agg_bench.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use binggan::plugins::PeakMemAllocPlugin;
use binggan::{black_box, InputGroup, PeakMemAlloc, INSTRUMENTED_SYSTEM};
use common::DateTime;
use rand::prelude::SliceRandom;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -61,6 +62,12 @@ fn bench_agg(mut group: InputGroup<Index>) {
register!(group, terms_many_with_avg_sub_agg);
register!(group, terms_many_json_mixed_type_with_avg_sub_agg);

register!(group, composite_term_many_page_1000);
register!(group, composite_term_many_page_1000_with_avg_sub_agg);
register!(group, composite_term_few);
register!(group, composite_histogram);
register!(group, composite_histogram_calendar);

register!(group, cardinality_agg);
register!(group, terms_few_with_cardinality_agg);

Expand Down Expand Up @@ -225,6 +232,75 @@ fn terms_many_json_mixed_type_with_avg_sub_agg(index: &Index) {
});
execute_agg(index, agg_req);
}
fn composite_term_few(index: &Index) {
let agg_req = json!({
"my_ctf": {
"composite": {
"sources": [
{ "text_few_terms": { "terms": { "field": "text_few_terms" } } }
],
"size": 1000
}
},
});
execute_agg(index, agg_req);
}
fn composite_term_many_page_1000(index: &Index) {
let agg_req = json!({
"my_ctmp1000": {
"composite": {
"sources": [
{ "text_many_terms": { "terms": { "field": "text_many_terms" } } }
],
"size": 1000
}
},
});
execute_agg(index, agg_req);
}
fn composite_term_many_page_1000_with_avg_sub_agg(index: &Index) {
let agg_req = json!({
"my_ctmp1000wasa": {
"composite": {
"sources": [
{ "text_many_terms": { "terms": { "field": "text_many_terms" } } }
],
"size": 1000,

},
"aggs": {
"average_f64": { "avg": { "field": "score_f64" } }
}
},
});
execute_agg(index, agg_req);
}
fn composite_histogram(index: &Index) {
let agg_req = json!({
"my_ch": {
"composite": {
"sources": [
{ "f64_histogram": { "histogram": { "field": "score_f64", "interval": 1 } } }
],
"size": 1000
}
},
});
execute_agg(index, agg_req);
}
fn composite_histogram_calendar(index: &Index) {
let agg_req = json!({
"my_chc": {
"composite": {
"sources": [
{ "time_histogram": { "date_histogram": { "field": "timestamp", "calendar_interval": "month" } } }
],
"size": 1000
}
},
});
execute_agg(index, agg_req);
}

fn execute_agg(index: &Index, agg_req: serde_json::Value) {
let agg_req: Aggregations = serde_json::from_value(agg_req).unwrap();
Expand Down Expand Up @@ -404,6 +480,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone());
let score_field_f64 = schema_builder.add_f64_field("score_f64", score_fieldtype.clone());
let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype);
let date_field = schema_builder.add_date_field("timestamp", FAST);
let index = Index::create_from_tempdir(schema_builder.build())?;
let few_terms_data = ["INFO", "ERROR", "WARN", "DEBUG"];

Expand Down Expand Up @@ -459,6 +536,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
score_field => val as u64,
score_field_f64 => lg_norm.sample(&mut rng),
score_field_i64 => val as i64,
date_field => DateTime::from_timestamp_millis((val * 1_000_000.) as i64),
))?;
if cardinality == Cardinality::OptionalSparse {
for _ in 0..20 {
Expand Down
2 changes: 1 addition & 1 deletion columnar/src/column_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use u64_based::{
serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
};
pub use u128_based::{
CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped,
CompactHit, CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped,
serialize_column_values_u128,
};
pub use vec_column::VecColumn;
Expand Down
73 changes: 73 additions & 0 deletions columnar/src/column_values/u128_based/compact_space/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,19 @@ impl BinarySerializable for IPCodecParams {
}
}

/// Represents the result of looking up a u128 value in the compact space.
///
/// If a value is outside the compact space, the next compact value is returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactHit {
/// The value exists in the compact space
Exact(u32),
/// The value does not exist in the compact space, but the next higher value does
Next(u32),
/// The value is greater than the maximum compact value
AfterLast,
}

/// Exposes the compact space compressed values as u64.
///
/// This allows faster access to the values, as u64 is faster to work with than u128.
Expand All @@ -309,6 +322,11 @@ impl CompactSpaceU64Accessor {
pub fn compact_to_u128(&self, compact: u32) -> u128 {
self.0.compact_to_u128(compact)
}

/// Finds the next compact space value for a given u128 value.
pub fn u128_to_next_compact(&self, value: u128) -> CompactHit {
self.0.u128_to_next_compact(value)
}
}

impl ColumnValues<u64> for CompactSpaceU64Accessor {
Expand Down Expand Up @@ -430,6 +448,26 @@ impl CompactSpaceDecompressor {
Ok(decompressor)
}

/// Finds the next compact space value for a given u128 value
pub fn u128_to_next_compact(&self, value: u128) -> CompactHit {
// Try to convert to compact space
match self.u128_to_compact(value) {
// Value is in compact space, return its compact representation
Ok(compact) => CompactHit::Exact(compact),
// Value is not in compact space
Err(pos) => {
if pos >= self.params.compact_space.ranges_mapping.len() {
// Value is beyond all ranges, no next value exists
CompactHit::AfterLast
} else {
// Get the next range and return its start compact value
let next_range = &self.params.compact_space.ranges_mapping[pos];
CompactHit::Next(next_range.compact_start)
}
}
}
}

/// Converting to compact space for the decompressor is more complex, since we may get values
/// which are outside the compact space. e.g. if we map
/// 1000 => 5
Expand Down Expand Up @@ -823,6 +861,41 @@ mod tests {
let _data = test_aux_vals(vals);
}

#[test]
fn test_u128_to_next_compact() {
let vals = &[100u128, 200u128, 1_000_000_000u128, 1_000_000_100u128];
let mut data = test_aux_vals(vals);

let _header = U128Header::deserialize(&mut data);
let decomp = CompactSpaceDecompressor::open(data).unwrap();

// Test value that's already in a range
let compact_100 = decomp.u128_to_compact(100).unwrap();
assert_eq!(
decomp.u128_to_next_compact(100),
CompactHit::Exact(compact_100)
);

// Test value between two ranges
let compact_million = decomp.u128_to_compact(1_000_000_000).unwrap();
assert_eq!(
decomp.u128_to_next_compact(250),
CompactHit::Next(compact_million)
);

// Test value before the first range
assert_eq!(
decomp.u128_to_next_compact(50),
CompactHit::Next(compact_100)
);

// Test value after the last range
assert_eq!(
decomp.u128_to_next_compact(10_000_000_000),
CompactHit::AfterLast
);
}

use proptest::prelude::*;

fn num_strategy() -> impl Strategy<Value = u128> {
Expand Down
2 changes: 1 addition & 1 deletion columnar/src/column_values/u128_based/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod compact_space;

use common::{BinarySerializable, OwnedBytes, VInt};
pub use compact_space::{
CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
CompactHit, CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
};

use crate::column_values::monotonic_map_column;
Expand Down
2 changes: 1 addition & 1 deletion columnar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct RowAddr {
pub row_id: RowId,
}

pub use sstable::Dictionary;
pub use sstable::{Dictionary, TermOrdHit};
pub type Streamer<'a> = sstable::Streamer<'a, VoidSSTable>;

pub use common::DateTime;
Expand Down
16 changes: 13 additions & 3 deletions src/aggregation/accessor_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,21 @@ pub(crate) fn get_all_ff_reader_or_empty(
allowed_column_types: Option<&[ColumnType]>,
fallback_type: ColumnType,
) -> crate::Result<Vec<(columnar::Column<u64>, ColumnType)>> {
let ff_fields = reader.fast_fields();
let mut ff_field_with_type =
ff_fields.u64_lenient_for_type_all(allowed_column_types, field_name)?;
let mut ff_field_with_type = get_all_ff_readers(reader, field_name, allowed_column_types)?;
if ff_field_with_type.is_empty() {
ff_field_with_type.push((Column::build_empty_column(reader.num_docs()), fallback_type));
}
Ok(ff_field_with_type)
}

/// Get all fast field reader.
pub(crate) fn get_all_ff_readers(
reader: &SegmentReader,
field_name: &str,
allowed_column_types: Option<&[ColumnType]>,
) -> crate::Result<Vec<(columnar::Column<u64>, ColumnType)>> {
let ff_fields = reader.fast_fields();
let ff_field_with_type =
ff_fields.u64_lenient_for_type_all(allowed_column_types, field_name)?;
Ok(ff_field_with_type)
}
Loading
Loading