Skip to content

Commit 4b8a3d6

Browse files
authored
Parallel parquet schema inference (#6366)
1 parent d01002c commit 4b8a3d6

File tree

1 file changed

+7
-6
lines changed
  • datafusion/core/src/datasource/file_format

1 file changed

+7
-6
lines changed

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use async_trait::async_trait;
2626
use bytes::{BufMut, BytesMut};
2727
use datafusion_common::DataFusionError;
2828
use datafusion_physical_expr::PhysicalExpr;
29+
use futures::{StreamExt, TryStreamExt};
2930
use hashbrown::HashMap;
3031
use object_store::{ObjectMeta, ObjectStore};
3132
use parquet::arrow::parquet_to_arrow_schema;
@@ -151,12 +152,12 @@ impl FileFormat for ParquetFormat {
151152
store: &Arc<dyn ObjectStore>,
152153
objects: &[ObjectMeta],
153154
) -> Result<SchemaRef> {
154-
let mut schemas = Vec::with_capacity(objects.len());
155-
for object in objects {
156-
let schema =
157-
fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?;
158-
schemas.push(schema)
159-
}
155+
let schemas: Vec<_> = futures::stream::iter(objects)
156+
.map(|object| fetch_schema(store.as_ref(), object, self.metadata_size_hint))
157+
.boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
158+
.buffered(32)
159+
.try_collect()
160+
.await?;
160161

161162
let schema = if self.skip_metadata(state.config_options()) {
162163
Schema::try_merge(clear_metadata(schemas))

0 commit comments

Comments
 (0)