Skip to content

Commit a3d3f3a

Browse files
committed
Async catalog support (#3777)
1 parent 8c539b7 commit a3d3f3a

31 files changed

+300
-158
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,6 @@ opt-level = 3
5050
overflow-checks = false
5151
panic = 'unwind'
5252
rpath = false
53+
54+
[patch.crates-io]
55+
sqlparser = { git = "https://github.com/tustvold/sqlparser-rs.git", rev = "a3cd766d11845fcaa9ece9c1b048cc8463d92252" }

datafusion-cli/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,6 @@ object_store = { version = "0.5.0", features = ["aws", "gcp"] }
3939
rustyline = "10.0"
4040
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
4141
url = "2.2"
42+
43+
[patch.crates-io]
44+
sqlparser = { git = "https://github.com/tustvold/sqlparser-rs.git", rev = "a3cd766d11845fcaa9ece9c1b048cc8463d92252" }

datafusion/common/src/table_reference.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub enum TableReference<'a> {
5454

5555
/// Represents a path to a table that may require further resolution
5656
/// that owns the underlying names
57-
#[derive(Debug, Clone)]
57+
#[derive(Debug, Clone, Hash, PartialEq)]
5858
pub enum OwnedTableReference {
5959
/// An unqualified table reference, e.g. "table"
6060
Bare {

datafusion/core/src/catalog/information_schema.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use arrow::{
2929
datatypes::{DataType, Field, Schema, SchemaRef},
3030
record_batch::RecordBatch,
3131
};
32+
use async_trait::async_trait;
3233
use parking_lot::RwLock;
3334

3435
use datafusion_common::Result;
@@ -45,12 +46,16 @@ use super::{
4546
schema::SchemaProvider,
4647
};
4748

48-
const INFORMATION_SCHEMA: &str = "information_schema";
49+
/// The name of the information schema
50+
pub const INFORMATION_SCHEMA: &str = "information_schema";
4951
const TABLES: &str = "tables";
5052
const VIEWS: &str = "views";
5153
const COLUMNS: &str = "columns";
5254
const DF_SETTINGS: &str = "df_settings";
5355

56+
/// All information schema tables
57+
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[TABLES, VIEWS, COLUMNS, DF_SETTINGS];
58+
5459
/// Wraps another [`CatalogProvider`] and adds a "information_schema"
5560
/// schema that can introspect on tables in the catalog_list
5661
pub(crate) struct CatalogWithInformationSchema {
@@ -132,7 +137,7 @@ struct InformationSchemaConfig {
132137

133138
impl InformationSchemaConfig {
134139
/// Construct the `information_schema.tables` virtual table
135-
fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
140+
async fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
136141
// create a mem table with the names of tables
137142

138143
for catalog_name in self.catalog_list.catalog_names() {
@@ -142,7 +147,7 @@ impl InformationSchemaConfig {
142147
if schema_name != INFORMATION_SCHEMA {
143148
let schema = catalog.schema(&schema_name).unwrap();
144149
for table_name in schema.table_names() {
145-
let table = schema.table(&table_name).unwrap();
150+
let table = schema.table(&table_name).await.unwrap();
146151
builder.add_table(
147152
&catalog_name,
148153
&schema_name,
@@ -171,15 +176,15 @@ impl InformationSchemaConfig {
171176
}
172177
}
173178

174-
fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
179+
async fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
175180
for catalog_name in self.catalog_list.catalog_names() {
176181
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
177182

178183
for schema_name in catalog.schema_names() {
179184
if schema_name != INFORMATION_SCHEMA {
180185
let schema = catalog.schema(&schema_name).unwrap();
181186
for table_name in schema.table_names() {
182-
let table = schema.table(&table_name).unwrap();
187+
let table = schema.table(&table_name).await.unwrap();
183188
builder.add_view(
184189
&catalog_name,
185190
&schema_name,
@@ -193,15 +198,15 @@ impl InformationSchemaConfig {
193198
}
194199

195200
/// Construct the `information_schema.columns` virtual table
196-
fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
201+
async fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
197202
for catalog_name in self.catalog_list.catalog_names() {
198203
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
199204

200205
for schema_name in catalog.schema_names() {
201206
if schema_name != INFORMATION_SCHEMA {
202207
let schema = catalog.schema(&schema_name).unwrap();
203208
for table_name in schema.table_names() {
204-
let table = schema.table(&table_name).unwrap();
209+
let table = schema.table(&table_name).await.unwrap();
205210
for (i, field) in table.schema().fields().iter().enumerate() {
206211
builder.add_column(
207212
&catalog_name,
@@ -227,6 +232,7 @@ impl InformationSchemaConfig {
227232
}
228233
}
229234

235+
#[async_trait]
230236
impl SchemaProvider for InformationSchemaProvider {
231237
fn as_any(&self) -> &(dyn Any + 'static) {
232238
self
@@ -241,7 +247,7 @@ impl SchemaProvider for InformationSchemaProvider {
241247
]
242248
}
243249

244-
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
250+
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
245251
let config = self.config.clone();
246252
let table: Arc<dyn PartitionStream> = if name.eq_ignore_ascii_case("tables") {
247253
Arc::new(InformationSchemaTables::new(config))
@@ -305,7 +311,7 @@ impl PartitionStream for InformationSchemaTables {
305311
self.schema.clone(),
306312
// TODO: Stream this
307313
futures::stream::once(async move {
308-
config.make_tables(&mut builder);
314+
config.make_tables(&mut builder).await;
309315
Ok(builder.finish())
310316
}),
311317
))
@@ -396,7 +402,7 @@ impl PartitionStream for InformationSchemaViews {
396402
self.schema.clone(),
397403
// TODO: Stream this
398404
futures::stream::once(async move {
399-
config.make_views(&mut builder);
405+
config.make_views(&mut builder).await;
400406
Ok(builder.finish())
401407
}),
402408
))
@@ -510,7 +516,7 @@ impl PartitionStream for InformationSchemaColumns {
510516
self.schema.clone(),
511517
// TODO: Stream this
512518
futures::stream::once(async move {
513-
config.make_columns(&mut builder);
519+
config.make_columns(&mut builder).await;
514520
Ok(builder.finish())
515521
}),
516522
))

datafusion/core/src/catalog/listing_schema.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use std::any::Any;
2929
use std::collections::{HashMap, HashSet};
3030
use std::path::Path;
3131
use std::sync::{Arc, Mutex};
32+
use async_trait::async_trait;
3233

3334
/// A `SchemaProvider` that scans an `ObjectStore` to automatically discover tables
3435
///
@@ -148,6 +149,7 @@ impl ListingSchemaProvider {
148149
}
149150
}
150151

152+
#[async_trait]
151153
impl SchemaProvider for ListingSchemaProvider {
152154
fn as_any(&self) -> &dyn Any {
153155
self
@@ -162,7 +164,7 @@ impl SchemaProvider for ListingSchemaProvider {
162164
.collect()
163165
}
164166

165-
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
167+
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
166168
self.tables
167169
.lock()
168170
.expect("Can't lock tables")

datafusion/core/src/catalog/schema.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
use dashmap::DashMap;
2222
use std::any::Any;
2323
use std::sync::Arc;
24+
use async_trait::async_trait;
2425

2526
use crate::datasource::TableProvider;
2627
use crate::error::{DataFusionError, Result};
2728

2829
/// Represents a schema, comprising a number of named tables.
30+
#[async_trait]
2931
pub trait SchemaProvider: Sync + Send {
3032
/// Returns the schema provider as [`Any`](std::any::Any)
3133
/// so that it can be downcast to a specific implementation.
@@ -35,7 +37,7 @@ pub trait SchemaProvider: Sync + Send {
3537
fn table_names(&self) -> Vec<String>;
3638

3739
/// Retrieves a specific table from the schema by name, provided it exists.
38-
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
40+
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
3941

4042
/// If supported by the implementation, adds a new table to this schema.
4143
/// If a table of the same name existed before, it returns "Table already exists" error.
@@ -85,6 +87,7 @@ impl Default for MemorySchemaProvider {
8587
}
8688
}
8789

90+
#[async_trait]
8891
impl SchemaProvider for MemorySchemaProvider {
8992
fn as_any(&self) -> &dyn Any {
9093
self
@@ -97,7 +100,7 @@ impl SchemaProvider for MemorySchemaProvider {
97100
.collect()
98101
}
99102

100-
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
103+
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
101104
self.tables.get(name).map(|table| table.value().clone())
102105
}
103106

datafusion/core/src/dataframe.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,7 @@ mod tests {
921921
let ctx = SessionContext::new();
922922
ctx.register_batch("t", batch)?;
923923

924-
let df = ctx.table("t")?.select_columns(&["f.c1"])?;
924+
let df = ctx.table("t").await?.select_columns(&["f.c1"])?;
925925

926926
let df_results = df.collect().await?;
927927

@@ -1040,16 +1040,17 @@ mod tests {
10401040
));
10411041

10421042
// build query with a UDF using DataFrame API
1043-
let df = ctx.table("aggregate_test_100")?;
1043+
let df = ctx.table("aggregate_test_100").await?;
10441044

10451045
let f = df.registry();
10461046

10471047
let df = df.select(vec![f.udf("my_fn")?.call(vec![col("c12")])])?;
10481048
let plan = df.plan.clone();
10491049

10501050
// build query using SQL
1051-
let sql_plan =
1052-
ctx.create_logical_plan("SELECT my_fn(c12) FROM aggregate_test_100")?;
1051+
let sql_plan = ctx
1052+
.create_logical_plan("SELECT my_fn(c12) FROM aggregate_test_100")
1053+
.await?;
10531054

10541055
// the two plans should be identical
10551056
assert_same_plan(&plan, &sql_plan);
@@ -1106,7 +1107,7 @@ mod tests {
11061107
ctx.register_table("test_table", df_impl.clone())?;
11071108

11081109
// pull the table out
1109-
let table = ctx.table("test_table")?;
1110+
let table = ctx.table("test_table").await?;
11101111

11111112
let group_expr = vec![col("c1")];
11121113
let aggr_expr = vec![sum(col("c12"))];
@@ -1160,13 +1161,13 @@ mod tests {
11601161
async fn create_plan(sql: &str) -> Result<LogicalPlan> {
11611162
let mut ctx = SessionContext::new();
11621163
register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
1163-
ctx.create_logical_plan(sql)
1164+
ctx.create_logical_plan(sql).await
11641165
}
11651166

11661167
async fn test_table_with_name(name: &str) -> Result<Arc<DataFrame>> {
11671168
let mut ctx = SessionContext::new();
11681169
register_aggregate_csv(&mut ctx, name).await?;
1169-
ctx.table(name)
1170+
ctx.table(name).await
11701171
}
11711172

11721173
async fn test_table() -> Result<Arc<DataFrame>> {
@@ -1300,8 +1301,15 @@ mod tests {
13001301
ctx.register_table("t1", df.clone())?;
13011302
ctx.register_table("t2", df)?;
13021303
let df = ctx
1303-
.table("t1")?
1304-
.join(ctx.table("t2")?, JoinType::Inner, &["c1"], &["c1"], None)?
1304+
.table("t1")
1305+
.await?
1306+
.join(
1307+
ctx.table("t2").await?,
1308+
JoinType::Inner,
1309+
&["c1"],
1310+
&["c1"],
1311+
None,
1312+
)?
13051313
.sort(vec![
13061314
// make the test deterministic
13071315
col("t1.c1").sort(true, true),
@@ -1378,10 +1386,11 @@ mod tests {
13781386
)
13791387
.await?;
13801388

1381-
ctx.register_table("t1", ctx.table("test")?)?;
1389+
ctx.register_table("t1", ctx.table("test").await?)?;
13821390

13831391
let df = ctx
1384-
.table("t1")?
1392+
.table("t1")
1393+
.await?
13851394
.filter(col("id").eq(lit(1)))?
13861395
.select_columns(&["bool_col", "int_col"])?;
13871396

@@ -1462,7 +1471,8 @@ mod tests {
14621471
ctx.register_batch("t", batch)?;
14631472

14641473
let df = ctx
1465-
.table("t")?
1474+
.table("t")
1475+
.await?
14661476
// try and create a column with a '.' in it
14671477
.with_column("f.c2", lit("hello"))?;
14681478

datafusion/core/src/datasource/view.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -431,12 +431,13 @@ mod tests {
431431
)
432432
.await?;
433433

434-
ctx.register_table("t1", ctx.table("test")?)?;
434+
ctx.register_table("t1", ctx.table("test").await?)?;
435435

436436
ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;
437437

438438
let df = ctx
439-
.table("t2")?
439+
.table("t2")
440+
.await?
440441
.filter(col("id").eq(lit(1)))?
441442
.select_columns(&["bool_col", "int_col"])?;
442443

@@ -460,12 +461,13 @@ mod tests {
460461
)
461462
.await?;
462463

463-
ctx.register_table("t1", ctx.table("test")?)?;
464+
ctx.register_table("t1", ctx.table("test").await?)?;
464465

465466
ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;
466467

467468
let df = ctx
468-
.table("t2")?
469+
.table("t2")
470+
.await?
469471
.limit(0, Some(10))?
470472
.select_columns(&["bool_col", "int_col"])?;
471473

0 commit comments

Comments
 (0)