Skip to content

Commit 1bc51f5

Browse files
committed
Move TableConstraint to Constraints conversion
Reduce datafusion-common dependency on sqlparser
1 parent 4a0b768 commit 1bc51f5

File tree

2 files changed

+74
-76
lines changed

2 files changed

+74
-76
lines changed

datafusion/common/src/functional_dependencies.rs

Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,8 @@ use std::fmt::{Display, Formatter};
2323
use std::ops::Deref;
2424
use std::vec::IntoIter;
2525

26-
use crate::error::_plan_err;
2726
use crate::utils::{merge_and_order_indices, set_difference};
28-
use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};
29-
30-
use sqlparser::ast::TableConstraint;
27+
use crate::{DFSchema, JoinType};
3128

3229
/// This object defines a constraint on a table.
3330
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
@@ -60,74 +57,6 @@ impl Constraints {
6057
Self { inner: constraints }
6158
}
6259

63-
/// Convert each `TableConstraint` to corresponding `Constraint`
64-
pub fn new_from_table_constraints(
65-
constraints: &[TableConstraint],
66-
df_schema: &DFSchemaRef,
67-
) -> Result<Self> {
68-
let constraints = constraints
69-
.iter()
70-
.map(|c: &TableConstraint| match c {
71-
TableConstraint::Unique { name, columns, .. } => {
72-
let field_names = df_schema.field_names();
73-
// Get unique constraint indices in the schema:
74-
let indices = columns
75-
.iter()
76-
.map(|u| {
77-
let idx = field_names
78-
.iter()
79-
.position(|item| *item == u.value)
80-
.ok_or_else(|| {
81-
let name = name
82-
.as_ref()
83-
.map(|name| format!("with name '{name}' "))
84-
.unwrap_or("".to_string());
85-
DataFusionError::Execution(
86-
format!("Column for unique constraint {}not found in schema: {}", name,u.value)
87-
)
88-
})?;
89-
Ok(idx)
90-
})
91-
.collect::<Result<Vec<_>>>()?;
92-
Ok(Constraint::Unique(indices))
93-
}
94-
TableConstraint::PrimaryKey { columns, .. } => {
95-
let field_names = df_schema.field_names();
96-
// Get primary key indices in the schema:
97-
let indices = columns
98-
.iter()
99-
.map(|pk| {
100-
let idx = field_names
101-
.iter()
102-
.position(|item| *item == pk.value)
103-
.ok_or_else(|| {
104-
DataFusionError::Execution(format!(
105-
"Column for primary key not found in schema: {}",
106-
pk.value
107-
))
108-
})?;
109-
Ok(idx)
110-
})
111-
.collect::<Result<Vec<_>>>()?;
112-
Ok(Constraint::PrimaryKey(indices))
113-
}
114-
TableConstraint::ForeignKey { .. } => {
115-
_plan_err!("Foreign key constraints are not currently supported")
116-
}
117-
TableConstraint::Check { .. } => {
118-
_plan_err!("Check constraints are not currently supported")
119-
}
120-
TableConstraint::Index { .. } => {
121-
_plan_err!("Indexes are not currently supported")
122-
}
123-
TableConstraint::FulltextOrSpatial { .. } => {
124-
_plan_err!("Indexes are not currently supported")
125-
}
126-
})
127-
.collect::<Result<Vec<_>>>()?;
128-
Ok(Constraints::new_unverified(constraints))
129-
}
130-
13160
/// Check whether constraints is empty
13261
pub fn is_empty(&self) -> bool {
13362
self.inner.is_empty()

datafusion/sql/src/statement.rs

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ use crate::planner::{
3030
use crate::utils::normalize_ident;
3131

3232
use arrow_schema::{DataType, Fields};
33+
use datafusion_common::error::_plan_err;
3334
use datafusion_common::parsers::CompressionTypeVariant;
3435
use datafusion_common::{
3536
exec_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
36-
unqualified_field_not_found, Column, Constraints, DFSchema, DFSchemaRef,
37+
unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
3738
DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
3839
ToDFSchema,
3940
};
@@ -427,7 +428,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
427428
plan
428429
};
429430

430-
let constraints = Constraints::new_from_table_constraints(
431+
let constraints = Self::new_constraint_from_table_constraints(
431432
&all_constraints,
432433
plan.schema(),
433434
)?;
@@ -452,7 +453,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
452453
schema,
453454
};
454455
let plan = LogicalPlan::EmptyRelation(plan);
455-
let constraints = Constraints::new_from_table_constraints(
456+
let constraints = Self::new_constraint_from_table_constraints(
456457
&all_constraints,
457458
plan.schema(),
458459
)?;
@@ -1242,7 +1243,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
12421243

12431244
let name = self.object_name_to_table_reference(name)?;
12441245
let constraints =
1245-
Constraints::new_from_table_constraints(&all_constraints, &df_schema)?;
1246+
Self::new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
12461247
Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
12471248
PlanCreateExternalTable {
12481249
schema: df_schema,
@@ -1262,6 +1263,74 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
12621263
)))
12631264
}
12641265

