@@ -28,7 +28,11 @@ use crate::{
28
28
optimizer:: optimizer:: Optimizer ,
29
29
physical_optimizer:: optimizer:: { PhysicalOptimizer , PhysicalOptimizerRule } ,
30
30
} ;
31
- use datafusion_common:: { alias:: AliasGenerator , plan_err} ;
31
+ use datafusion_common:: {
32
+ alias:: AliasGenerator ,
33
+ plan_err,
34
+ tree_node:: { TreeNode , TreeNodeVisitor , VisitRecursion } ,
35
+ } ;
32
36
use datafusion_execution:: registry:: SerializerRegistry ;
33
37
use datafusion_expr:: {
34
38
logical_plan:: { DdlStatement , Statement } ,
@@ -163,35 +167,64 @@ where
163
167
/// * Register a custom data source that can be referenced from a SQL query.
164
168
/// * Execute a SQL query
165
169
///
170
+ /// # Example: DataFrame API
171
+ ///
166
172
/// The following example demonstrates how to use the context to execute a query against a CSV
167
173
/// data source using the DataFrame API:
168
174
///
169
175
/// ```
170
176
/// use datafusion::prelude::*;
171
- /// # use datafusion::error::Result;
177
+ /// # use datafusion::{ error::Result, assert_batches_eq} ;
172
178
/// # #[tokio::main]
173
179
/// # async fn main() -> Result<()> {
174
180
/// let ctx = SessionContext::new();
175
181
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
176
182
/// let df = df.filter(col("a").lt_eq(col("b")))?
177
183
/// .aggregate(vec![col("a")], vec![min(col("b"))])?
178
184
/// .limit(0, Some(100))?;
179
- /// let results = df.collect();
185
+ /// let results = df
186
+ /// .collect()
187
+ /// .await?;
188
+ /// assert_batches_eq!(
189
+ /// &[
190
+ /// "+---+----------------+",
191
+ /// "| a | MIN(?table?.b) |",
192
+ /// "+---+----------------+",
193
+ /// "| 1 | 2 |",
194
+ /// "+---+----------------+",
195
+ /// ],
196
+ /// &results
197
+ /// );
180
198
/// # Ok(())
181
199
/// # }
182
200
/// ```
183
201
///
202
+ /// # Example: SQL API
203
+ ///
184
204
/// The following example demonstrates how to execute the same query using SQL:
185
205
///
186
206
/// ```
187
207
/// use datafusion::prelude::*;
188
- ///
189
- /// # use datafusion::error::Result;
208
+ /// # use datafusion::{error::Result, assert_batches_eq};
190
209
/// # #[tokio::main]
191
210
/// # async fn main() -> Result<()> {
192
211
/// let mut ctx = SessionContext::new();
193
212
/// ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
194
- /// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;
213
+ /// let results = ctx
214
+ /// .sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")
215
+ /// .await?
216
+ /// .collect()
217
+ /// .await?;
218
+ /// assert_batches_eq!(
219
+ /// &[
220
+ /// "+---+----------------+",
221
+ /// "| a | MIN(example.b) |",
222
+ /// "+---+----------------+",
223
+ /// "| 1 | 2 |",
224
+ /// "+---+----------------+",
225
+ /// ],
226
+ /// &results
227
+ /// );
195
228
/// # Ok(())
196
229
/// # }
197
230
/// ```
@@ -342,22 +375,82 @@ impl SessionContext {
342
375
self . state . read ( ) . config . clone ( )
343
376
}
344
377
345
- /// Creates a [`DataFrame`] that will execute a SQL query.
378
+ /// Creates a [`DataFrame`] from SQL query text .
346
379
///
347
380
/// Note: This API implements DDL statements such as `CREATE TABLE` and
348
381
/// `CREATE VIEW` and DML statements such as `INSERT INTO` with in-memory
349
- /// default implementations.
382
+ /// default implementations. See [`Self::sql_with_options`].
383
+ ///
384
+ /// # Example: Running SQL queries
385
+ ///
386
+ /// See the example on [`Self`]
350
387
///
351
- /// If this is not desirable, consider using [`SessionState::create_logical_plan()`] which
352
- /// does not mutate the state based on such statements.
388
+ /// # Example: Creating a Table with SQL
389
+ ///
390
+ /// ```
391
+ /// use datafusion::prelude::*;
392
+ /// # use datafusion::{error::Result, assert_batches_eq};
393
+ /// # #[tokio::main]
394
+ /// # async fn main() -> Result<()> {
395
+ /// let mut ctx = SessionContext::new();
396
+ /// ctx
397
+ /// .sql("CREATE TABLE foo (x INTEGER)")
398
+ /// .await?
399
+ /// .collect()
400
+ /// .await?;
401
+ /// assert!(ctx.table_exist("foo").unwrap());
402
+ /// # Ok(())
403
+ /// # }
404
+ /// ```
353
405
pub async fn sql ( & self , sql : & str ) -> Result < DataFrame > {
354
- // create a query planner
406
+ self . sql_with_options ( sql, SQLOptions :: new ( ) ) . await
407
+ }
408
+
409
+ /// Creates a [`DataFrame`] from SQL query text, first validating
410
+ /// that the queries are allowed by `options`
411
+ ///
412
+ /// # Example: Preventing Creating a Table with SQL
413
+ ///
414
+ /// If you want to avoid creating tables, set [`SQLOptions`]
415
+ /// appropriately:
416
+ ///
417
+ /// ```
418
+ /// use datafusion::prelude::*;
419
+ /// # use datafusion::{error::Result};
420
+ /// # use datafusion::physical_plan::collect;
421
+ /// # #[tokio::main]
422
+ /// # async fn main() -> Result<()> {
423
+ /// let mut ctx = SessionContext::new();
424
+ /// let options = SQLOptions::new()
425
+ /// .with_allow_ddl(false);
426
+ /// let err = ctx.sql_with_options("CREATE TABLE foo (x INTEGER)", options)
427
+ /// .await
428
+ /// .unwrap_err();
429
+ /// assert_eq!(
430
+ /// err.to_string(),
431
+ /// "This feature is not implemented: Unsupported logical plan: CreateMemoryTable"
432
+ /// );
433
+ /// # Ok(())
434
+ /// # }
435
+ /// ```
436
+ pub async fn sql_with_options (
437
+ & self ,
438
+ sql : & str ,
439
+ options : SQLOptions ,
440
+ ) -> Result < DataFrame > {
355
441
let plan = self . state ( ) . create_logical_plan ( sql) . await ?;
442
+ options. verify_plan ( & plan) ?;
356
443
357
444
self . execute_logical_plan ( plan) . await
358
445
}
359
446
360
- /// Execute the [`LogicalPlan`], return a [`DataFrame`]
447
+ /// Execute the [`LogicalPlan`], return a [`DataFrame`]. This API
448
+ /// is not feature limited (so all SQL such as `CREATE TABLE` and
449
+ /// `COPY` will be run).
450
+ ///
451
+ /// If you wish to limit the type of plan that can be run from
452
+ /// SQL, see [`Self::sql_with_options`] and
453
+ /// [`SQLOptions::verify_plan`].
361
454
pub async fn execute_logical_plan ( & self , plan : LogicalPlan ) -> Result < DataFrame > {
362
455
match plan {
363
456
LogicalPlan :: Ddl ( ddl) => match ddl {
@@ -1304,7 +1397,7 @@ impl FunctionRegistry for SessionContext {
1304
1397
/// A planner used to add extensions to DataFusion logical and physical plans.
1305
1398
#[ async_trait]
1306
1399
pub trait QueryPlanner {
1307
- /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
1400
+ /// Given a `LogicalPlan`, create an [ `ExecutionPlan`] suitable for execution
1308
1401
async fn create_physical_plan (
1309
1402
& self ,
1310
1403
logical_plan : & LogicalPlan ,
@@ -1317,7 +1410,7 @@ struct DefaultQueryPlanner {}
1317
1410
1318
1411
#[ async_trait]
1319
1412
impl QueryPlanner for DefaultQueryPlanner {
1320
- /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
1413
+ /// Given a `LogicalPlan`, create an [ `ExecutionPlan`] suitable for execution
1321
1414
async fn create_physical_plan (
1322
1415
& self ,
1323
1416
logical_plan : & LogicalPlan ,
@@ -1628,7 +1721,8 @@ impl SessionState {
1628
1721
& mut self . table_factories
1629
1722
}
1630
1723
1631
- /// Convert a SQL string into an AST Statement
1724
+ /// Parse a SQL string into a DataFusion-specific AST
1725
+ /// [`Statement`]. See [`SessionContext::sql`] for running queries.
1632
1726
pub fn sql_to_statement (
1633
1727
& self ,
1634
1728
sql : & str ,
@@ -1778,9 +1872,15 @@ impl SessionState {
1778
1872
query. statement_to_plan ( statement)
1779
1873
}
1780
1874
1781
- /// Creates a [`LogicalPlan`] from the provided SQL string
1875
+ /// Creates a [`LogicalPlan`] from the provided SQL string. This
1876
+ /// interface will plan any SQL DataFusion supports, including DDL
1877
+ /// like `CREATE TABLE`, and DML like `COPY` (which can write to local
1878
+ /// files).
1782
1879
///
1783
- /// See [`SessionContext::sql`] for a higher-level interface that also handles DDL
1880
+ /// See [`SessionContext::sql`] and
1881
+ /// [`SessionContext::sql_with_options`] for a higher-level
1882
+ /// interface that handles DDL and verification of allowed
1883
+ /// statements.
1784
1884
pub async fn create_logical_plan ( & self , sql : & str ) -> Result < LogicalPlan > {
1785
1885
let dialect = self . config . options ( ) . sql_parser . dialect . as_str ( ) ;
1786
1886
let statement = self . sql_to_statement ( sql, dialect) ?;
@@ -1861,7 +1961,11 @@ impl SessionState {
1861
1961
1862
1962
/// Creates a physical plan from a logical plan.
1863
1963
///
1864
- /// Note: this first calls [`Self::optimize`] on the provided plan
1964
+ /// Note: this first calls [`Self::optimize`] on the provided
1965
+ /// plan.
1966
+ ///
1967
+ /// This function will error for [`LogicalPlan`]s such as catalog
1968
+ /// DDL (e.g. `CREATE TABLE`), which must be handled by another layer.
1865
1969
pub async fn create_physical_plan (
1866
1970
& self ,
1867
1971
logical_plan : & LogicalPlan ,
@@ -2086,6 +2190,92 @@ impl SerializerRegistry for EmptySerializerRegistry {
2086
2190
}
2087
2191
}
2088
2192
2193
/// Controls which categories of SQL statements are permitted to run.
///
/// Every flag is `true` in the [`Default`] value, so default options
/// allow everything.
///
/// See [`SessionContext::sql_with_options`] for more details.
#[derive(Debug, Clone, Copy)]
pub struct SQLOptions {
    /// Whether DDL plans may run. See [`Self::with_allow_ddl`]
    allow_ddl: bool,
    /// Whether DML plans may run. See [`Self::with_allow_dml`]
    allow_dml: bool,
    /// Whether statement plans may run. See [`Self::with_allow_statements`]
    allow_statements: bool,
}

impl Default for SQLOptions {
    /// Returns options with every category enabled.
    fn default() -> Self {
        let allowed = true;
        SQLOptions {
            allow_ddl: allowed,
            allow_dml: allowed,
            allow_statements: allowed,
        }
    }
}
2215
+
2216
+ impl SQLOptions {
2217
+ /// Create a new `SQLOptions` with default values
2218
+ pub fn new ( ) -> Self {
2219
+ Default :: default ( )
2220
+ }
2221
+
2222
+ /// Should DML data modification commands (e.g. `INSERT and COPY`) be run? Defaults to `true`.
2223
+ pub fn with_allow_ddl ( mut self , allow : bool ) -> Self {
2224
+ self . allow_ddl = allow;
2225
+ self
2226
+ }
2227
+
2228
+ /// Should DML data modification commands (e.g. `INSERT and COPY`) be run? Defaults to `true`
2229
+ pub fn with_allow_dml ( mut self , allow : bool ) -> Self {
2230
+ self . allow_dml = allow;
2231
+ self
2232
+ }
2233
+
2234
+ /// Should Statements such as (e.g. `SET VARIABLE and `BEGIN TRANSACTION` ...`) be run?. Defaults to `true`
2235
+ pub fn with_allow_statements ( mut self , allow : bool ) -> Self {
2236
+ self . allow_statements = allow;
2237
+ self
2238
+ }
2239
+
2240
+ /// Return an error if the [`LogicalPlan`] has any nodes that are
2241
+ /// incompatible with this [`SQLOptions`].
2242
+ fn verify_plan ( & self , plan : & LogicalPlan ) -> Result < ( ) > {
2243
+ plan. visit ( & mut BadPlanVisitor :: new ( self ) ) ?;
2244
+ Ok ( ( ) )
2245
+ }
2246
+ }
2247
+
2248
+ struct BadPlanVisitor < ' a > {
2249
+ options : & ' a SQLOptions ,
2250
+ }
2251
+ impl < ' a > BadPlanVisitor < ' a > {
2252
+ fn new ( options : & ' a SQLOptions ) -> Self {
2253
+ Self { options }
2254
+ }
2255
+ }
2256
+
2257
+ impl < ' a > TreeNodeVisitor for BadPlanVisitor < ' a > {
2258
+ type N = LogicalPlan ;
2259
+
2260
+ fn pre_visit ( & mut self , node : & Self :: N ) -> Result < VisitRecursion > {
2261
+ match node {
2262
+ LogicalPlan :: Ddl ( ddl) if !self . options . allow_ddl => {
2263
+ plan_err ! ( "DDL not supported: {}" , ddl. name( ) )
2264
+ }
2265
+ LogicalPlan :: Dml ( dml) if !self . options . allow_dml => {
2266
+ plan_err ! ( "DML not supported: {}" , dml. op)
2267
+ }
2268
+ LogicalPlan :: Copy ( _) if !self . options . allow_dml => {
2269
+ plan_err ! ( "DML not supported: COPY" )
2270
+ }
2271
+ LogicalPlan :: Statement ( stmt) if !self . options . allow_statements => {
2272
+ plan_err ! ( "Statement not supported: {}" , stmt. name( ) )
2273
+ }
2274
+ _ => Ok ( VisitRecursion :: Continue ) ,
2275
+ }
2276
+ }
2277
+ }
2278
+
2089
2279
#[ cfg( test) ]
2090
2280
mod tests {
2091
2281
use super :: * ;
@@ -2637,42 +2827,6 @@ mod tests {
2637
2827
Ok ( ( ) )
2638
2828
}
2639
2829
2640
- #[ tokio:: test]
2641
- async fn unsupported_sql_returns_error ( ) -> Result < ( ) > {
2642
- let ctx = SessionContext :: new ( ) ;
2643
- ctx. register_table ( "test" , test:: table_with_sequence ( 1 , 1 ) . unwrap ( ) )
2644
- . unwrap ( ) ;
2645
- let state = ctx. state ( ) ;
2646
-
2647
- // create view
2648
- let sql = "create view test_view as select * from test" ;
2649
- let plan = state. create_logical_plan ( sql) . await ;
2650
- let physical_plan = state. create_physical_plan ( & plan. unwrap ( ) ) . await ;
2651
- assert ! ( physical_plan. is_err( ) ) ;
2652
- assert_eq ! (
2653
- format!( "{}" , physical_plan. unwrap_err( ) ) ,
2654
- "This feature is not implemented: Unsupported logical plan: CreateView"
2655
- ) ;
2656
- // // drop view
2657
- let sql = "drop view test_view" ;
2658
- let plan = state. create_logical_plan ( sql) . await ;
2659
- let physical_plan = state. create_physical_plan ( & plan. unwrap ( ) ) . await ;
2660
- assert ! ( physical_plan. is_err( ) ) ;
2661
- assert_eq ! (
2662
- format!( "{}" , physical_plan. unwrap_err( ) ) ,
2663
- "This feature is not implemented: Unsupported logical plan: DropView"
2664
- ) ;
2665
- // // drop table
2666
- let sql = "drop table test" ;
2667
- let plan = state. create_logical_plan ( sql) . await ;
2668
- let physical_plan = state. create_physical_plan ( & plan. unwrap ( ) ) . await ;
2669
- assert ! ( physical_plan. is_err( ) ) ;
2670
- assert_eq ! (
2671
- format!( "{}" , physical_plan. unwrap_err( ) ) ,
2672
- "This feature is not implemented: Unsupported logical plan: DropTable"
2673
- ) ;
2674
- Ok ( ( ) )
2675
- }
2676
2830
2677
2831
struct MyPhysicalPlanner { }
2678
2832
0 commit comments