Commit 2f5b25d

Minor: refactor streaming CSV inference code (#4717)
1 parent af9cd58 commit 2f5b25d

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 105 additions & 69 deletions
@@ -25,11 +25,11 @@ use std::sync::Arc;
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
-use bytes::Buf;
+use bytes::{Buf, Bytes};

 use datafusion_common::DataFusionError;

-use futures::{pin_mut, StreamExt, TryStreamExt};
+use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{ObjectMeta, ObjectStore};

 use super::FileFormat;
@@ -125,75 +125,16 @@ impl FileFormat for CsvFormat {

         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);

-        'iterating_objects: for object in objects {
+        for object in objects {
             // stream to only read as many rows as needed into memory
-            let stream = store
-                .get(&object.location)
-                .await?
-                .into_stream()
-                .map_err(|e| DataFusionError::External(Box::new(e)));
-            let stream = newline_delimited_stream(stream);
-            pin_mut!(stream);
-
-            let mut column_names = vec![];
-            let mut column_type_possibilities = vec![];
-            let mut first_chunk = true;
-
-            'reading_object: while let Some(data) = stream.next().await.transpose()? {
-                let (Schema { fields, .. }, records_read) =
-                    arrow::csv::reader::infer_reader_schema(
-                        self.file_compression_type.convert_read(data.reader())?,
-                        self.delimiter,
-                        Some(records_to_read),
-                        // only consider header for first chunk
-                        self.has_header && first_chunk,
-                    )?;
-                records_to_read -= records_read;
-
-                if first_chunk {
-                    // set up initial structures for recording inferred schema across chunks
-                    (column_names, column_type_possibilities) = fields
-                        .into_iter()
-                        .map(|field| {
-                            let mut possibilities = HashSet::new();
-                            if records_read > 0 {
-                                // at least 1 data row read, record the inferred datatype
-                                possibilities.insert(field.data_type().clone());
-                            }
-                            (field.name().clone(), possibilities)
-                        })
-                        .unzip();
-                    first_chunk = false;
-                } else {
-                    if fields.len() != column_type_possibilities.len() {
-                        return Err(DataFusionError::Execution(
-                            format!(
-                                "Encountered unequal lengths between records on CSV file whilst inferring schema. \
-                                 Expected {} records, found {} records",
-                                column_type_possibilities.len(),
-                                fields.len()
-                            )
-                        ));
-                    }
-
-                    column_type_possibilities.iter_mut().zip(fields).for_each(
-                        |(possibilities, field)| {
-                            possibilities.insert(field.data_type().clone());
-                        },
-                    );
-                }
-
-                if records_to_read == 0 {
-                    break 'reading_object;
-                }
-            }
-
-            schemas.push(build_schema_helper(
-                column_names,
-                &column_type_possibilities,
-            ));
+            let stream = read_to_delimited_chunks(store, object).await;
+            let (schema, records_read) = self
+                .infer_schema_from_stream(records_to_read, stream)
+                .await?;
+            records_to_read -= records_read;
+            schemas.push(schema);
             if records_to_read == 0 {
-                break 'iterating_objects;
+                break;
             }
         }

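Both the deleted loop above and the new infer_schema_from_stream introduced below infer each chunk with arrow::csv::reader::infer_reader_schema, capping the number of records read and treating the first line as a header only for the first chunk. As a rough standalone sketch of that per-chunk call (not part of this commit, and assuming the same arrow-csv signature the diff uses):

use std::io::Cursor;

use arrow::csv::reader::infer_reader_schema;

fn infer_one_chunk() -> arrow::error::Result<()> {
    // a chunk containing only whole lines, as newline_delimited_stream guarantees
    let chunk = b"c1,c2\n1,foo\n2,bar\n";
    let (schema, records_read) = infer_reader_schema(
        Cursor::new(&chunk[..]),
        b',',       // delimiter
        Some(1000), // read at most this many records
        true,       // first chunk, so honour the header row
    )?;
    println!("read {records_read} records, inferred schema: {schema:?}");
    Ok(())
}
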
@@ -227,6 +168,101 @@ impl FileFormat for CsvFormat {
     }
 }

+/// Return a newline delimited stream from the specified file on
+/// object store
+///
+/// Each returned `Bytes` has a whole number of newline delimited rows
+async fn read_to_delimited_chunks(
+    store: &Arc<dyn ObjectStore>,
+    object: &ObjectMeta,
+) -> impl Stream<Item = Result<Bytes>> {
+    // stream to only read as many rows as needed into memory
+    let stream = store
+        .get(&object.location)
+        .await
+        .map_err(DataFusionError::ObjectStore);
+
+    match stream {
+        Ok(s) => newline_delimited_stream(
+            s.into_stream()
+                .map_err(|e| DataFusionError::External(Box::new(e))),
+        )
+        .left_stream(),
+        Err(e) => futures::stream::iter(vec![Err(e)]).right_stream(),
+    }
+}
+
+impl CsvFormat {
+    /// Return the inferred schema reading up to records_to_read from a
+    /// stream of delimited chunks returning the inferred schema and the
+    /// number of lines that were read
+    async fn infer_schema_from_stream(
+        &self,
+        mut records_to_read: usize,
+        stream: impl Stream<Item = Result<Bytes>>,
+    ) -> Result<(Schema, usize)> {
+        let mut total_records_read = 0;
+        let mut column_names = vec![];
+        let mut column_type_possibilities = vec![];
+        let mut first_chunk = true;
+
+        pin_mut!(stream);
+
+        while let Some(chunk) = stream.next().await.transpose()? {
+            let (Schema { fields, .. }, records_read) =
+                arrow::csv::reader::infer_reader_schema(
+                    self.file_compression_type.convert_read(chunk.reader())?,
+                    self.delimiter,
+                    Some(records_to_read),
+                    // only consider header for first chunk
+                    self.has_header && first_chunk,
+                )?;
+            records_to_read -= records_read;
+            total_records_read += records_read;
+
+            if first_chunk {
+                // set up initial structures for recording inferred schema across chunks
+                (column_names, column_type_possibilities) = fields
+                    .into_iter()
+                    .map(|field| {
+                        let mut possibilities = HashSet::new();
+                        if records_read > 0 {
+                            // at least 1 data row read, record the inferred datatype
+                            possibilities.insert(field.data_type().clone());
+                        }
+                        (field.name().clone(), possibilities)
+                    })
+                    .unzip();
+                first_chunk = false;
+            } else {
+                if fields.len() != column_type_possibilities.len() {
+                    return Err(DataFusionError::Execution(
+                        format!(
+                            "Encountered unequal lengths between records on CSV file whilst inferring schema. \
+                             Expected {} records, found {} records",
+                            column_type_possibilities.len(),
+                            fields.len()
+                        )
+                    ));
+                }
+
+                column_type_possibilities.iter_mut().zip(fields).for_each(
+                    |(possibilities, field)| {
+                        possibilities.insert(field.data_type().clone());
+                    },
+                );
+            }
+
+            if records_to_read == 0 {
+                break;
+            }
+        }
+
+        let schema = build_schema_helper(column_names, &column_type_possibilities);
+        Ok((schema, total_records_read))
+    }
+}
+
 fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema {
     let fields = names
         .into_iter()
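read_to_delimited_chunks above has to return a single impl Stream even though its two arms build very different streams: a newline-aligned byte stream on success and a one-element error stream when the initial get fails. The left_stream() / right_stream() adapters from futures::StreamExt reconcile that by wrapping each arm in one half of an Either stream. A minimal standalone sketch of the same pattern, with made-up item types (illustrative only, not code from this commit):

use futures::{Stream, StreamExt};

fn numbers_or_error(fail: bool) -> impl Stream<Item = Result<u32, String>> {
    if !fail {
        // success path: the "real" stream of items
        let oks: Vec<Result<u32, String>> = vec![Ok(1), Ok(2), Ok(3)];
        futures::stream::iter(oks).left_stream()
    } else {
        // failure path: surface the error as a single-item stream, like the Err arm above
        let errs: Vec<Result<u32, String>> = vec![Err("could not open source".to_string())];
        futures::stream::iter(errs).right_stream()
    }
}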

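The per-column HashSet<DataType> values accumulated by infer_schema_from_stream are finally collapsed into concrete field types by build_schema_helper, whose body is only partially visible in the last hunk. Purely as a hypothetical illustration of what such a resolution step could look like (the real rules in csv.rs may differ), a resolver over those sets might widen to Utf8 whenever the chunks disagree:

use std::collections::HashSet;

use arrow::datatypes::{DataType, Field, Schema};

// Hypothetical sketch only; not the actual build_schema_helper from csv.rs.
fn resolve_schema(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema {
    let fields = names
        .into_iter()
        .zip(types)
        .map(|(name, candidates)| {
            let data_type = match candidates.len() {
                // no data rows were read for this column
                0 => DataType::Utf8,
                // every chunk agreed on a single type
                1 => candidates.iter().next().unwrap().clone(),
                // chunks disagreed: fall back to strings
                _ => DataType::Utf8,
            };
            Field::new(name.as_str(), data_type, true)
        })
        .collect::<Vec<_>>();
    Schema::new(fields)
}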