Skip to content

Commit c105089

Browse files
authored
Merge pull request #2624 from flaneur2020/add-system-table-columns
Add system.columns table
2 parents 1367213 + ff46fc9 commit c105089

File tree

5 files changed

+181
-0
lines changed

5 files changed

+181
-0
lines changed

query/src/catalogs/impls/catalog/system_catalog.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ impl SystemCatalog {
7575
Arc::new(system::ProcessesTable::create(next_id())),
7676
Arc::new(system::ConfigsTable::create(next_id())),
7777
Arc::new(system::MetricsTable::create(next_id())),
78+
Arc::new(system::ColumnsTable::create(next_id())),
7879
];
7980

8081
let mut tables = InMemoryMetas::create();
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright 2020 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::any::Any;
16+
use std::sync::Arc;
17+
18+
use common_context::IOContext;
19+
use common_context::TableIOContext;
20+
use common_datablocks::DataBlock;
21+
use common_datavalues::series::Series;
22+
use common_datavalues::series::SeriesFrom;
23+
use common_datavalues::DataField;
24+
use common_datavalues::DataSchemaRefExt;
25+
use common_datavalues::DataType;
26+
use common_exception::Result;
27+
use common_meta_types::TableIdent;
28+
use common_meta_types::TableInfo;
29+
use common_meta_types::TableMeta;
30+
use common_planners::ReadDataSourcePlan;
31+
use common_streams::DataBlockStream;
32+
use common_streams::SendableDataBlockStream;
33+
34+
use crate::catalogs::Catalog;
35+
use crate::catalogs::Table;
36+
use crate::sessions::DatabendQueryContext;
37+
38+
pub struct ColumnsTable {
39+
table_info: TableInfo,
40+
}
41+
42+
impl ColumnsTable {
43+
pub fn create(table_id: u64) -> Self {
44+
let schema = DataSchemaRefExt::create(vec![
45+
DataField::new("name", DataType::String, false),
46+
DataField::new("database", DataType::String, false),
47+
DataField::new("table", DataType::String, false),
48+
DataField::new("data_type", DataType::String, false),
49+
DataField::new("is_nullable", DataType::Boolean, false),
50+
]);
51+
52+
let table_info = TableInfo {
53+
desc: "'system'.'columns'".to_string(),
54+
name: "columns".to_string(),
55+
ident: TableIdent::new(table_id, 0),
56+
meta: TableMeta {
57+
schema,
58+
engine: "SystemColumns".to_string(),
59+
..Default::default()
60+
},
61+
};
62+
63+
Self { table_info }
64+
}
65+
66+
pub async fn dump_table_columns(
67+
&self,
68+
ctx: Arc<DatabendQueryContext>,
69+
) -> Result<Vec<(String, String, DataField)>> {
70+
let catalog = ctx.get_catalog();
71+
let databases = catalog.get_databases().await?;
72+
73+
let mut rows: Vec<(String, String, DataField)> = vec![];
74+
for database in databases {
75+
for table in catalog.get_tables(database.name()).await? {
76+
for field in table.schema().fields() {
77+
rows.push((database.name().into(), table.name().into(), field.clone()))
78+
}
79+
}
80+
}
81+
82+
Ok(rows)
83+
}
84+
}
85+
86+
#[async_trait::async_trait]
87+
impl Table for ColumnsTable {
88+
fn as_any(&self) -> &dyn Any {
89+
self
90+
}
91+
92+
fn get_table_info(&self) -> &TableInfo {
93+
&self.table_info
94+
}
95+
96+
async fn read(
97+
&self,
98+
io_ctx: Arc<TableIOContext>,
99+
_plan: &ReadDataSourcePlan,
100+
) -> Result<SendableDataBlockStream> {
101+
let ctx: Arc<DatabendQueryContext> = io_ctx
102+
.get_user_data()?
103+
.expect("DatabendQueryContext should not be None");
104+
105+
let rows = self.dump_table_columns(ctx).await?;
106+
let mut names: Vec<Vec<u8>> = Vec::with_capacity(rows.len());
107+
let mut tables: Vec<Vec<u8>> = Vec::with_capacity(rows.len());
108+
let mut databases: Vec<Vec<u8>> = Vec::with_capacity(rows.len());
109+
let mut data_types: Vec<Vec<u8>> = Vec::with_capacity(rows.len());
110+
let mut is_nullables: Vec<bool> = Vec::with_capacity(rows.len());
111+
for (database_name, table_name, field) in rows.into_iter() {
112+
names.push(field.name().clone().into_bytes());
113+
tables.push(table_name.into_bytes());
114+
databases.push(database_name.into_bytes());
115+
data_types.push(field.data_type().to_string().into_bytes());
116+
is_nullables.push(field.is_nullable());
117+
}
118+
119+
let block = DataBlock::create_by_array(self.table_info.schema(), vec![
120+
Series::new(names),
121+
Series::new(databases),
122+
Series::new(tables),
123+
Series::new(data_types),
124+
Series::new(is_nullables),
125+
]);
126+
Ok(Box::pin(DataBlockStream::create(
127+
self.table_info.schema(),
128+
None,
129+
vec![block],
130+
)))
131+
}
132+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright 2020 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use common_base::tokio;
18+
use common_exception::Result;
19+
use futures::TryStreamExt;
20+
21+
use crate::catalogs::Table;
22+
use crate::catalogs::ToReadDataSourcePlan;
23+
use crate::datasources::database::system::ColumnsTable;
24+
25+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
26+
async fn test_columns_table() -> Result<()> {
27+
let ctx = crate::tests::try_create_context()?;
28+
29+
let table: Arc<dyn Table> = Arc::new(ColumnsTable::create(1));
30+
let io_ctx = ctx.get_single_node_table_io_context()?;
31+
let io_ctx = Arc::new(io_ctx);
32+
let source_plan = table.read_plan(
33+
io_ctx.clone(),
34+
None,
35+
Some(ctx.get_settings().get_max_threads()? as usize),
36+
)?;
37+
38+
let stream = table.read(io_ctx, &source_plan).await?;
39+
let result = stream.try_collect::<Vec<_>>().await?;
40+
let block = &result[0];
41+
assert_eq!(block.num_columns(), 5);
42+
Ok(())
43+
}

query/src/datasources/database/system/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
pub use clusters_table::ClustersTable;
16+
pub use columns_table::ColumnsTable;
1617
pub use configs_table::ConfigsTable;
1718
pub use contributors_table::ContributorsTable;
1819
pub use credits_table::CreditsTable;
@@ -30,6 +31,8 @@ pub use tracing_table_stream::TracingTableStream;
3031
#[cfg(test)]
3132
mod clusters_table_test;
3233
#[cfg(test)]
34+
mod columns_table_test;
35+
#[cfg(test)]
3336
mod configs_table_test;
3437
#[cfg(test)]
3538
mod contributors_table_test;
@@ -49,6 +52,7 @@ mod tables_table_test;
4952
mod tracing_table_test;
5053

5154
mod clusters_table;
55+
mod columns_table;
5256
mod configs_table;
5357
mod contributors_table;
5458
mod credits_table;

query/src/datasources/database/system/tables_table_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ async fn test_tables_table() -> Result<()> {
4444
"| database | name | engine |",
4545
"+----------+--------------+--------------------+",
4646
"| system | clusters | SystemClusters |",
47+
"| system | columns | SystemColumns |",
4748
"| system | configs | SystemConfigs |",
4849
"| system | contributors | SystemContributors |",
4950
"| system | credits | SystemCredits |",

0 commit comments

Comments
 (0)