Skip to content

Commit 117ab1b

Browse files
authored
Add LogicalPlan::CreateIndex (#11817)
* Add create index plan * Fix clippy lints
1 parent bddb641 commit 117ab1b

File tree

7 files changed

+121
-27
lines changed

7 files changed

+121
-27
lines changed

datafusion/core/src/execution/context/mod.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -544,30 +544,35 @@ impl SessionContext {
544544
// stack overflows.
545545
match ddl {
546546
DdlStatement::CreateExternalTable(cmd) => {
547-
Box::pin(async move { self.create_external_table(&cmd).await })
548-
as std::pin::Pin<Box<dyn futures::Future<Output = _> + Send>>
547+
(Box::pin(async move { self.create_external_table(&cmd).await })
548+
as std::pin::Pin<Box<dyn futures::Future<Output = _> + Send>>)
549+
.await
549550
}
550551
DdlStatement::CreateMemoryTable(cmd) => {
551-
Box::pin(self.create_memory_table(cmd))
552+
Box::pin(self.create_memory_table(cmd)).await
553+
}
554+
DdlStatement::CreateView(cmd) => {
555+
Box::pin(self.create_view(cmd)).await
552556
}
553-
DdlStatement::CreateView(cmd) => Box::pin(self.create_view(cmd)),
554557
DdlStatement::CreateCatalogSchema(cmd) => {
555-
Box::pin(self.create_catalog_schema(cmd))
558+
Box::pin(self.create_catalog_schema(cmd)).await
556559
}
557560
DdlStatement::CreateCatalog(cmd) => {
558-
Box::pin(self.create_catalog(cmd))
561+
Box::pin(self.create_catalog(cmd)).await
559562
}
560-
DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)),
561-
DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)),
563+
DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)).await,
564+
DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)).await,
562565
DdlStatement::DropCatalogSchema(cmd) => {
563-
Box::pin(self.drop_schema(cmd))
566+
Box::pin(self.drop_schema(cmd)).await
564567
}
565568
DdlStatement::CreateFunction(cmd) => {
566-
Box::pin(self.create_function(cmd))
569+
Box::pin(self.create_function(cmd)).await
570+
}
571+
DdlStatement::DropFunction(cmd) => {
572+
Box::pin(self.drop_function(cmd)).await
567573
}
568-
DdlStatement::DropFunction(cmd) => Box::pin(self.drop_function(cmd)),
574+
ddl => Ok(DataFrame::new(self.state(), LogicalPlan::Ddl(ddl))),
569575
}
570-
.await
571576
}
572577
// TODO what about the other statements (like TransactionStart and TransactionEnd)
573578
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {

datafusion/expr/src/logical_plan/ddl.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ pub enum DdlStatement {
4141
CreateCatalogSchema(CreateCatalogSchema),
4242
/// Creates a new catalog (aka "Database").
4343
CreateCatalog(CreateCatalog),
44+
/// Creates a new index.
45+
CreateIndex(CreateIndex),
4446
/// Drops a table.
4547
DropTable(DropTable),
4648
/// Drops a view.
@@ -66,6 +68,7 @@ impl DdlStatement {
6668
schema
6769
}
6870
DdlStatement::CreateCatalog(CreateCatalog { schema, .. }) => schema,
71+
DdlStatement::CreateIndex(CreateIndex { schema, .. }) => schema,
6972
DdlStatement::DropTable(DropTable { schema, .. }) => schema,
7073
DdlStatement::DropView(DropView { schema, .. }) => schema,
7174
DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
@@ -83,6 +86,7 @@ impl DdlStatement {
8386
DdlStatement::CreateView(_) => "CreateView",
8487
DdlStatement::CreateCatalogSchema(_) => "CreateCatalogSchema",
8588
DdlStatement::CreateCatalog(_) => "CreateCatalog",
89+
DdlStatement::CreateIndex(_) => "CreateIndex",
8690
DdlStatement::DropTable(_) => "DropTable",
8791
DdlStatement::DropView(_) => "DropView",
8892
DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema",
@@ -101,6 +105,7 @@ impl DdlStatement {
101105
vec![input]
102106
}
103107
DdlStatement::CreateView(CreateView { input, .. }) => vec![input],
108+
DdlStatement::CreateIndex(_) => vec![],
104109
DdlStatement::DropTable(_) => vec![],
105110
DdlStatement::DropView(_) => vec![],
106111
DdlStatement::DropCatalogSchema(_) => vec![],
@@ -147,6 +152,9 @@ impl DdlStatement {
147152
}) => {
148153
write!(f, "CreateCatalog: {catalog_name:?}")
149154
}
155+
DdlStatement::CreateIndex(CreateIndex { name, .. }) => {
156+
write!(f, "CreateIndex: {name:?}")
157+
}
150158
DdlStatement::DropTable(DropTable {
151159
name, if_exists, ..
152160
}) => {
@@ -351,3 +359,14 @@ pub struct DropFunction {
351359
pub if_exists: bool,
352360
pub schema: DFSchemaRef,
353361
}
362+
363+
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
364+
pub struct CreateIndex {
365+
pub name: Option<String>,
366+
pub table: TableReference,
367+
pub using: Option<String>,
368+
pub columns: Vec<Expr>,
369+
pub unique: bool,
370+
pub if_not_exists: bool,
371+
pub schema: DFSchemaRef,
372+
}

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ pub use builder::{
3030
};
3131
pub use ddl::{
3232
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
33-
CreateFunctionBody, CreateMemoryTable, CreateView, DdlStatement, DropCatalogSchema,
34-
DropFunction, DropTable, DropView, OperateFunctionArg,
33+
CreateFunctionBody, CreateIndex, CreateMemoryTable, CreateView, DdlStatement,
34+
DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg,
3535
};
3636
pub use dml::{DmlStatement, WriteOp};
3737
pub use plan::{

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ impl TreeNode for LogicalPlan {
303303
DdlStatement::CreateExternalTable(_)
304304
| DdlStatement::CreateCatalogSchema(_)
305305
| DdlStatement::CreateCatalog(_)
306+
| DdlStatement::CreateIndex(_)
306307
| DdlStatement::DropTable(_)
307308
| DdlStatement::DropView(_)
308309
| DdlStatement::DropCatalogSchema(_)

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1645,6 +1645,9 @@ impl AsLogicalPlan for LogicalPlanNode {
16451645
LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
16461646
"LogicalPlan serde is not yet implemented for CreateMemoryTable",
16471647
)),
1648+
LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1649+
"LogicalPlan serde is not yet implemented for CreateIndex",
1650+
)),
16481651
LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
16491652
"LogicalPlan serde is not yet implemented for DropTable",
16501653
)),

