Skip to content

Commit 7fe578d

Browse files
KKouldb41sh
andauthored
chore: VirtualColumn Bind (#17862)
* feat: Bind `VirtualColumn` as a `BaseColumn`, rewrite `get_by_keypath` to `VirtualColumn` Signed-off-by: Kould <[email protected]> * chore: fix ci fail Signed-off-by: Kould <[email protected]> * chore: fix code reviews Signed-off-by: Kould <[email protected]> * fix: virtual column cast Signed-off-by: Kould <[email protected]> * chore: fix logic test Signed-off-by: Kould <[email protected]> * fix virtual column bind * add comments --------- Signed-off-by: Kould <[email protected]> Co-authored-by: baishen <[email protected]>
1 parent 23f254f commit 7fe578d

File tree

21 files changed

+685
-485
lines changed

21 files changed

+685
-485
lines changed

src/query/catalog/src/plan/pushdown.rs

-6
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use databend_common_expression::Scalar;
2525
use databend_common_expression::TableDataType;
2626
use databend_common_expression::TableField;
2727
use databend_common_expression::TableSchema;
28-
use databend_common_expression::TableSchemaRef;
2928
use databend_storages_common_table_meta::table::ChangeType;
3029

3130
use super::AggIndexInfo;
@@ -36,8 +35,6 @@ use crate::plan::Projection;
3635
/// Generated from the source column by the paths.
3736
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
3837
pub struct VirtualColumnInfo {
39-
/// The schema of virtual columns.
40-
pub schema: TableSchemaRef,
4138
/// The source column ids of virtual columns.
4239
/// If the virtual columns are not generated,
4340
/// we can read data from source column to generate them.
@@ -63,9 +60,6 @@ pub struct VirtualColumnField {
6360
pub cast_func_name: Option<String>,
6461
/// Virtual column data type.
6562
pub data_type: Box<TableDataType>,
66-
/// Is the virtual column is created,
67-
/// if not, reminder user to create it.
68-
pub is_created: bool,
6963
}
7064

7165
/// Information about prewhere optimization.

src/query/functions/src/scalars/variant.rs

+18-16
Original file line numberDiff line numberDiff line change
@@ -2324,24 +2324,26 @@ fn get_by_keypath_fn(
23242324
match json_row {
23252325
ScalarRef::Variant(v) => {
23262326
match RawJsonb::new(v).get_by_keypath(path.paths.iter()) {
2327-
Ok(Some(res)) => {
2328-
match &mut builder {
2329-
ColumnBuilder::String(builder) => {
2330-
let raw_jsonb = res.as_raw();
2331-
if let Ok(Some(s)) = raw_jsonb.as_str() {
2332-
builder.put_str(&s);
2333-
} else {
2334-
let json_str = raw_jsonb.to_string();
2335-
builder.put_str(&json_str);
2336-
}
2337-
}
2338-
ColumnBuilder::Variant(builder) => {
2339-
builder.put_slice(res.as_ref());
2327+
Ok(Some(res)) => match &mut builder {
2328+
ColumnBuilder::String(builder) => {
2329+
let raw_jsonb = res.as_raw();
2330+
if raw_jsonb.is_null().unwrap_or_default() {
2331+
validity.push(false);
2332+
} else if let Ok(Some(s)) = raw_jsonb.as_str() {
2333+
builder.put_str(&s);
2334+
validity.push(true);
2335+
} else {
2336+
let json_str = raw_jsonb.to_string();
2337+
builder.put_str(&json_str);
2338+
validity.push(true);
23402339
}
2341-
_ => unreachable!(),
23422340
}
2343-
validity.push(true);
2344-
}
2341+
ColumnBuilder::Variant(builder) => {
2342+
builder.put_slice(res.as_ref());
2343+
validity.push(true);
2344+
}
2345+
_ => unreachable!(),
2346+
},
23452347
_ => validity.push(false),
23462348
}
23472349
}

src/query/functions/tests/it/scalars/testdata/variant.txt

+6-6
Original file line numberDiff line numberDiff line change
@@ -3666,19 +3666,19 @@ output : NULL
36663666
ast : parse_json('{"k": null}') #>> '{k}'
36673667
raw expr : get_by_keypath_string(parse_json('{"k": null}'), '{k}')
36683668
checked expr : get_by_keypath_string<Variant, String>(CAST<String>("{\"k\": null}" AS Variant), "{k}")
3669-
optimized expr : "null"
3669+
optimized expr : NULL
36703670
output type : String NULL
3671-
output domain : {"null"..="null"}
3672-
output : 'null'
3671+
output domain : {NULL}
3672+
output : NULL
36733673

36743674

36753675
ast : parse_json('[10, 20, null]') #>> '{2}'
36763676
raw expr : get_by_keypath_string(parse_json('[10, 20, null]'), '{2}')
36773677
checked expr : get_by_keypath_string<Variant, String>(CAST<String>("[10, 20, null]" AS Variant), "{2}")
3678-
optimized expr : "null"
3678+
optimized expr : NULL
36793679
output type : String NULL
3680-
output domain : {"null"..="null"}
3681-
output : 'null'
3680+
output domain : {NULL}
3681+
output : NULL
36823682

36833683

36843684
ast : parse_json('[10, {"a":{"k1":[1,2,3], "k2":2}}, 30]') #>> '{1, a, k1}'

src/query/sql/src/executor/physical_plans/physical_table_scan.rs

+102-57
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use std::collections::BTreeMap;
1717
use std::collections::HashSet;
1818
use std::sync::Arc;
1919

20+
use databend_common_ast::parser::token::TokenKind;
21+
use databend_common_ast::parser::tokenize_sql;
2022
use databend_common_catalog::catalog::CatalogManager;
2123
use databend_common_catalog::plan::DataSourcePlan;
2224
use databend_common_catalog::plan::Filters;
@@ -29,20 +31,21 @@ use databend_common_catalog::plan::VirtualColumnInfo;
2931
use databend_common_exception::ErrorCode;
3032
use databend_common_exception::Result;
3133
use databend_common_expression::type_check::check_function;
32-
use databend_common_expression::type_check::get_simple_cast_function;
3334
use databend_common_expression::types::DataType;
3435
use databend_common_expression::ConstantFolder;
3536
use databend_common_expression::DataField;
3637
use databend_common_expression::DataSchemaRef;
3738
use databend_common_expression::DataSchemaRefExt;
3839
use databend_common_expression::FieldIndex;
3940
use databend_common_expression::RemoteExpr;
41+
use databend_common_expression::Scalar;
4042
use databend_common_expression::TableDataType;
41-
use databend_common_expression::TableField;
4243
use databend_common_expression::TableSchema;
4344
use databend_common_expression::TableSchemaRef;
4445
use databend_common_expression::ROW_ID_COL_NAME;
4546
use databend_common_functions::BUILTIN_FUNCTIONS;
47+
use jsonb::keypath::KeyPath;
48+
use jsonb::keypath::KeyPaths;
4649
use rand::distributions::Bernoulli;
4750
use rand::distributions::Distribution;
4851
use rand::thread_rng;
@@ -124,7 +127,24 @@ impl PhysicalPlanBuilder {
124127
let scan = if scan.columns.is_empty() {
125128
scan.clone()
126129
} else {
127-
let columns = scan.columns.clone();
130+
let mut columns = scan.columns.clone();
131+
132+
let required_column_ids: Vec<_> = required.difference(&columns).cloned().collect();
133+
if !required_column_ids.is_empty() {
134+
// add virtual columns to table scan columns.
135+
let read_guard = self.metadata.read();
136+
let virtual_column_id_set = read_guard
137+
.virtual_columns_by_table_index(scan.table_index)
138+
.iter()
139+
.map(|column| column.index())
140+
.collect::<HashSet<_>>();
141+
for required_column_id in required_column_ids {
142+
if virtual_column_id_set.contains(&required_column_id) {
143+
columns.insert(required_column_id);
144+
}
145+
}
146+
}
147+
128148
let mut prewhere = scan.prewhere.clone();
129149
let mut used: ColumnSet = required.intersection(&columns).cloned().collect();
130150
if scan.is_lazy_table {
@@ -150,9 +170,9 @@ impl PhysicalPlanBuilder {
150170

151171
// 2. Build physical plan.
152172
let mut has_inner_column = false;
153-
let mut has_virtual_column = false;
154173
let mut name_mapping = BTreeMap::new();
155174
let mut project_internal_columns = BTreeMap::new();
175+
let mut project_virtual_columns = BTreeMap::new();
156176
let metadata = self.metadata.read().clone();
157177

158178
for index in scan.columns.iter() {
@@ -171,8 +191,8 @@ impl PhysicalPlanBuilder {
171191
}) => {
172192
project_internal_columns.insert(*index, internal_column.to_owned());
173193
}
174-
ColumnEntry::VirtualColumn(_) => {
175-
has_virtual_column = true;
194+
ColumnEntry::VirtualColumn(virtual_column) => {
195+
project_virtual_columns.insert(*index, virtual_column.clone());
176196
}
177197
_ => {}
178198
}
@@ -225,8 +245,12 @@ impl PhysicalPlanBuilder {
225245
table_schema = Arc::new(schema);
226246
}
227247

228-
let push_downs =
229-
self.push_downs(&scan, &table_schema, has_inner_column, has_virtual_column)?;
248+
let push_downs = self.push_downs(
249+
&scan,
250+
&table_schema,
251+
project_virtual_columns,
252+
has_inner_column,
253+
)?;
230254

231255
let mut source = table
232256
.read_plan(
@@ -338,8 +362,8 @@ impl PhysicalPlanBuilder {
338362
&self,
339363
scan: &crate::plans::Scan,
340364
table_schema: &TableSchema,
365+
virtual_columns: BTreeMap<IndexType, VirtualColumn>,
341366
has_inner_column: bool,
342-
has_virtual_column: bool,
343367
) -> Result<PushDownInfo> {
344368
let metadata = self.metadata.read().clone();
345369
let projection = Self::build_projection(
@@ -353,6 +377,7 @@ impl PhysicalPlanBuilder {
353377
true,
354378
true,
355379
);
380+
let has_virtual_column = !virtual_columns.is_empty();
356381

357382
let output_columns = if has_virtual_column {
358383
Some(Self::build_projection(
@@ -524,7 +549,7 @@ impl PhysicalPlanBuilder {
524549
}
525550
}
526551

527-
let virtual_column = self.build_virtual_column(&scan.columns);
552+
let virtual_column = self.build_virtual_column(virtual_columns)?;
528553

529554
Ok(PushDownInfo {
530555
projection: Some(projection),
@@ -558,63 +583,83 @@ impl PhysicalPlanBuilder {
558583
}
559584
}
560585

561-
fn build_virtual_column(&self, indices: &ColumnSet) -> Option<VirtualColumnInfo> {
562-
let mut source_column_ids = HashSet::new();
563-
let mut column_and_indices = Vec::new();
564-
for index in indices.iter() {
565-
if let ColumnEntry::VirtualColumn(virtual_column) = self.metadata.read().column(*index)
566-
{
567-
source_column_ids.insert(virtual_column.source_column_id);
568-
let cast_func_name =
569-
if virtual_column.data_type.remove_nullable() != TableDataType::Variant {
570-
let dest_type = DataType::from(&virtual_column.data_type.remove_nullable());
571-
get_simple_cast_function(true, &DataType::Variant, &dest_type)
572-
} else {
573-
None
574-
};
575-
let virtual_column_field = VirtualColumnField {
576-
source_column_id: virtual_column.source_column_id,
577-
source_name: virtual_column.source_column_name.clone(),
578-
column_id: virtual_column.column_id,
579-
name: virtual_column.column_name.clone(),
580-
key_paths: virtual_column.key_paths.clone(),
581-
cast_func_name,
582-
data_type: Box::new(virtual_column.data_type.clone()),
583-
is_created: virtual_column.is_created,
586+
fn parse_virtual_column_name(name: &str) -> Result<Scalar> {
587+
let tokens = tokenize_sql(name)?;
588+
let mut i = 0;
589+
let mut key_paths = Vec::new();
590+
while i < tokens.len() {
591+
let token = &tokens[i];
592+
if token.kind == TokenKind::LBracket {
593+
i += 1;
594+
if i >= tokens.len() {
595+
return Err(ErrorCode::Internal(format!(
596+
"Invalid virtual column name {}",
597+
name
598+
)));
599+
}
600+
let path_token = &tokens[i];
601+
let path = path_token.text();
602+
let key_path = if path_token.kind == TokenKind::LiteralString {
603+
let s = &path[1..path.len() - 1];
604+
KeyPath::QuotedName(std::borrow::Cow::Borrowed(s))
605+
} else if path_token.kind == TokenKind::LiteralInteger {
606+
let idx = path.parse::<i32>().unwrap();
607+
KeyPath::Index(idx)
608+
} else {
609+
return Err(ErrorCode::Internal(format!(
610+
"Invalid virtual column name {}",
611+
name
612+
)));
584613
};
585-
column_and_indices.push((virtual_column_field, *index));
614+
key_paths.push(key_path);
615+
// skip TokenKind::RBracket
616+
i += 1;
586617
}
618+
i += 1;
587619
}
588-
if column_and_indices.is_empty() {
589-
return None;
620+
let keypaths = KeyPaths { paths: key_paths };
621+
622+
Ok(Scalar::String(format!("{}", keypaths)))
623+
}
624+
625+
fn build_virtual_column(
626+
&self,
627+
virtual_columns: BTreeMap<IndexType, VirtualColumn>,
628+
) -> Result<Option<VirtualColumnInfo>> {
629+
if virtual_columns.is_empty() {
630+
return Ok(None);
590631
}
591-
// Make the order of virtual columns the same as their indexes.
592-
column_and_indices.sort_by_key(|(_, index)| *index);
593-
594-
let virtual_column_fields = column_and_indices
595-
.into_iter()
596-
.map(|(column, _)| column)
597-
.collect::<Vec<_>>();
598-
599-
let mut fields = Vec::with_capacity(virtual_column_fields.len());
600-
let next_column_id = virtual_column_fields[0].column_id;
601-
for virtual_column_field in &virtual_column_fields {
602-
let field = TableField::new_from_column_id(
603-
&virtual_column_field.name,
604-
*virtual_column_field.data_type.clone(),
605-
virtual_column_field.column_id,
606-
);
607-
fields.push(field);
632+
let mut source_column_ids = HashSet::new();
633+
let mut virtual_column_fields = Vec::with_capacity(virtual_columns.len());
634+
635+
for (_, virtual_column) in virtual_columns.into_iter() {
636+
source_column_ids.insert(virtual_column.source_column_id);
637+
let target_type = virtual_column.data_type.remove_nullable();
638+
639+
let key_paths = Self::parse_virtual_column_name(&virtual_column.column_name)?;
640+
let cast_func_name = if target_type != TableDataType::Variant {
641+
Some(format!("to_{}", target_type.to_string().to_lowercase()))
642+
} else {
643+
None
644+
};
645+
646+
let virtual_column_field = VirtualColumnField {
647+
source_column_id: virtual_column.source_column_id,
648+
source_name: virtual_column.source_column_name.clone(),
649+
column_id: virtual_column.column_id,
650+
name: virtual_column.column_name.clone(),
651+
key_paths,
652+
cast_func_name,
653+
data_type: Box::new(virtual_column.data_type.clone()),
654+
};
655+
virtual_column_fields.push(virtual_column_field);
608656
}
609-
let metadata = BTreeMap::new();
610-
let schema = TableSchema::new_from_column_ids(fields, metadata, next_column_id);
611657

612658
let virtual_column_info = VirtualColumnInfo {
613-
schema: Arc::new(schema),
614659
source_column_ids,
615660
virtual_column_fields,
616661
};
617-
Some(virtual_column_info)
662+
Ok(Some(virtual_column_info))
618663
}
619664

620665
pub(crate) fn build_agg_index(

0 commit comments

Comments
 (0)