Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -2108,10 +2108,9 @@ def test_label_list_index_null_list_match(tmp_path: Path):
"array_has_any(labels, ['foo'])",
"array_has_all(labels, ['foo'])",
"array_contains(labels, 'foo')",
# TODO(issue #5904): Enable after fixing NOT filters with whole-list NULLs
# "NOT array_has_any(labels, ['foo'])",
# "NOT array_has_all(labels, ['foo'])",
# "NOT array_contains(labels, 'foo')",
"NOT array_has_any(labels, ['foo'])",
"NOT array_has_all(labels, ['foo'])",
"NOT array_contains(labels, 'foo')",
]
expected = {
f: dataset.to_table(filter=f).column("labels").to_pylist() for f in filters
Expand Down
173 changes: 163 additions & 10 deletions rust/lance-index/src/scalar/label_list.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{any::Any, collections::HashMap, fmt::Debug, pin::Pin, sync::Arc};
use std::{
any::Any,
collections::HashMap,
fmt::Debug,
pin::Pin,
sync::{Arc, LazyLock, Mutex},
};

use arrow::array::AsArray;
use arrow_array::{Array, RecordBatch, UInt64Array};
use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::execution::RecordBatchStream;
Expand All @@ -13,8 +19,9 @@ use datafusion_common::ScalarValue;
use deepsize::DeepSizeOf;
use futures::{stream::BoxStream, StreamExt, TryStream, TryStreamExt};
use lance_core::cache::LanceCache;
use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap};
use lance_core::{Error, Result};
use lance_core::error::LanceOptionExt;
use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps};
use lance_core::{Error, Result, ROW_ID};
use roaring::RoaringBitmap;
use snafu::location;
use tracing::instrument;
Expand All @@ -34,7 +41,17 @@ use crate::scalar::{CreatedIndex, UpdateCriteria};
use crate::{Index, IndexType};

pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";
const LABEL_LIST_INDEX_VERSION: u32 = 0;
pub const LABEL_LIST_NULLS_NAME: &str = "label_list_nulls.lance";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are just writing a single buffer, why don't we instead use a global buffer in one of the existing index files? That would cut down the number of files needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review and the suggestion! I’ll look into the global buffer idea and get back to you.

pub const LABEL_LIST_NULLS_MIN_VERSION: i32 = 1;
const LABEL_LIST_INDEX_VERSION: u32 = 1;

/// Single-column schema ("nulls": non-nullable Binary) used to persist the
/// serialized set of row addresses whose list value is wholly NULL.
static LABEL_LIST_NULLS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    let nulls_field = Field::new("nulls", DataType::Binary, false);
    Arc::new(Schema::new(vec![nulls_field]))
});

#[async_trait]
trait LabelListSubIndex: ScalarIndex + DeepSizeOf {
Expand Down Expand Up @@ -67,21 +84,26 @@ impl<T: ScalarIndex + DeepSizeOf> LabelListSubIndex for T {}
#[derive(Clone, Debug, DeepSizeOf)]
pub struct LabelListIndex {
values_index: Arc<dyn LabelListSubIndex>,
list_nulls: Arc<RowAddrTreeMap>,
}

impl LabelListIndex {
fn new(values_index: Arc<dyn LabelListSubIndex>) -> Self {
Self { values_index }
fn new(values_index: Arc<dyn LabelListSubIndex>, list_nulls: Arc<RowAddrTreeMap>) -> Self {
Self {
values_index,
list_nulls,
}
}

async fn load(
store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
index_cache: &LanceCache,
) -> Result<Arc<Self>> {
BitmapIndex::load(store, frag_reuse_index, index_cache)
.await
.map(|index| Arc::new(Self::new(index)))
let values_index =
BitmapIndex::load(store.clone(), frag_reuse_index.clone(), index_cache).await?;
let list_nulls = read_list_nulls(store, frag_reuse_index).await?;
Ok(Arc::new(Self::new(values_index, Arc::new(list_nulls))))
}
}

Expand Down Expand Up @@ -185,6 +207,13 @@ impl ScalarIndex for LabelListIndex {
self.set_union(values_results, labels.len() == 1).await
}
}?;
let row_ids = if self.list_nulls.as_ref().is_empty() {
row_ids
} else {
let mut nulls = row_ids.null_rows().clone();
nulls |= self.list_nulls.as_ref();
row_ids.with_nulls(nulls)
};
Ok(SearchResult::Exact(row_ids))
}