datafusion/sql/src/statement.rs

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,20 @@ use datafusion_expr::utils::expr_to_columns;
4545
use datafusion_expr::{
4646
cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
4747
CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
48-
CreateMemoryTable, CreateView, DescribeTable, DmlStatement, DropCatalogSchema,
49-
DropFunction, DropTable, DropView, EmptyRelation, Explain, Expr, ExprSchemable,
50-
Filter, LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare,
51-
SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
52-
TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
53-
Volatility, WriteOp,
48+
CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, DescribeTable,
49+
DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView, EmptyRelation,
50+
Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
51+
OperateFunctionArg, PlanType, Prepare, SetVariable, Statement as PlanStatement,
52+
ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd,
53+
TransactionIsolationLevel, TransactionStart, Volatility, WriteOp,
5454
};
5555
use sqlparser::ast;
5656
use sqlparser::ast::{
57-
Assignment, AssignmentTarget, ColumnDef, CreateTable, CreateTableOptions, Delete,
58-
DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert, ObjectName, ObjectType,
59-
OneOrManyWithParens, Query, SchemaName, SetExpr, ShowCreateObject,
60-
ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
61-
TransactionMode, UnaryOperator, Value,
57+
Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
58+
CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
59+
ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr,
60+
ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor,
61+
TableWithJoins, TransactionMode, UnaryOperator, Value,
6262
};
6363
use sqlparser::parser::ParserError::ParserError;
6464

@@ -769,6 +769,42 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
769769
exec_err!("Function name not provided")
770770
}
771771
}
772+
Statement::CreateIndex(CreateIndex {
773+
name,
774+
table_name,
775+
using,
776+
columns,
777+
unique,
778+
if_not_exists,
779+
..
780+
}) => {
781+
let name: Option<String> = name.as_ref().map(object_name_to_string);
782+
let table = self.object_name_to_table_reference(table_name)?;
783+
let table_schema = self
784+
.context_provider
785+
.get_table_source(table.clone())?
786+
.schema()
787+
.to_dfschema_ref()?;
788+
let using: Option<String> = using.as_ref().map(ident_to_string);
789+
let columns = self.order_by_to_sort_expr(
790+
columns,
791+
&table_schema,
792+
planner_context,
793+
false,
794+
None,
795+
)?;
796+
Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
797+
PlanCreateIndex {
798+
name,
799+
table,
800+
using,
801+
columns,
802+
unique,
803+
if_not_exists,
804+
schema: DFSchemaRef::new(DFSchema::empty()),
805+
},
806+
)))
807+
}
772808
_ => {
773809
not_impl_err!("Unsupported SQL statement: {sql:?}")
774810
}

datafusion/sql/tests/sql_integration.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@ use datafusion_common::{
2828
assert_contains, DataFusionError, ParamValues, Result, ScalarValue,
2929
};
3030
use datafusion_expr::{
31+
col,
3132
dml::CopyTo,
3233
logical_plan::{LogicalPlan, Prepare},
3334
test::function_stub::sum_udaf,
34-
ColumnarValue, CreateExternalTable, DdlStatement, ScalarUDF, ScalarUDFImpl,
35-
Signature, Volatility,
35+
ColumnarValue, CreateExternalTable, CreateIndex, DdlStatement, ScalarUDF,
36+
ScalarUDFImpl, Signature, Volatility,
3637
};
3738
use datafusion_functions::{string, unicode};
3839
use datafusion_sql::{
@@ -4426,6 +4427,35 @@ fn test_parse_escaped_string_literal_value() {
44264427
)
44274428
}
44284429

4430+
#[test]
4431+
fn plan_create_index() {
4432+
let sql =
4433+
"CREATE UNIQUE INDEX IF NOT EXISTS idx_name ON test USING btree (name, age DESC)";
4434+
let plan = logical_plan_with_options(sql, ParserOptions::default()).unwrap();
4435+
match plan {
4436+
LogicalPlan::Ddl(DdlStatement::CreateIndex(CreateIndex {
4437+
name,
4438+
table,
4439+
using,
4440+
columns,
4441+
unique,
4442+
if_not_exists,
4443+
..
4444+
})) => {
4445+
assert_eq!(name, Some("idx_name".to_string()));
4446+
assert_eq!(format!("{table}"), "test");
4447+
assert_eq!(using, Some("btree".to_string()));
4448+
assert_eq!(
4449+
columns,
4450+
vec![col("name").sort(true, false), col("age").sort(false, true),]
4451+
);
4452+
assert!(unique);
4453+
assert!(if_not_exists);
4454+
}
4455+
_ => panic!("wrong plan type"),
4456+
}
4457+
}
4458+
44294459
fn assert_field_not_found(err: DataFusionError, name: &str) {
44304460
match err {
44314461
DataFusionError::SchemaError { .. } => {

0 commit comments

Comments
 (0)