File tree 8 files changed +17
-25
lines changed
datafusion/core/src/datasource
datafusion-examples/examples
8 files changed +17
-25
lines changed Original file line number Diff line number Diff line change @@ -71,7 +71,7 @@ async fn csv_opener() -> Result<()> {
71
71
. with_batch_size ( 8192 )
72
72
. with_projection ( & scan_config) ;
73
73
74
- let opener = config. create_file_opener ( Ok ( object_store) , & scan_config, 0 ) ?;
74
+ let opener = config. create_file_opener ( object_store, & scan_config, 0 ) ?;
75
75
76
76
let mut result = vec ! [ ] ;
77
77
let mut stream =
Original file line number Diff line number Diff line change @@ -38,7 +38,7 @@ pub trait FileSource: Send + Sync {
38
38
/// Creates a `dyn FileOpener` based on given parameters
39
39
fn create_file_opener (
40
40
& self ,
41
- object_store : datafusion_common :: Result < Arc < dyn ObjectStore > > ,
41
+ object_store : Arc < dyn ObjectStore > ,
42
42
base_config : & FileScanConfig ,
43
43
partition : usize ,
44
44
) -> datafusion_common:: Result < Arc < dyn FileOpener > > ;
Original file line number Diff line number Diff line change @@ -211,12 +211,12 @@ pub struct ArrowSource {
211
211
impl FileSource for ArrowSource {
212
212
fn create_file_opener (
213
213
& self ,
214
- object_store : Result < Arc < dyn ObjectStore > > ,
214
+ object_store : Arc < dyn ObjectStore > ,
215
215
base_config : & FileScanConfig ,
216
216
_partition : usize ,
217
217
) -> Result < Arc < dyn FileOpener > > {
218
218
Ok ( Arc :: new ( ArrowOpener {
219
- object_store : object_store? ,
219
+ object_store,
220
220
projection : base_config. file_column_projection_indices ( ) ,
221
221
} ) )
222
222
}
Original file line number Diff line number Diff line change @@ -194,20 +194,20 @@ impl FileSource for AvroSource {
194
194
#[ cfg( feature = "avro" ) ]
195
195
fn create_file_opener (
196
196
& self ,
197
- object_store : Result < Arc < dyn ObjectStore > > ,
197
+ object_store : Arc < dyn ObjectStore > ,
198
198
_base_config : & FileScanConfig ,
199
199
_partition : usize ,
200
200
) -> Result < Arc < dyn FileOpener > > {
201
201
Ok ( Arc :: new ( private:: AvroOpener {
202
202
config : Arc :: new ( self . clone ( ) ) ,
203
- object_store : object_store? ,
203
+ object_store,
204
204
} ) )
205
205
}
206
206
207
207
#[ cfg( not( feature = "avro" ) ) ]
208
208
fn create_file_opener (
209
209
& self ,
210
- _object_store : Result < Arc < dyn ObjectStore > > ,
210
+ _object_store : Arc < dyn ObjectStore > ,
211
211
_base_config : & FileScanConfig ,
212
212
_partition : usize ,
213
213
) -> Result < Arc < dyn FileOpener > > {
Original file line number Diff line number Diff line change @@ -564,14 +564,14 @@ impl CsvOpener {
564
564
impl FileSource for CsvSource {
565
565
fn create_file_opener (
566
566
& self ,
567
- object_store : Result < Arc < dyn ObjectStore > > ,
567
+ object_store : Arc < dyn ObjectStore > ,
568
568
base_config : & FileScanConfig ,
569
569
_partition : usize ,
570
570
) -> Result < Arc < dyn FileOpener > > {
571
571
Ok ( Arc :: new ( CsvOpener {
572
572
config : Arc :: new ( self . clone ( ) ) ,
573
573
file_compression_type : base_config. file_compression_type ,
574
- object_store : object_store? ,
574
+ object_store,
575
575
} ) )
576
576
}
577
577
Original file line number Diff line number Diff line change @@ -162,7 +162,7 @@ impl DataSource for FileScanConfig {
162
162
partition : usize ,
163
163
context : Arc < TaskContext > ,
164
164
) -> Result < SendableRecordBatchStream > {
165
- let object_store = context. runtime_env ( ) . object_store ( & self . object_store_url ) ;
165
+ let object_store = context. runtime_env ( ) . object_store ( & self . object_store_url ) ? ;
166
166
167
167
let source = self
168
168
. source
Original file line number Diff line number Diff line change @@ -262,7 +262,7 @@ impl JsonSource {
262
262
impl FileSource for JsonSource {
263
263
fn create_file_opener (
264
264
& self ,
265
- object_store : Result < Arc < dyn ObjectStore > > ,
265
+ object_store : Arc < dyn ObjectStore > ,
266
266
base_config : & FileScanConfig ,
267
267
_partition : usize ,
268
268
) -> Result < Arc < dyn FileOpener > > {
@@ -272,7 +272,7 @@ impl FileSource for JsonSource {
272
272
. expect ( "Batch size must set before creating opener" ) ,
273
273
projected_schema : base_config. projected_file_schema ( ) ,
274
274
file_compression_type : base_config. file_compression_type ,
275
- object_store : object_store? ,
275
+ object_store,
276
276
} ) )
277
277
}
278
278
Original file line number Diff line number Diff line change @@ -463,7 +463,7 @@ impl ParquetSource {
463
463
impl FileSource for ParquetSource {
464
464
fn create_file_opener (
465
465
& self ,
466
- object_store : datafusion_common :: Result < Arc < dyn ObjectStore > > ,
466
+ object_store : Arc < dyn ObjectStore > ,
467
467
base_config : & FileScanConfig ,
468
468
partition : usize ,
469
469
) -> datafusion_common:: Result < Arc < dyn FileOpener > > {
@@ -475,15 +475,10 @@ impl FileSource for ParquetSource {
475
475
. clone ( )
476
476
. unwrap_or_else ( || Arc :: new ( DefaultSchemaAdapterFactory ) ) ;
477
477
478
- let parquet_file_reader_factory = self
479
- . parquet_file_reader_factory
480
- . as_ref ( )
481
- . map ( |f| Ok ( Arc :: clone ( f) ) )
482
- . unwrap_or_else ( || {
483
- object_store. map ( |store| {
484
- Arc :: new ( DefaultParquetFileReaderFactory :: new ( store) ) as _
485
- } )
486
- } ) ?;
478
+ let parquet_file_reader_factory =
479
+ self . parquet_file_reader_factory . clone ( ) . unwrap_or_else ( || {
480
+ Arc :: new ( DefaultParquetFileReaderFactory :: new ( object_store) ) as _
481
+ } ) ;
487
482
488
483
Ok ( Arc :: new ( ParquetOpener {
489
484
partition_index : partition,
@@ -586,7 +581,4 @@ impl FileSource for ParquetSource {
586
581
}
587
582
}
588
583
}
589
- fn supports_repartition ( & self , _config : & FileScanConfig ) -> bool {
590
- true
591
- }
592
584
}
You can’t perform that action at this time.
0 commit comments