1266+
/// Convert each `TableConstraint` to corresponding `Constraint`
1267+
fn new_constraint_from_table_constraints(
1268+
constraints: &[TableConstraint],
1269+
df_schema: &DFSchemaRef,
1270+
) -> Result<Constraints> {
1271+
let constraints = constraints
1272+
.iter()
1273+
.map(|c: &TableConstraint| match c {
1274+
TableConstraint::Unique { name, columns, .. } => {
1275+
let field_names = df_schema.field_names();
1276+
// Get unique constraint indices in the schema:
1277+
let indices = columns
1278+
.iter()
1279+
.map(|u| {
1280+
let idx = field_names
1281+
.iter()
1282+
.position(|item| *item == u.value)
1283+
.ok_or_else(|| {
1284+
let name = name
1285+
.as_ref()
1286+
.map(|name| format!("with name '{name}' "))
1287+
.unwrap_or("".to_string());
1288+
DataFusionError::Execution(
1289+
format!("Column for unique constraint {}not found in schema: {}", name,u.value)
1290+
)
1291+
})?;
1292+
Ok(idx)
1293+
})
1294+
.collect::<Result<Vec<_>>>()?;
1295+
Ok(Constraint::Unique(indices))
1296+
}
1297+
TableConstraint::PrimaryKey { columns, .. } => {
1298+
let field_names = df_schema.field_names();
1299+
// Get primary key indices in the schema:
1300+
let indices = columns
1301+
.iter()
1302+
.map(|pk| {
1303+
let idx = field_names
1304+
.iter()
1305+
.position(|item| *item == pk.value)
1306+
.ok_or_else(|| {
1307+
DataFusionError::Execution(format!(
1308+
"Column for primary key not found in schema: {}",
1309+
pk.value
1310+
))
1311+
})?;
1312+
Ok(idx)
1313+
})
1314+
.collect::<Result<Vec<_>>>()?;
1315+
Ok(Constraint::PrimaryKey(indices))
1316+
}
1317+
TableConstraint::ForeignKey { .. } => {
1318+
_plan_err!("Foreign key constraints are not currently supported")
1319+
}
1320+
TableConstraint::Check { .. } => {
1321+
_plan_err!("Check constraints are not currently supported")
1322+
}
1323+
TableConstraint::Index { .. } => {
1324+
_plan_err!("Indexes are not currently supported")
1325+
}
1326+
TableConstraint::FulltextOrSpatial { .. } => {
1327+
_plan_err!("Indexes are not currently supported")
1328+
}
1329+
})
1330+
.collect::<Result<Vec<_>>>()?;
1331+
Ok(Constraints::new_unverified(constraints))
1332+
}
1333+
12651334
fn parse_options_map(
12661335
&self,
12671336
options: Vec<(String, Value)>,

0 commit comments

Comments
 (0)