Expand All @@ -199,6 +228,15 @@ impl ScalarIndex for LabelListIndex {
dest_store: &dyn IndexStore,
) -> Result<CreatedIndex> {
self.values_index.remap(mapping, dest_store).await?;
let remapped_nulls =
RowAddrTreeMap::from_iter(self.list_nulls.row_addrs().unwrap().filter_map(|addr| {
let addr_as_u64 = u64::from(addr);
mapping
.get(&addr_as_u64)
.copied()
.unwrap_or(Some(addr_as_u64))
}));
write_list_nulls(dest_store, remapped_nulls).await?;

Ok(CreatedIndex {
index_details: prost_types::Any::from_msg(&pbold::LabelListIndexDetails::default())
Expand All @@ -214,9 +252,17 @@ impl ScalarIndex for LabelListIndex {
dest_store: &dyn IndexStore,
valid_old_fragments: Option<&RoaringBitmap>,
) -> Result<CreatedIndex> {
let list_nulls = Arc::new(Mutex::new(RowAddrTreeMap::new()));
let new_data = track_list_nulls(new_data, list_nulls.clone());
self.values_index
.update(unnest_chunks(new_data)?, dest_store, valid_old_fragments)
.await?;
let mut merged_nulls = (*self.list_nulls).clone();
let new_nulls = list_nulls.lock().unwrap().clone();
if !new_nulls.is_empty() {
merged_nulls |= &new_nulls;
}
write_list_nulls(dest_store, merged_nulls).await?;

Ok(CreatedIndex {
index_details: prost_types::Any::from_msg(&pbold::LabelListIndexDetails::default())
Expand Down Expand Up @@ -259,6 +305,44 @@ fn extract_flatten_indices(list_arr: &dyn Array) -> UInt64Array {
}
}

/// Wrap `source` so that every batch flowing through it has the row ids of
/// its whole-list NULL entries recorded into `list_nulls`.
///
/// This must run before unnesting, because `unnest_chunks` drops NULL lists
/// entirely and their row ids would otherwise be lost.
fn track_list_nulls(
    source: SendableRecordBatchStream,
    list_nulls: Arc<Mutex<RowAddrTreeMap>>,
) -> SendableRecordBatchStream {
    let schema = source.schema();
    let tracked = source.try_filter_map(move |batch| {
        let sink = list_nulls.clone();
        async move {
            // Record the nulls as a side effect, then pass the batch through
            // unchanged.
            record_list_nulls(&batch, &sink)?;
            Ok(Some(batch))
        }
    });
    Box::pin(RecordBatchStreamAdapter::new(schema, tracked))
}

/// Record the row ids of whole-list NULL entries in `batch` into the shared
/// `list_nulls` accumulator.
///
/// Expects `batch` to contain a `VALUE_COLUMN_NAME` list column and a
/// `ROW_ID` UInt64 column (the training/update stream layout).
fn record_list_nulls(
    batch: &RecordBatch,
    list_nulls: &Arc<Mutex<RowAddrTreeMap>>,
) -> datafusion_common::Result<()> {
    let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
    // Fast path: most batches contain no NULL lists; skip the per-row scan
    // (and the row-id downcast) entirely when the validity map says so.
    if values.null_count() == 0 {
        return Ok(());
    }
    let row_ids = batch.column_by_name(ROW_ID).expect_ok()?;
    let row_ids = row_ids.as_any().downcast_ref::<UInt64Array>().unwrap();

    // Accumulate locally so the shared lock is held only for a single merge.
    let mut local_nulls = RowAddrTreeMap::new();
    for i in 0..values.len() {
        if values.is_null(i) {
            local_nulls.insert(row_ids.value(i));
        }
    }
    // null_count() > 0 guarantees local_nulls is non-empty here.
    let mut guard = list_nulls.lock().unwrap();
    *guard |= &local_nulls;
    Ok(())
}

fn unnest_schema(schema: &Schema) -> SchemaRef {
let mut fields_iter = schema.fields.iter().cloned();
let key_field = fields_iter.next().unwrap();
Expand Down Expand Up @@ -356,6 +440,71 @@ fn unnest_chunks(
)))
}

async fn read_list_nulls(
store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
) -> Result<RowAddrTreeMap> {
let reader = match store.open_index_file(LABEL_LIST_NULLS_NAME).await {
Ok(reader) => reader,
Err(err) => {
// Old LabelList indices don't have label_list_nulls.lance; treat as empty for compatibility.
let is_not_found = matches!(
&err,
Error::IO { source, .. }
if source
.downcast_ref::<object_store::Error>()
.map(|err| matches!(err, object_store::Error::NotFound { .. }))
.unwrap_or(false)
);
if is_not_found {
return Ok(RowAddrTreeMap::default());
}
return Err(err);
}
};

let batch = reader.read_range(0..1, None).await?;
let null_map = match batch.num_rows() {
0 => RowAddrTreeMap::default(),
1 => {
let bytes = batch
.column(0)
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.value(0);
RowAddrTreeMap::deserialize_from(bytes)?
}
_ => {
return Err(Error::Internal {
message: "label_list_nulls.lance should contain at most one row".to_string(),
location: location!(),
});
}
};

if let Some(frag_reuse_index) = frag_reuse_index {
Ok(frag_reuse_index.remap_row_addrs_tree_map(&null_map))
} else {
Ok(null_map)
}
}

/// Persist the whole-list NULL row set to `label_list_nulls.lance` as a
/// single-row batch with one serialized binary cell.
async fn write_list_nulls(store: &dyn IndexStore, null_map: RowAddrTreeMap) -> Result<()> {
    let mut writer = store
        .new_index_file(LABEL_LIST_NULLS_NAME, LABEL_LIST_NULLS_SCHEMA.clone())
        .await?;

    // Serialize the tree map into one binary value; the schema is a single
    // non-nullable Binary column.
    let mut serialized = Vec::new();
    null_map.serialize_into(&mut serialized)?;
    let payload: Vec<&[u8]> = vec![&serialized];
    let batch = RecordBatch::try_new(
        LABEL_LIST_NULLS_SCHEMA.clone(),
        vec![Arc::new(BinaryArray::from_vec(payload))],
    )?;

    writer.write_record_batch(batch).await?;
    writer.finish().await
}

#[derive(Debug, Default)]
pub struct LabelListIndexPlugin;

Expand Down Expand Up @@ -449,11 +598,15 @@ impl ScalarIndexPlugin for LabelListIndexPlugin {
});
}

let list_nulls = Arc::new(Mutex::new(RowAddrTreeMap::new()));
let data = track_list_nulls(data, list_nulls.clone());
let data = unnest_chunks(data)?;
let bitmap_plugin = BitmapIndexPlugin;
bitmap_plugin
.train_index(data, index_store, request, fragment_ids, progress)
.await?;
let list_nulls = list_nulls.lock().unwrap().clone();
write_list_nulls(index_store, list_nulls).await?;
Ok(CreatedIndex {
index_details: prost_types::Any::from_msg(&pbold::LabelListIndexDetails::default())
.unwrap(),
Expand Down
66 changes: 65 additions & 1 deletion rust/lance-index/src/scalar/lance_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ pub mod tests {
use crate::pbold;
use crate::scalar::bitmap::BitmapIndexPlugin;
use crate::scalar::btree::{BTreeIndexPlugin, BTreeParameters};
use crate::scalar::label_list::LabelListIndexPlugin;
use crate::scalar::label_list::{LabelListIndexPlugin, LABEL_LIST_NULLS_NAME};
use crate::scalar::registry::{ScalarIndexPlugin, VALUE_COLUMN_NAME};
use crate::scalar::{
bitmap::BitmapIndex,
Expand Down Expand Up @@ -1598,4 +1598,68 @@ pub mod tests {
_ => panic!("Expected Exact search result"),
}
}

#[tokio::test]
async fn test_label_list_missing_nulls_file_is_compatible() {
// Backward-compatibility check: LabelList indices written before the
// nulls file was introduced have no label_list_nulls.lance. Loading such
// an index must succeed and behave as if the null set were empty.
let tempdir = TempDir::default();
let index_store = test_store(&tempdir);

// Three rows; the middle row is a whole-list NULL (not an empty list).
let list_array = ListArray::from_iter_primitive::<UInt8Type, _, _>(vec![
Some(vec![Some(1)]),
None,
Some(vec![Some(2)]),
]);
let row_ids = UInt64Array::from_iter_values(0..3);
let schema = Arc::new(Schema::new(vec![
Field::new(
VALUE_COLUMN_NAME,
DataType::List(Arc::new(Field::new("item", DataType::UInt8, true))),
true,
),
Field::new(ROW_ID, DataType::UInt64, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(list_array), Arc::new(row_ids)],
)
.unwrap();

let batch_reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
train_tag(&index_store, batch_reader).await;

// Simulate the old on-disk layout by deleting the nulls file the
// training step just wrote.
index_store
.delete_index_file(LABEL_LIST_NULLS_NAME)
.await
.unwrap();

// Loading must tolerate the missing file rather than erroring out.
let index = LabelListIndexPlugin
.load_index(
index_store,
&default_details::<pbold::LabelListIndexDetails>(),
None,
&LanceCache::no_cache(),
)
.await
.unwrap();

let query = LabelListQuery::HasAnyLabel(vec![ScalarValue::UInt8(Some(1))]);
let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();

match result {
SearchResult::Exact(row_ids) => {
// Without the nulls file the index cannot know about NULL lists, so
// the null row set must default to empty.
assert!(
row_ids.null_rows().is_empty(),
"missing nulls file should default to empty null rows"
);
// Only row 0 holds label 1, so it is the lone match.
let actual_rows: Vec<u64> = row_ids
.true_rows()
.row_addrs()
.unwrap()
.map(u64::from)
.collect();
assert_eq!(actual_rows, vec![0]);
}
_ => panic!("Expected Exact search result"),
}
}
}
24 changes: 24 additions & 0 deletions rust/lance/src/index/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use lance_index::pbold::{
use lance_index::progress::IndexBuildProgress;
use lance_index::registry::IndexPluginRegistry;
use lance_index::scalar::inverted::METADATA_FILE;
use lance_index::scalar::label_list::{LABEL_LIST_NULLS_MIN_VERSION, LABEL_LIST_NULLS_NAME};
use lance_index::scalar::registry::{
ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, VALUE_COLUMN_NAME,
};
Expand Down Expand Up @@ -334,6 +335,29 @@ pub async fn open_scalar_index(
let index_details = fetch_index_details(dataset, column, index).await?;
let plugin = SCALAR_INDEX_PLUGIN_REGISTRY.get_plugin_by_details(index_details.as_ref())?;

if index_details.type_url.ends_with("LabelListIndexDetails") {
if let Some(field) = dataset.schema().field(column) {
if field.nullable {
if index.index_version < LABEL_LIST_NULLS_MIN_VERSION {
log::warn!(
"LabelList index {} is old; NOT filters may be incorrect on nullable lists. Consider rebuilding.",
index.name
);
} else {
let index_dir = dataset.indice_files_dir(index)?.child(uuid_str.clone());
let nulls_path = index_dir.child(LABEL_LIST_NULLS_NAME);
if !dataset.object_store.exists(&nulls_path).await? {
log::warn!(
"LabelList index {} is missing {}; NOT filters may be incorrect on nullable lists. Consider rebuilding.",
index.name,
LABEL_LIST_NULLS_NAME
);
}
}
}
}
}

let frag_reuse_index = dataset.open_frag_reuse_index(metrics).await?;

let index_cache = dataset
Expand Down
Loading