Skip to content

Commit 23f254f

Browse files
authored
feat: impl NgramIndex for FuseTable, improve like query performance (#17852)
* feat: impl `NgramIndex` for `FuseTable`, improve like query performance — Signed-off-by: Kould <[email protected]>
* test: add explain test for ngram index — Signed-off-by: Kould <[email protected]>
* chore: fix ci fail — Signed-off-by: Kould <[email protected]>
* chore: fix ci fail — Signed-off-by: Kould <[email protected]>
* chore: add ngram index options check — Signed-off-by: Kould <[email protected]>
* chore: logic for distinguishing partial ngram from bloom indexes — Signed-off-by: Kould <[email protected]>
* chore: `FilterImpl` uses `Xor8Filter` instead — Signed-off-by: Kould <[email protected]>
* chore: fix filter size on logic test — Signed-off-by: Kould <[email protected]>
* refactor: impl new BloomFilter — Signed-off-by: Kould <[email protected]>
* chore: codefmt — Signed-off-by: Kould <[email protected]>
* test: add unit test for BloomFilter — Signed-off-by: Kould <[email protected]>
* chore: codefmt — Signed-off-by: Kould <[email protected]>
* chore: `FilterImpl::to_bytes` determines the Filter type by the first byte — Signed-off-by: Kould <[email protected]>
--------- Signed-off-by: Kould <[email protected]>
1 parent 3b729e5 commit 23f254f

File tree

37 files changed

+1742
-316
lines changed

37 files changed

+1742
-316
lines changed

src/query/ee/tests/it/inverted_index/pruning.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ async fn apply_block_pruning(
7070
let segment_locs = table_snapshot.segments.clone();
7171
let segment_locs = create_segment_location_vector(segment_locs, None);
7272

73-
FusePruner::create(&ctx, dal, schema, push_down, bloom_index_cols, None)?
73+
FusePruner::create(&ctx, dal, schema, push_down, bloom_index_cols, vec![], None)?
7474
.read_pruning(segment_locs)
7575
.await
7676
}

src/query/functions/src/scalars/hash.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,13 @@ where for<'a> T::ScalarRef<'a>: DFHash {
250250
);
251251
}
252252

