Skip to content

Commit cf10913

Browse files
committed
Make SchemaProvider async (#3777)
1 parent ece75a4 commit cf10913

File tree

22 files changed

+212
-160
lines changed

22 files changed

+212
-160
lines changed

datafusion-cli/Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/examples/dataframe_in_memory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ async fn main() -> Result<()> {
4747

4848
// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
4949
ctx.register_batch("t", batch)?;
50-
let df = ctx.table("t")?;
50+
let df = ctx.table("t").await?;
5151

5252
// construct an expression corresponding to "SELECT a, b FROM t WHERE b = 10" in SQL
5353
let filter = col("b").eq(lit(10));

datafusion-examples/examples/simple_udaf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ async fn main() -> Result<()> {
179179

180180
// get a DataFrame from the context
181181
// this table has 1 column `a` f32 with values {2,4,8,64}, whose geometric mean is 8.0.
182-
let df = ctx.table("t")?;
182+
let df = ctx.table("t").await?;
183183

184184
// perform the aggregation
185185
let df = df.aggregate(vec![], vec![geometric_mean.call(vec![col("a")])])?;

datafusion-examples/examples/simple_udf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ async fn main() -> Result<()> {
122122
let expr = pow.call(vec![col("a"), col("b")]);
123123

124124
// get a DataFrame from the context
125-
let df = ctx.table("t")?;
125+
let df = ctx.table("t").await?;
126126

127127
// if we do not have `pow` in the scope and we registered it, we can get it from the registry
128128
let pow = df.registry().udf("pow")?;

datafusion/common/src/table_reference.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::fmt::{Display, Formatter};
19+
1820
/// A resolved path to a table of the form "catalog.schema.table"
19-
#[derive(Debug, Clone, Copy)]
21+
#[derive(Debug, Clone, Copy, PartialEq, Hash)]
2022
pub struct ResolvedTableReference<'a> {
2123
/// The catalog (aka database) containing the table
2224
pub catalog: &'a str,
@@ -26,6 +28,12 @@ pub struct ResolvedTableReference<'a> {
2628
pub table: &'a str,
2729
}
2830

31+
impl<'a> Display for ResolvedTableReference<'a> {
32+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33+
write!(f, "{}.{}.{}", self.catalog, self.schema, self.table)
34+
}
35+
}
36+
2937
/// Represents a path to a table that may require further resolution
3038
#[derive(Debug, Clone, Copy)]
3139
pub enum TableReference<'a> {

datafusion/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pyo3 = { version = "0.17.1", optional = true }
9090
rand = "0.8"
9191
rayon = { version = "1.5", optional = true }
9292
smallvec = { version = "1.6", features = ["union"] }
93-
sqlparser = "0.29"
93+
sqlparser = { version = "0.29", features = ["visitor"] }
9494
tempfile = "3"
9595
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
9696
tokio-stream = "0.1"

datafusion/core/src/catalog/information_schema.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
//!
2020
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
2121
22+
use async_trait::async_trait;
2223
use std::{any::Any, sync::Arc};
2324

2425
use arrow::{
@@ -43,6 +44,9 @@ pub const VIEWS: &str = "views";
4344
pub const COLUMNS: &str = "columns";
4445
pub const DF_SETTINGS: &str = "df_settings";
4546

47+
/// All information schema tables
48+
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[TABLES, VIEWS, COLUMNS, DF_SETTINGS];
49+
4650
/// Implements the `information_schema` virtual schema and tables
4751
///
4852
/// The underlying tables in the `information_schema` are created on
@@ -69,7 +73,7 @@ struct InformationSchemaConfig {
6973

7074
impl InformationSchemaConfig {
7175
/// Construct the `information_schema.tables` virtual table
72-
fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
76+
async fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
7377
// create a mem table with the names of tables
7478

7579
for catalog_name in self.catalog_list.catalog_names() {
@@ -79,7 +83,7 @@ impl InformationSchemaConfig {
7983
if schema_name != INFORMATION_SCHEMA {
8084
let schema = catalog.schema(&schema_name).unwrap();
8185
for table_name in schema.table_names() {
82-
let table = schema.table(&table_name).unwrap();
86+
let table = schema.table(&table_name).await.unwrap();
8387
builder.add_table(
8488
&catalog_name,
8589
&schema_name,
@@ -108,15 +112,15 @@ impl InformationSchemaConfig {
108112
}
109113
}
110114

111-
fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
115+
async fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
112116
for catalog_name in self.catalog_list.catalog_names() {
113117
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
114118

115119
for schema_name in catalog.schema_names() {
116120
if schema_name != INFORMATION_SCHEMA {
117121
let schema = catalog.schema(&schema_name).unwrap();
118122
for table_name in schema.table_names() {
119-
let table = schema.table(&table_name).unwrap();
123+
let table = schema.table(&table_name).await.unwrap();
120124
builder.add_view(
121125
&catalog_name,
122126
&schema_name,
@@ -130,15 +134,15 @@ impl InformationSchemaConfig {
130134
}
131135

132136
/// Construct the `information_schema.columns` virtual table
133-
fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
137+
async fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
134138
for catalog_name in self.catalog_list.catalog_names() {
135139
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
136140

137141
for schema_name in catalog.schema_names() {
138142
if schema_name != INFORMATION_SCHEMA {
139143
let schema = catalog.schema(&schema_name).unwrap();
140144
for table_name in schema.table_names() {
141-
let table = schema.table(&table_name).unwrap();
145+
let table = schema.table(&table_name).await.unwrap();
142146
for (i, field) in table.schema().fields().iter().enumerate() {
143147
builder.add_column(
144148
&catalog_name,
@@ -168,6 +172,7 @@ impl InformationSchemaConfig {
168172
}
169173
}
170174

175+
#[async_trait]
171176
impl SchemaProvider for InformationSchemaProvider {
172177
fn as_any(&self) -> &(dyn Any + 'static) {
173178
self
@@ -182,7 +187,7 @@ impl SchemaProvider for InformationSchemaProvider {
182187
]
183188
}
184189

185-
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
190+
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
186191
let config = self.config.clone();
187192
let table: Arc<dyn PartitionStream> = if name.eq_ignore_ascii_case("tables") {
188193
Arc::new(InformationSchemaTables::new(config))
@@ -246,7 +251,7 @@ impl PartitionStream for InformationSchemaTables {
246251
self.schema.clone(),
247252
// TODO: Stream this
248253
futures::stream::once(async move {
249-
config.make_tables(&mut builder);
254+
config.make_tables(&mut builder).await;
250255
Ok(builder.finish())
251256
}),
252257
))
@@ -337,7 +342,7 @@ impl PartitionStream for InformationSchemaViews {
337342
self.schema.clone(),
338343
// TODO: Stream this
339344
futures::stream::once(async move {
340-
config.make_views(&mut builder);
345+
config.make_views(&mut builder).await;
341346
Ok(builder.finish())
342347
}),
343348
))
@@ -451,7 +456,7 @@ impl PartitionStream for InformationSchemaColumns {
451456
self.schema.clone(),
452457
// TODO: Stream this
453458
futures::stream::once(async move {
454-
config.make_columns(&mut builder);
459+
config.make_columns(&mut builder).await;
455460
Ok(builder.finish())
456461
}),
457462
))

datafusion/core/src/catalog/listing_schema.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::catalog::schema::SchemaProvider;
2020
use crate::datasource::datasource::TableProviderFactory;
2121
use crate::datasource::TableProvider;
2222
use crate::execution::context::SessionState;
23+
use async_trait::async_trait;
2324
use datafusion_common::parsers::CompressionTypeVariant;
2425
use datafusion_common::{DFSchema, DataFusionError, OwnedTableReference};
2526
use datafusion_expr::CreateExternalTable;
@@ -149,6 +150,7 @@ impl ListingSchemaProvider {
149150
}
150151
}
151152

153+
#[async_trait]
152154
impl SchemaProvider for ListingSchemaProvider {
153155
fn as_any(&self) -> &dyn Any {
154156
self
@@ -163,7 +165,7 @@ impl SchemaProvider for ListingSchemaProvider {
163165
.collect()
164166
}
165167

166-
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
168+
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
167169
self.tables
168170
.lock()
169171
.expect("Can't lock tables")

datafusion/core/src/catalog/schema.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! Describes the interface and built-in implementations of schemas,
1919
//! representing collections of named tables.
2020
21+
use async_trait::async_trait;
2122
use dashmap::DashMap;
2223
use std::any::Any;
2324
use std::sync::Arc;
@@ -26,6 +27,7 @@ use crate::datasource::TableProvider;
2627
use crate::error::{DataFusionError, Result};
2728

2829
/// Represents a schema, comprising a number of named tables.
30+
#[async_trait]
2931
pub trait SchemaProvider: Sync + Send {
3032
/// Returns the schema provider as [`Any`](std::any::Any)
3133
/// so that it can be downcast to a specific implementation.
@@ -35,7 +37,7 @@ pub trait SchemaProvider: Sync + Send {
3537
fn table_names(&self) -> Vec<String>;
3638

3739
/// Retrieves a specific table from the schema by name, provided it exists.
38-
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
40+
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
3941

4042
/// If supported by the implementation, adds a new table to this schema.
4143
/// If a table of the same name already exists, it returns a "Table already exists" error.
@@ -85,6 +87,7 @@ impl Default for MemorySchemaProvider {
8587
}
8688
}
8789

90+
#[async_trait]
8891
impl SchemaProvider for MemorySchemaProvider {
8992
fn as_any(&self) -> &dyn Any {
9093
self
@@ -97,7 +100,7 @@ impl SchemaProvider for MemorySchemaProvider {
97100
.collect()
98101
}
99102

100-
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
103+
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
101104
self.tables.get(name).map(|table| table.value().clone())
102105
}
103106

datafusion/core/src/dataframe.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,7 @@ mod tests {
917917
let ctx = SessionContext::new();
918918
ctx.register_batch("t", batch)?;
919919

920-
let df = ctx.table("t")?.select_columns(&["f.c1"])?;
920+
let df = ctx.table("t").await?.select_columns(&["f.c1"])?;
921921

922922
let df_results = df.collect().await?;
923923

@@ -1036,7 +1036,7 @@ mod tests {
10361036
));
10371037

10381038
// build query with a UDF using DataFrame API
1039-
let df = ctx.table("aggregate_test_100")?;
1039+
let df = ctx.table("aggregate_test_100").await?;
10401040

10411041
let expr = df.registry().udf("my_fn")?.call(vec![col("c12")]);
10421042
let df = df.select(vec![expr])?;
@@ -1101,7 +1101,7 @@ mod tests {
11011101
ctx.register_table("test_table", Arc::new(df_impl.clone()))?;
11021102

11031103
// pull the table out
1104-
let table = ctx.table("test_table")?;
1104+
let table = ctx.table("test_table").await?;
11051105

11061106
let group_expr = vec![col("c1")];
11071107
let aggr_expr = vec![sum(col("c12"))];
@@ -1161,7 +1161,7 @@ mod tests {
11611161
async fn test_table_with_name(name: &str) -> Result<DataFrame> {
11621162
let mut ctx = SessionContext::new();
11631163
register_aggregate_csv(&mut ctx, name).await?;
1164-
ctx.table(name)
1164+
ctx.table(name).await
11651165
}
11661166

11671167
async fn test_table() -> Result<DataFrame> {
@@ -1301,8 +1301,15 @@ mod tests {
13011301
ctx.register_table("t1", table.clone())?;
13021302
ctx.register_table("t2", table)?;
13031303
let df = ctx
1304-
.table("t1")?
1305-
.join(ctx.table("t2")?, JoinType::Inner, &["c1"], &["c1"], None)?
1304+
.table("t1")
1305+
.await?
1306+
.join(
1307+
ctx.table("t2").await?,
1308+
JoinType::Inner,
1309+
&["c1"],
1310+
&["c1"],
1311+
None,
1312+
)?
13061313
.sort(vec![
13071314
// make the test deterministic
13081315
col("t1.c1").sort(true, true),
@@ -1379,10 +1386,11 @@ mod tests {
13791386
)
13801387
.await?;
13811388

1382-
ctx.register_table("t1", Arc::new(ctx.table("test")?))?;
1389+
ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
13831390

13841391
let df = ctx
1385-
.table("t1")?
1392+
.table("t1")
1393+
.await?
13861394
.filter(col("id").eq(lit(1)))?
13871395
.select_columns(&["bool_col", "int_col"])?;
13881396

@@ -1463,7 +1471,8 @@ mod tests {
14631471
ctx.register_batch("t", batch)?;
14641472

14651473
let df = ctx
1466-
.table("t")?
1474+
.table("t")
1475+
.await?
14671476
// try and create a column with a '.' in it
14681477
.with_column("f.c2", lit("hello"))?;
14691478

datafusion/core/src/datasource/view.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -428,12 +428,13 @@ mod tests {
428428
)
429429
.await?;
430430

431-
ctx.register_table("t1", Arc::new(ctx.table("test")?))?;
431+
ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
432432

433433
ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;
434434

435435
let df = ctx
436-
.table("t2")?
436+
.table("t2")
437+
.await?
437438
.filter(col("id").eq(lit(1)))?
438439
.select_columns(&["bool_col", "int_col"])?;
439440

@@ -457,12 +458,13 @@ mod tests {
457458
)
458459
.await?;
459460

460-
ctx.register_table("t1", Arc::new(ctx.table("test")?))?;
461+
ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
461462

462463
ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;
463464

464465
let df = ctx
465-
.table("t2")?
466+
.table("t2")
467+
.await?
466468
.limit(0, Some(10))?
467469
.select_columns(&["bool_col", "int_col"])?;
468470

0 commit comments

Comments
 (0)