Skip to content

Commit 1c54b38

Browse files
authored
Simplify FileSource::create_file_opener's signature (apache#14798)
* simplify fn signature * .
1 parent 0bd9083 commit 1c54b38

File tree

8 files changed

+36
-41
lines changed

8 files changed

+36
-41
lines changed

datafusion-examples/examples/csv_json_opener.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ async fn csv_opener() -> Result<()> {
7171
.with_batch_size(8192)
7272
.with_projection(&scan_config);
7373

74-
let opener = config.create_file_opener(Ok(object_store), &scan_config, 0)?;
74+
let opener = config.create_file_opener(object_store, &scan_config, 0);
7575

7676
let mut result = vec![];
7777
let mut stream =

datafusion/core/src/datasource/data_source.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ pub trait FileSource: Send + Sync {
4040
/// Creates a `dyn FileOpener` based on given parameters
4141
fn create_file_opener(
4242
&self,
43-
object_store: datafusion_common::Result<Arc<dyn ObjectStore>>,
43+
object_store: Arc<dyn ObjectStore>,
4444
base_config: &FileScanConfig,
4545
partition: usize,
46-
) -> datafusion_common::Result<Arc<dyn FileOpener>>;
46+
) -> Arc<dyn FileOpener>;
4747
/// Any
4848
fn as_any(&self) -> &dyn Any;
4949
/// Initialize new type with batch size configuration

datafusion/core/src/datasource/physical_plan/arrow_file.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -211,14 +211,14 @@ pub struct ArrowSource {
211211
impl FileSource for ArrowSource {
212212
fn create_file_opener(
213213
&self,
214-
object_store: Result<Arc<dyn ObjectStore>>,
214+
object_store: Arc<dyn ObjectStore>,
215215
base_config: &FileScanConfig,
216216
_partition: usize,
217-
) -> Result<Arc<dyn FileOpener>> {
218-
Ok(Arc::new(ArrowOpener {
219-
object_store: object_store?,
217+
) -> Arc<dyn FileOpener> {
218+
Arc::new(ArrowOpener {
219+
object_store,
220220
projection: base_config.file_column_projection_indices(),
221-
}))
221+
})
222222
}
223223

224224
fn as_any(&self) -> &dyn Any {

datafusion/core/src/datasource/physical_plan/avro.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -194,23 +194,23 @@ impl FileSource for AvroSource {
194194
#[cfg(feature = "avro")]
195195
fn create_file_opener(
196196
&self,
197-
object_store: Result<Arc<dyn ObjectStore>>,
197+
object_store: Arc<dyn ObjectStore>,
198198
_base_config: &FileScanConfig,
199199
_partition: usize,
200-
) -> Result<Arc<dyn FileOpener>> {
201-
Ok(Arc::new(private::AvroOpener {
200+
) -> Arc<dyn FileOpener> {
201+
Arc::new(private::AvroOpener {
202202
config: Arc::new(self.clone()),
203-
object_store: object_store?,
204-
}))
203+
object_store,
204+
})
205205
}
206206

207207
#[cfg(not(feature = "avro"))]
208208
fn create_file_opener(
209209
&self,
210-
_object_store: Result<Arc<dyn ObjectStore>>,
210+
_object_store: Arc<dyn ObjectStore>,
211211
_base_config: &FileScanConfig,
212212
_partition: usize,
213-
) -> Result<Arc<dyn FileOpener>> {
213+
) -> Arc<dyn FileOpener> {
214214
panic!("Avro feature is not enabled in this build")
215215
}
216216

datafusion/core/src/datasource/physical_plan/csv.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -564,15 +564,15 @@ impl CsvOpener {
564564
impl FileSource for CsvSource {
565565
fn create_file_opener(
566566
&self,
567-
object_store: Result<Arc<dyn ObjectStore>>,
567+
object_store: Arc<dyn ObjectStore>,
568568
base_config: &FileScanConfig,
569569
_partition: usize,
570-
) -> Result<Arc<dyn FileOpener>> {
571-
Ok(Arc::new(CsvOpener {
570+
) -> Arc<dyn FileOpener> {
571+
Arc::new(CsvOpener {
572572
config: Arc::new(self.clone()),
573573
file_compression_type: base_config.file_compression_type,
574-
object_store: object_store?,
575-
}))
574+
object_store,
575+
})
576576
}
577577

578578
fn as_any(&self) -> &dyn Any {

datafusion/core/src/datasource/physical_plan/file_scan_config.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -162,15 +162,15 @@ impl DataSource for FileScanConfig {
162162
partition: usize,
163163
context: Arc<TaskContext>,
164164
) -> Result<SendableRecordBatchStream> {
165-
let object_store = context.runtime_env().object_store(&self.object_store_url);
165+
let object_store = context.runtime_env().object_store(&self.object_store_url)?;
166166

167167
let source = self
168168
.source
169169
.with_batch_size(context.session_config().batch_size())
170170
.with_schema(Arc::clone(&self.file_schema))
171171
.with_projection(self);
172172

173-
let opener = source.create_file_opener(object_store, self, partition)?;
173+
let opener = source.create_file_opener(object_store, self, partition);
174174

175175
let stream = FileStream::new(self, partition, opener, source.metrics())?;
176176
Ok(Box::pin(stream))

datafusion/core/src/datasource/physical_plan/json.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -262,18 +262,18 @@ impl JsonSource {
262262
impl FileSource for JsonSource {
263263
fn create_file_opener(
264264
&self,
265-
object_store: Result<Arc<dyn ObjectStore>>,
265+
object_store: Arc<dyn ObjectStore>,
266266
base_config: &FileScanConfig,
267267
_partition: usize,
268-
) -> Result<Arc<dyn FileOpener>> {
269-
Ok(Arc::new(JsonOpener {
268+
) -> Arc<dyn FileOpener> {
269+
Arc::new(JsonOpener {
270270
batch_size: self
271271
.batch_size
272272
.expect("Batch size must set before creating opener"),
273273
projected_schema: base_config.projected_file_schema(),
274274
file_compression_type: base_config.file_compression_type,
275-
object_store: object_store?,
276-
}))
275+
object_store,
276+
})
277277
}
278278

279279
fn as_any(&self) -> &dyn Any {

datafusion/core/src/datasource/physical_plan/parquet/source.rs

+9-14
Original file line numberDiff line numberDiff line change
@@ -463,10 +463,10 @@ impl ParquetSource {
463463
impl FileSource for ParquetSource {
464464
fn create_file_opener(
465465
&self,
466-
object_store: datafusion_common::Result<Arc<dyn ObjectStore>>,
466+
object_store: Arc<dyn ObjectStore>,
467467
base_config: &FileScanConfig,
468468
partition: usize,
469-
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
469+
) -> Arc<dyn FileOpener> {
470470
let projection = base_config
471471
.file_column_projection_indices()
472472
.unwrap_or_else(|| (0..base_config.file_schema.fields().len()).collect());
@@ -475,17 +475,12 @@ impl FileSource for ParquetSource {
475475
.clone()
476476
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
477477

478-
let parquet_file_reader_factory = self
479-
.parquet_file_reader_factory
480-
.as_ref()
481-
.map(|f| Ok(Arc::clone(f)))
482-
.unwrap_or_else(|| {
483-
object_store.map(|store| {
484-
Arc::new(DefaultParquetFileReaderFactory::new(store)) as _
485-
})
486-
})?;
487-
488-
Ok(Arc::new(ParquetOpener {
478+
let parquet_file_reader_factory =
479+
self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
480+
Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
481+
});
482+
483+
Arc::new(ParquetOpener {
489484
partition_index: partition,
490485
projection: Arc::from(projection),
491486
batch_size: self
@@ -504,7 +499,7 @@ impl FileSource for ParquetSource {
504499
enable_page_index: self.enable_page_index(),
505500
enable_bloom_filter: self.bloom_filter_on_read(),
506501
schema_adapter_factory,
507-
}))
502+
})
508503
}
509504

510505
fn as_any(&self) -> &dyn Any {

0 commit comments

Comments
 (0)