Skip to content

Add SQLOptions for controlling allowed SQL statements, update docs #7333

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 208 additions & 55 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ use crate::{
optimizer::optimizer::Optimizer,
physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
};
use datafusion_common::{alias::AliasGenerator, not_impl_err, plan_err};
use datafusion_common::{
alias::AliasGenerator,
not_impl_err, plan_err,
tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
Expand Down Expand Up @@ -163,35 +167,64 @@ where
/// * Register a custom data source that can be referenced from a SQL query.
/// * Execution a SQL query
///
/// # Example: DataFrame API
///
/// The following example demonstrates how to use the context to execute a query against a CSV
/// data source using the DataFrame API:
///
/// ```
/// use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # use datafusion::{error::Result, assert_batches_eq};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// let df = df.filter(col("a").lt_eq(col("b")))?
/// .aggregate(vec![col("a")], vec![min(col("b"))])?
/// .limit(0, Some(100))?;
/// let results = df.collect();
/// let results = df
/// .collect()
/// .await?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the results aren't checked it turns out this code was not actually running the plan (missing an await so I fixed that)

/// assert_batches_eq!(
/// &[
/// "+---+----------------+",
/// "| a | MIN(?table?.b) |",
/// "+---+----------------+",
/// "| 1 | 2 |",
/// "+---+----------------+",
/// ],
/// &results
/// );
/// # Ok(())
/// # }
/// ```
///
/// # Example: SQL API
///
/// The following example demonstrates how to execute the same query using SQL:
///
/// ```
/// use datafusion::prelude::*;
///
/// # use datafusion::error::Result;
/// # use datafusion::{error::Result, assert_batches_eq};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = SessionContext::new();
/// ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
/// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;
/// let results = ctx
/// .sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")
/// .await?
/// .collect()
/// .await?;
/// assert_batches_eq!(
/// &[
/// "+---+----------------+",
/// "| a | MIN(example.b) |",
/// "+---+----------------+",
/// "| 1 | 2 |",
/// "+---+----------------+",
/// ],
/// &results
/// );
/// # Ok(())
/// # }
/// ```
Expand Down Expand Up @@ -342,22 +375,82 @@ impl SessionContext {
self.state.read().config.clone()
}

/// Creates a [`DataFrame`] that will execute a SQL query.
/// Creates a [`DataFrame`] from SQL query text.
///
/// Note: This API implements DDL statements such as `CREATE TABLE` and
/// `CREATE VIEW` and DML statements such as `INSERT INTO` with in-memory
/// default implementations.
/// default implementations. See [`Self::sql_with_options`].
///
/// # Example: Running SQL queries
///
/// See the example on [`Self`]
///
/// If this is not desirable, consider using [`SessionState::create_logical_plan()`] which
/// does not mutate the state based on such statements.
/// # Example: Creating a Table with SQL
///
/// ```
/// use datafusion::prelude::*;
/// # use datafusion::{error::Result, assert_batches_eq};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = SessionContext::new();
/// ctx
/// .sql("CREATE TABLE foo (x INTEGER)")
/// .await?
/// .collect()
/// .await?;
/// assert!(ctx.table_exist("foo").unwrap());
/// # Ok(())
/// # }
/// ```
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
// create a query planner
self.sql_with_options(sql, SQLOptions::new()).await
}

/// Creates a [`DataFrame`] from SQL query text, first validating
/// that the queries are allowed by `options`
///
/// # Example: Preventing Creating a Table with SQL
///
/// If you want to avoid creating tables, or modifying data or the
/// session, set [`SQLOptions`] appropriately:
///
/// ```
/// use datafusion::prelude::*;
/// # use datafusion::{error::Result};
/// # use datafusion::physical_plan::collect;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = SessionContext::new();
/// let options = SQLOptions::new()
/// .with_allow_ddl(false);
/// let err = ctx.sql_with_options("CREATE TABLE foo (x INTEGER)", options)
/// .await
/// .unwrap_err();
/// assert_eq!(
/// err.to_string(),
/// "Error during planning: DDL not supported: CreateMemoryTable"
/// );
/// # Ok(())
/// # }
/// ```
pub async fn sql_with_options(
&self,
sql: &str,
options: SQLOptions,
) -> Result<DataFrame> {
let plan = self.state().create_logical_plan(sql).await?;
options.verify_plan(&plan)?;

self.execute_logical_plan(plan).await
}