253-
struct CityHasher64 {
253+
pub struct CityHasher64 {
254254
seed: u64,
255255
value: u64,
256256
}
257257

258258
impl CityHasher64 {
259-
fn with_seed(s: u64) -> Self {
259+
pub fn with_seed(s: u64) -> Self {
260260
Self { seed: s, value: 0 }
261261
}
262262
}

src/query/functions/src/scalars/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ mod vector;
4848
pub use comparison::ALL_COMP_FUNC_NAMES;
4949
use databend_functions_scalar_arithmetic::arithmetic;
5050
use databend_functions_scalar_numeric_basic_arithmetic::register_numeric_basic_arithmetic;
51+
pub use hash::CityHasher64;
52+
pub use hash::DFHash;
5153
pub use string::ALL_STRING_FUNC_NAMES;
5254

5355
pub fn register(registry: &mut FunctionRegistry) {

src/query/service/src/interpreters/interpreter_table_show_create.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use databend_common_expression::ComputedExpr;
2727
use databend_common_expression::DataBlock;
2828
use databend_common_expression::Scalar;
2929
use databend_common_expression::Value;
30+
use databend_common_meta_app::schema::TableIndexType;
3031
use databend_common_sql::plans::ShowCreateTablePlan;
3132
use databend_common_storages_fuse::FUSE_OPT_KEY_ATTACH_COLUMN_IDS;
3233
use databend_common_storages_stream::stream_table::StreamTable;
@@ -242,9 +243,14 @@ impl ShowCreateTableInterpreter {
242243
let option = format!("{} = '{}'", key, value);
243244
options.push(option);
244245
}
246+
let index_type = match index_field.index_type {
247+
TableIndexType::Inverted => "INVERTED",
248+
TableIndexType::Ngram => "NGRAM",
249+
};
245250
let mut index_str = format!(
246-
" {} INVERTED INDEX {} ({})",
251+
" {} {} INDEX {} ({})",
247252
sync,
253+
index_type,
248254
display_ident(
249255
&index_field.name,
250256
force_quoted_ident,

src/query/service/src/test_kits/block_writer.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ impl<'a> BlockWriter<'a> {
130130
let bloom_index_cols = BloomIndexColumns::All;
131131
let bloom_columns_map =
132132
bloom_index_cols.bloom_index_fields(schema.clone(), BloomIndex::supported_type)?;
133-
let mut builder = BloomIndexBuilder::create(FunctionContext::default(), bloom_columns_map);
133+
let mut builder =
134+
BloomIndexBuilder::create(FunctionContext::default(), bloom_columns_map, &[])?;
134135
builder.add_block(block)?;
135136
let maybe_bloom_index = builder.finalize()?;
136137
if let Some(bloom_index) = maybe_bloom_index {

src/query/service/tests/it/storages/fuse/pruning.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ async fn apply_block_pruning(
6363
let ctx: Arc<dyn TableContext> = ctx;
6464
let segment_locs = table_snapshot.segments.clone();
6565
let segment_locs = create_segment_location_vector(segment_locs, None);
66-
FusePruner::create(&ctx, op, schema, push_down, bloom_index_cols, None)?
66+
FusePruner::create(&ctx, op, schema, push_down, bloom_index_cols, vec![], None)?
6767
.read_pruning(segment_locs)
6868
.await
6969
.map(|v| v.into_iter().map(|(_, v)| v).collect())

src/query/service/tests/it/storages/fuse/pruning_column_oriented_segment.rs

+1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ async fn apply_snapshot_pruning(
8080
schema.clone(),
8181
push_down,
8282
bloom_index_cols,
83+
vec![],
8384
None,
8485
)?);
8586

src/query/service/tests/it/storages/fuse/pruning_pipeline.rs

+1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ async fn apply_snapshot_pruning(
8080
schema,
8181
push_down,
8282
bloom_index_cols,
83+
vec![],
8384
None,
8485
)?);
8586

src/query/sql/src/planner/binder/ddl/index.rs

+43-4
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ use crate::MetadataRef;
6565
use crate::RefreshAggregatingIndexRewriter;
6666
use crate::SUPPORTED_AGGREGATING_INDEX_FUNCTIONS;
6767

68+
const MAXIMUM_BLOOM_SIZE: u64 = 10 * 1024 * 1024;
69+
const MINIMUM_BLOOM_SIZE: u64 = 512;
70+
6871
// valid values for inverted index option tokenizer
6972
static INDEX_TOKENIZER_VALUES: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
7073
let mut r = HashSet::new();
@@ -580,13 +583,49 @@ impl Binder {
580583
let value = val.to_lowercase();
581584
match key.as_str() {
582585
"gram_size" => {
583-
if value.parse::<u32>().is_err() {
584-
return Err(ErrorCode::IndexOptionInvalid(format!(
585-
"value `{value}` is not a legal number",
586-
)));
586+
match value.parse::<usize>() {
587+
Ok(num) => {
588+
if num == 0 {
589+
return Err(ErrorCode::IndexOptionInvalid(
590+
"`gram_size` cannot be 0",
591+
));
592+
}
593+
}
594+
Err(_) => {
595+
return Err(ErrorCode::IndexOptionInvalid(format!(
596+
"value `{value}` is not a legal number",
597+
)));
598+
}
587599
}
588600
options.insert("gram_size".to_string(), value);
589601
}
602+
"bloom_size" => {
603+
match value.parse::<u64>() {
604+
Ok(num) => {
605+
if num == 0 {
606+
return Err(ErrorCode::IndexOptionInvalid(
607+
"`bloom_size` cannot be 0",
608+
));
609+
}
610+
if num < MINIMUM_BLOOM_SIZE {
611+
return Err(ErrorCode::IndexOptionInvalid(format!(
612+
"bloom_size: `{num}` is too small (bloom_size is minimum: {MINIMUM_BLOOM_SIZE})",
613+
)));
614+
}
615+
if num > MAXIMUM_BLOOM_SIZE {
616+
return Err(ErrorCode::IndexOptionInvalid(format!(
617+
"bloom_size: `{num}` is too large (bloom_size is maximum: {MAXIMUM_BLOOM_SIZE})",
618+
)));
619+
}
620+
}
621+
Err(_) => {
622+
return Err(ErrorCode::IndexOptionInvalid(format!(
623+
"value `{value}` is not a legal number",
624+
)));
625+
}
626+
}
627+
options.insert("bloom_size".to_string(), value);
628+
}
590629
_ => {
591630
return Err(ErrorCode::IndexOptionInvalid(format!(
592631
"index option `{key}` is invalid key for create ngram index statement",

src/query/storages/common/cache/src/cache_items.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub use databend_common_catalog::plan::PartStatistics;
1717
pub use databend_common_catalog::plan::Partitions;
1818
pub use databend_common_catalog::table::Table;
1919
use databend_common_exception::ErrorCode;
20-
pub use databend_storages_common_index::filters::Xor8Filter;
20+
pub use databend_storages_common_index::filters::FilterImpl;
2121
pub use databend_storages_common_index::BloomIndexMeta;
2222
pub use databend_storages_common_index::InvertedIndexFile;
2323
pub use databend_storages_common_index::InvertedIndexMeta;

src/query/storages/common/cache/src/caches.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub type TableSnapshotCache = InMemoryLruCache<TableSnapshot>;
4545
pub type TableSnapshotStatisticCache = InMemoryLruCache<TableSnapshotStatistics>;
4646
/// In memory object cache of bloom filter.
4747
/// For each indexed data block, the bloom xor8 filter of column is cached individually
48-
pub type BloomIndexFilterCache = HybridCache<Xor8Filter>;
48+
pub type BloomIndexFilterCache = HybridCache<FilterImpl>;
4949
/// In memory object cache of parquet FileMetaData of bloom index data
5050
pub type BloomIndexMetaCache = HybridCache<BloomIndexMeta>;
5151

@@ -123,7 +123,7 @@ impl CachedObject<(PartStatistics, Partitions)> for (PartStatistics, Partitions)
123123
}
124124
}
125125

126-
impl CachedObject<Xor8Filter> for Xor8Filter {
126+
impl CachedObject<FilterImpl> for FilterImpl {
127127
type Cache = BloomIndexFilterCache;
128128
fn cache() -> Option<Self::Cache> {
129129
CacheManager::instance().get_bloom_index_filter_cache()
@@ -235,10 +235,10 @@ impl From<TableSnapshotStatistics> for CacheValue<TableSnapshotStatistics> {
235235
}
236236
}
237237

238-
impl From<Xor8Filter> for CacheValue<Xor8Filter> {
239-
fn from(value: Xor8Filter) -> Self {
238+
impl From<FilterImpl> for CacheValue<FilterImpl> {
239+
fn from(value: FilterImpl) -> Self {
240240
CacheValue {
241-
mem_bytes: std::mem::size_of::<Xor8Filter>() + value.filter.finger_prints.len(),
241+
mem_bytes: value.mem_bytes(),
242242
inner: Arc::new(value),
243243
}
244244
}

0 commit comments

Comments (0)