Skip to content

Commit a4796fa

Browse files
authored
Minor: Move MemoryCatalog*Provider into a module, improve comments (#11183)
* Minor: Move MemoryCatalog*Provider into a module, improve comments * fix docs
1 parent 1840ab5 commit a4796fa

File tree

6 files changed

+375
-331
lines changed

6 files changed

+375
-331
lines changed

datafusion/core/src/catalog/information_schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Implements the SQL [Information Schema] for DataFusion.
18+
//! [`InformationSchemaProvider`] that implements the SQL [Information Schema] for DataFusion.
1919
//!
2020
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
2121

datafusion/core/src/catalog/listing_schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! listing_schema contains a SchemaProvider that scans ObjectStores for tables automatically
18+
//! [`ListingSchemaProvider`]: [`SchemaProvider`] that scans ObjectStores for tables automatically
1919
2020
use std::any::Any;
2121
use std::collections::{HashMap, HashSet};

datafusion/core/src/catalog/memory.rs

Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! [`MemoryCatalogProvider`], [`MemoryCatalogProviderList`]: In-memory
19+
//! implementations of [`CatalogProviderList`] and [`CatalogProvider`].
20+
21+
use crate::catalog::schema::SchemaProvider;
22+
use crate::catalog::{CatalogProvider, CatalogProviderList};
23+
use crate::datasource::TableProvider;
24+
use async_trait::async_trait;
25+
use dashmap::DashMap;
26+
use datafusion_common::{exec_err, DataFusionError};
27+
use std::any::Any;
28+
use std::sync::Arc;
29+
30+
/// Simple in-memory list of catalogs
31+
pub struct MemoryCatalogProviderList {
32+
/// Collection of catalogs containing schemas and ultimately TableProviders
33+
pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
34+
}
35+
36+
impl MemoryCatalogProviderList {
37+
/// Instantiates a new `MemoryCatalogProviderList` with an empty collection of catalogs
38+
pub fn new() -> Self {
39+
Self {
40+
catalogs: DashMap::new(),
41+
}
42+
}
43+
}
44+
45+
impl Default for MemoryCatalogProviderList {
46+
fn default() -> Self {
47+
Self::new()
48+
}
49+
}
50+
51+
impl CatalogProviderList for MemoryCatalogProviderList {
52+
fn as_any(&self) -> &dyn Any {
53+
self
54+
}
55+
56+
fn register_catalog(
57+
&self,
58+
name: String,
59+
catalog: Arc<dyn CatalogProvider>,
60+
) -> Option<Arc<dyn CatalogProvider>> {
61+
self.catalogs.insert(name, catalog)
62+
}
63+
64+
fn catalog_names(&self) -> Vec<String> {
65+
self.catalogs.iter().map(|c| c.key().clone()).collect()
66+
}
67+
68+
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
69+
self.catalogs.get(name).map(|c| c.value().clone())
70+
}
71+
}
72+
73+
/// Simple in-memory implementation of a catalog.
74+
pub struct MemoryCatalogProvider {
75+
schemas: DashMap<String, Arc<dyn SchemaProvider>>,
76+
}
77+
78+
impl MemoryCatalogProvider {
79+
/// Instantiates a new MemoryCatalogProvider with an empty collection of schemas.
80+
pub fn new() -> Self {
81+
Self {
82+
schemas: DashMap::new(),
83+
}
84+
}
85+
}
86+
87+
impl Default for MemoryCatalogProvider {
88+
fn default() -> Self {
89+
Self::new()
90+
}
91+
}
92+
93+
impl CatalogProvider for MemoryCatalogProvider {
94+
fn as_any(&self) -> &dyn Any {
95+
self
96+
}
97+
98+
fn schema_names(&self) -> Vec<String> {
99+
self.schemas.iter().map(|s| s.key().clone()).collect()
100+
}
101+
102+
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
103+
self.schemas.get(name).map(|s| s.value().clone())
104+
}
105+
106+
fn register_schema(
107+
&self,
108+
name: &str,
109+
schema: Arc<dyn SchemaProvider>,
110+
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
111+
Ok(self.schemas.insert(name.into(), schema))
112+
}
113+
114+
fn deregister_schema(
115+
&self,
116+
name: &str,
117+
cascade: bool,
118+
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
119+
if let Some(schema) = self.schema(name) {
120+
let table_names = schema.table_names();
121+
match (table_names.is_empty(), cascade) {
122+
(true, _) | (false, true) => {
123+
let (_, removed) = self.schemas.remove(name).unwrap();
124+
Ok(Some(removed))
125+
}
126+
(false, false) => exec_err!(
127+
"Cannot drop schema {} because other tables depend on it: {}",
128+
name,
129+
itertools::join(table_names.iter(), ", ")
130+
),
131+
}
132+
} else {
133+
Ok(None)
134+
}
135+
}
136+
}
137+
138+
/// Simple in-memory implementation of a schema.
139+
pub struct MemorySchemaProvider {
140+
tables: DashMap<String, Arc<dyn TableProvider>>,
141+
}
142+
143+
impl MemorySchemaProvider {
144+
/// Instantiates a new MemorySchemaProvider with an empty collection of tables.
145+
pub fn new() -> Self {
146+
Self {
147+
tables: DashMap::new(),
148+
}
149+
}
150+
}
151+
152+
impl Default for MemorySchemaProvider {
153+
fn default() -> Self {
154+
Self::new()
155+
}
156+
}
157+
158+
#[async_trait]
159+
impl SchemaProvider for MemorySchemaProvider {
160+
fn as_any(&self) -> &dyn Any {
161+
self
162+
}
163+
164+
fn table_names(&self) -> Vec<String> {
165+
self.tables
166+
.iter()
167+
.map(|table| table.key().clone())
168+
.collect()
169+
}
170+
171+
async fn table(
172+
&self,
173+
name: &str,
174+
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
175+
Ok(self.tables.get(name).map(|table| table.value().clone()))
176+
}
177+
178+
fn register_table(
179+
&self,
180+
name: String,
181+
table: Arc<dyn TableProvider>,
182+
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
183+
if self.table_exist(name.as_str()) {
184+
return exec_err!("The table {name} already exists");
185+
}
186+
Ok(self.tables.insert(name, table))
187+
}
188+
189+
fn deregister_table(
190+
&self,
191+
name: &str,
192+
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
193+
Ok(self.tables.remove(name).map(|(_, table)| table))
194+
}
195+
196+
fn table_exist(&self, name: &str) -> bool {
197+
self.tables.contains_key(name)
198+
}
199+
}
200+
201+
#[cfg(test)]
202+
mod test {
203+
use super::*;
204+
use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
205+
use crate::catalog::CatalogProvider;
206+
use crate::datasource::empty::EmptyTable;
207+
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
208+
use crate::datasource::TableProvider;
209+
use crate::prelude::SessionContext;
210+
use arrow_schema::Schema;
211+
use datafusion_common::assert_batches_eq;
212+
use std::any::Any;
213+
use std::sync::Arc;
214+
215+
#[test]
216+
fn memory_catalog_dereg_nonempty_schema() {
217+
let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
218+
219+
let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
220+
let test_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty())))
221+
as Arc<dyn TableProvider>;
222+
schema.register_table("t".into(), test_table).unwrap();
223+
224+
cat.register_schema("foo", schema.clone()).unwrap();
225+
226+
assert!(
227+
cat.deregister_schema("foo", false).is_err(),
228+
"dropping empty schema without cascade should error"
229+
);
230+
assert!(cat.deregister_schema("foo", true).unwrap().is_some());
231+
}
232+
233+
#[test]
234+
fn memory_catalog_dereg_empty_schema() {
235+
let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
236+
237+
let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
238+
cat.register_schema("foo", schema).unwrap();
239+
240+
assert!(cat.deregister_schema("foo", false).unwrap().is_some());
241+
}
242+
243+
#[test]
244+
fn memory_catalog_dereg_missing() {
245+
let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
246+
assert!(cat.deregister_schema("foo", false).unwrap().is_none());
247+
}
248+
249+
#[test]
250+
fn default_register_schema_not_supported() {
251+
// mimic a new CatalogProvider and ensure it does not support registering schemas
252+
struct TestProvider {}
253+
impl CatalogProvider for TestProvider {
254+
fn as_any(&self) -> &dyn Any {
255+
self
256+
}
257+
258+
fn schema_names(&self) -> Vec<String> {
259+
unimplemented!()
260+
}
261+
262+
fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
263+
unimplemented!()
264+
}
265+
}
266+
267+
let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
268+
let catalog = Arc::new(TestProvider {});
269+
270+
match catalog.register_schema("foo", schema) {
271+
Ok(_) => panic!("unexpected OK"),
272+
Err(e) => assert_eq!(e.strip_backtrace(), "This feature is not implemented: Registering new schemas is not supported"),
273+
};
274+
}
275+
276+
#[tokio::test]
277+
async fn test_mem_provider() {
278+
let provider = MemorySchemaProvider::new();
279+
let table_name = "test_table_exist";
280+
assert!(!provider.table_exist(table_name));
281+
assert!(provider.deregister_table(table_name).unwrap().is_none());
282+
let test_table = EmptyTable::new(Arc::new(Schema::empty()));
283+
// register table successfully
284+
assert!(provider
285+
.register_table(table_name.to_string(), Arc::new(test_table))
286+
.unwrap()
287+
.is_none());
288+
assert!(provider.table_exist(table_name));
289+
let other_table = EmptyTable::new(Arc::new(Schema::empty()));
290+
let result =
291+
provider.register_table(table_name.to_string(), Arc::new(other_table));
292+
assert!(result.is_err());
293+
}
294+
295+
#[tokio::test]
296+
async fn test_schema_register_listing_table() {
297+
let testdata = crate::test_util::parquet_test_data();
298+
let testdir = if testdata.starts_with('/') {
299+
format!("file://{testdata}")
300+
} else {
301+
format!("file:///{testdata}")
302+
};
303+
let filename = if testdir.ends_with('/') {
304+
format!("{}{}", testdir, "alltypes_plain.parquet")
305+
} else {
306+
format!("{}/{}", testdir, "alltypes_plain.parquet")
307+
};
308+
309+
let table_path = ListingTableUrl::parse(filename).unwrap();
310+
311+
let catalog = MemoryCatalogProvider::new();
312+
let schema = MemorySchemaProvider::new();
313+
314+
let ctx = SessionContext::new();
315+
316+
let config = ListingTableConfig::new(table_path)
317+
.infer(&ctx.state())
318+
.await
319+
.unwrap();
320+
let table = ListingTable::try_new(config).unwrap();
321+
322+
schema
323+
.register_table("alltypes_plain".to_string(), Arc::new(table))
324+
.unwrap();
325+
326+
catalog.register_schema("active", Arc::new(schema)).unwrap();
327+
ctx.register_catalog("cat", Arc::new(catalog));
328+
329+
let df = ctx
330+
.sql("SELECT id, bool_col FROM cat.active.alltypes_plain")
331+
.await
332+
.unwrap();
333+
334+
let actual = df.collect().await.unwrap();
335+
336+
let expected = [
337+
"+----+----------+",
338+
"| id | bool_col |",
339+
"+----+----------+",
340+
"| 4 | true |",
341+
"| 5 | false |",
342+
"| 6 | true |",
343+
"| 7 | false |",
344+
"| 2 | true |",
345+
"| 3 | false |",
346+
"| 0 | true |",
347+
"| 1 | false |",
348+
"+----+----------+",
349+
];
350+
assert_batches_eq!(expected, &actual);
351+
}
352+
}

0 commit comments

Comments
 (0)