/// Execute the [`LogicalPlan`], return a [`DataFrame`]
/// Execute the [`LogicalPlan`], return a [`DataFrame`]. This API
/// is not featured limited (so all SQL such as `CREATE TABLE` and
/// `COPY` will be run).
///
/// If you wish to limit the type of plan that can be run from
/// SQL, see [`Self::sql_with_options`] and
/// [`SQLOptions::verify_plan`].
pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<DataFrame> {
match plan {
LogicalPlan::Ddl(ddl) => match ddl {
Expand Down Expand Up @@ -1304,7 +1397,7 @@ impl FunctionRegistry for SessionContext {
/// A planner used to add extensions to DataFusion logical and physical plans.
#[async_trait]
pub trait QueryPlanner {
/// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
/// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
Expand All @@ -1317,7 +1410,7 @@ struct DefaultQueryPlanner {}

#[async_trait]
impl QueryPlanner for DefaultQueryPlanner {
/// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
/// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
Expand Down Expand Up @@ -1628,7 +1721,8 @@ impl SessionState {
&mut self.table_factories
}

/// Convert a SQL string into an AST Statement
/// Parse an SQL string into an DataFusion specific AST
/// [`Statement`]. See [`SessionContext::sql`] for running queries.
pub fn sql_to_statement(
&self,
sql: &str,
Expand Down Expand Up @@ -1787,9 +1881,15 @@ impl SessionState {
query.statement_to_plan(statement)
}

/// Creates a [`LogicalPlan`] from the provided SQL string
/// Creates a [`LogicalPlan`] from the provided SQL string. This
/// interface will plan any SQL DataFusion supports, including DML
/// like `CREATE TABLE`, and `COPY` (which can write to local
/// files.
///
/// See [`SessionContext::sql`] for a higher-level interface that also handles DDL
/// See [`SessionContext::sql`] and
/// [`SessionContext::sql_with_options`] for a higher-level
/// interface that handles DDL and verification of allowed
/// statements.
pub async fn create_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {
let dialect = self.config.options().sql_parser.dialect.as_str();
let statement = self.sql_to_statement(sql, dialect)?;
Expand Down Expand Up @@ -1870,7 +1970,11 @@ impl SessionState {

/// Creates a physical plan from a logical plan.
///
/// Note: this first calls [`Self::optimize`] on the provided plan
/// Note: this first calls [`Self::optimize`] on the provided
/// plan.
///
/// This function will error for [`LogicalPlan`]s such as catalog
/// DDL `CREATE TABLE` must be handled by another layer.
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
Expand Down Expand Up @@ -2095,6 +2199,92 @@ impl SerializerRegistry for EmptySerializerRegistry {
}
}

/// Describes which SQL statements can be run.
///
/// See [`SessionContext::sql_with_options`] for more details.
#[derive(Clone, Debug, Copy)]
pub struct SQLOptions {
/// See [`Self::with_allow_ddl`]
allow_ddl: bool,
/// See [`Self::with_allow_dml`]
allow_dml: bool,
/// See [`Self::with_allow_statements`]
allow_statements: bool,
}

impl Default for SQLOptions {
fn default() -> Self {
Self {
allow_ddl: true,
allow_dml: true,
allow_statements: true,
}
}
}

impl SQLOptions {
/// Create a new `SQLOptions` with default values
pub fn new() -> Self {
Default::default()
}

/// Should DML data modification commands (e.g. `INSERT and COPY`) be run? Defaults to `true`.
pub fn with_allow_ddl(mut self, allow: bool) -> Self {
self.allow_ddl = allow;
self
}

/// Should DML data modification commands (e.g. `INSERT and COPY`) be run? Defaults to `true`
pub fn with_allow_dml(mut self, allow: bool) -> Self {
self.allow_dml = allow;
self
}

/// Should Statements such as (e.g. `SET VARIABLE and `BEGIN TRANSACTION` ...`) be run?. Defaults to `true`
pub fn with_allow_statements(mut self, allow: bool) -> Self {
self.allow_statements = allow;
self
}

/// Return an error if the [`LogicalPlan`] has any nodes that are
/// incompatible with this [`SQLOptions`].
pub fn verify_plan(&self, plan: &LogicalPlan) -> Result<()> {
plan.visit(&mut BadPlanVisitor::new(self))?;
Ok(())
}
}

struct BadPlanVisitor<'a> {
options: &'a SQLOptions,
}
impl<'a> BadPlanVisitor<'a> {
fn new(options: &'a SQLOptions) -> Self {
Self { options }
}
}

impl<'a> TreeNodeVisitor for BadPlanVisitor<'a> {
type N = LogicalPlan;

fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
match node {
LogicalPlan::Ddl(ddl) if !self.options.allow_ddl => {
plan_err!("DDL not supported: {}", ddl.name())
}
LogicalPlan::Dml(dml) if !self.options.allow_dml => {
plan_err!("DML not supported: {}", dml.op)
}
LogicalPlan::Copy(_) if !self.options.allow_dml => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks so much @alamb for adding this to the core DF APIs. The addition of the Copy operator is a great example of why it belongs here: we already have a visitor that does similar filtering, but we would probably have failed to update it to include Copy when we migrate to the latest DF version.

plan_err!("DML not supported: COPY")
}
LogicalPlan::Statement(stmt) if !self.options.allow_statements => {
plan_err!("Statement not supported: {}", stmt.name())
}
_ => Ok(VisitRecursion::Continue),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -2646,43 +2836,6 @@ mod tests {
Ok(())
}

#[tokio::test]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this into core_integration as it seemed like a test of the public API rather than an internal unit test

async fn unsupported_sql_returns_error() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_table("test", test::table_with_sequence(1, 1).unwrap())
.unwrap();
let state = ctx.state();

// create view
let sql = "create view test_view as select * from test";
let plan = state.create_logical_plan(sql).await;
let physical_plan = state.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"This feature is not implemented: Unsupported logical plan: CreateView"
);
// // drop view
let sql = "drop view test_view";
let plan = state.create_logical_plan(sql).await;
let physical_plan = state.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"This feature is not implemented: Unsupported logical plan: DropView"
);
// // drop table
let sql = "drop table test";
let plan = state.create_logical_plan(sql).await;
let physical_plan = state.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"This feature is not implemented: Unsupported logical plan: DropTable"
);
Ok(())
}

struct MyPhysicalPlanner {}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
//! ```

pub use crate::dataframe::DataFrame;
pub use crate::execution::context::{SessionConfig, SessionContext};
pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext};
pub use crate::execution::options::{
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
};
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub mod projection;
pub mod references;
pub mod repartition;
pub mod select;
mod sql_api;
pub mod subqueries;
pub mod timestamp;
pub mod udf;
Expand Down
Loading