@@ -23,28 +23,156 @@ pub(crate) mod spill_manager;
23
23
use std:: fs:: File ;
24
24
use std:: io:: BufReader ;
25
25
use std:: path:: { Path , PathBuf } ;
26
+ use std:: pin:: Pin ;
26
27
use std:: ptr:: NonNull ;
28
+ use std:: sync:: Arc ;
29
+ use std:: task:: { Context , Poll } ;
27
30
28
31
use arrow:: array:: ArrayData ;
29
32
use arrow:: datatypes:: { Schema , SchemaRef } ;
30
33
use arrow:: ipc:: { reader:: StreamReader , writer:: StreamWriter } ;
31
34
use arrow:: record_batch:: RecordBatch ;
32
- use tokio:: sync:: mpsc:: Sender ;
33
-
34
- use datafusion_common:: { exec_datafusion_err, HashSet , Result } ;
35
-
36
- fn read_spill ( sender : Sender < Result < RecordBatch > > , path : & Path ) -> Result < ( ) > {
37
- let file = BufReader :: new ( File :: open ( path) ?) ;
38
- // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
39
- // with validated schemas and buffers. Skip redundant validation during read
40
- // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written.
41
- let reader = unsafe { StreamReader :: try_new ( file, None ) ?. with_skip_validation ( true ) } ;
42
- for batch in reader {
43
- sender
44
- . blocking_send ( batch. map_err ( Into :: into) )
45
- . map_err ( |e| exec_datafusion_err ! ( "{e}" ) ) ?;
35
+
36
+ use datafusion_common:: { exec_datafusion_err, DataFusionError , HashSet , Result } ;
37
+ use datafusion_common_runtime:: SpawnedTask ;
38
+ use datafusion_execution:: disk_manager:: RefCountedTempFile ;
39
+ use datafusion_execution:: RecordBatchStream ;
40
+ use futures:: { FutureExt as _, Stream } ;
41
+
42
+ /// Stream that reads spill files from disk where each batch is read in a spawned blocking task
43
+ /// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
44
+ struct SpillReaderStream {
45
+ schema : SchemaRef ,
46
+ state : SpillReaderStreamState ,
47
+ }
48
+
49
+ /// When we poll for the next batch, we will get back both the batch and the reader,
50
+ /// so we can call `next` again.
51
+ type NextRecordBatchResult = Result < ( StreamReader < BufReader < File > > , Option < RecordBatch > ) > ;
52
+
53
+ enum SpillReaderStreamState {
54
+ /// Initial state: the stream was not initialized yet
55
+ /// and the file was not opened
56
+ Uninitialized ( RefCountedTempFile ) ,
57
+
58
+ /// A read is in progress in a spawned blocking task for which we hold the handle.
59
+ ReadInProgress ( SpawnedTask < NextRecordBatchResult > ) ,
60
+
61
+ /// A read has finished and we wait for being polled again in order to start reading the next batch.
62
+ Waiting ( StreamReader < BufReader < File > > ) ,
63
+
64
+ /// The stream has finished, successfully or not.
65
+ Done ,
66
+ }
67
+
68
+ impl SpillReaderStream {
69
+ fn new ( schema : SchemaRef , spill_file : RefCountedTempFile ) -> Self {
70
+ Self {
71
+ schema,
72
+ state : SpillReaderStreamState :: Uninitialized ( spill_file) ,
73
+ }
74
+ }
75
+
76
+ fn poll_next_inner (
77
+ & mut self ,
78
+ cx : & mut Context < ' _ > ,
79
+ ) -> Poll < Option < Result < RecordBatch > > > {
80
+ match & mut self . state {
81
+ SpillReaderStreamState :: Uninitialized ( _) => {
82
+ // Temporarily replace with `Done` to be able to pass the file to the task.
83
+ let SpillReaderStreamState :: Uninitialized ( spill_file) =
84
+ std:: mem:: replace ( & mut self . state , SpillReaderStreamState :: Done )
85
+ else {
86
+ unreachable ! ( )
87
+ } ;
88
+
89
+ let task = SpawnedTask :: spawn_blocking ( move || {
90
+ let file = BufReader :: new ( File :: open ( spill_file. path ( ) ) ?) ;
91
+ // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
92
+ // with validated schemas and buffers. Skip redundant validation during read
93
+ // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written.
94
+ let mut reader = unsafe {
95
+ StreamReader :: try_new ( file, None ) ?. with_skip_validation ( true )
96
+ } ;
97
+
98
+ let next_batch = reader. next ( ) . transpose ( ) ?;
99
+
100
+ Ok ( ( reader, next_batch) )
101
+ } ) ;
102
+
103
+ self . state = SpillReaderStreamState :: ReadInProgress ( task) ;
104
+
105
+ // Poll again immediately so the inner task is polled and the waker is
106
+ // registered.
107
+ self . poll_next_inner ( cx)
108
+ }
109
+
110
+ SpillReaderStreamState :: ReadInProgress ( task) => {
111
+ let result = futures:: ready!( task. poll_unpin( cx) )
112
+ . unwrap_or_else ( |err| Err ( DataFusionError :: External ( Box :: new ( err) ) ) ) ;
113
+
114
+ match result {
115
+ Ok ( ( reader, batch) ) => {
116
+ match batch {
117
+ Some ( batch) => {
118
+ self . state = SpillReaderStreamState :: Waiting ( reader) ;
119
+
120
+ Poll :: Ready ( Some ( Ok ( batch) ) )
121
+ }
122
+ None => {
123
+ // Stream is done
124
+ self . state = SpillReaderStreamState :: Done ;
125
+
126
+ Poll :: Ready ( None )
127
+ }
128
+ }
129
+ }
130
+ Err ( err) => {
131
+ self . state = SpillReaderStreamState :: Done ;
132
+
133
+ Poll :: Ready ( Some ( Err ( err) ) )
134
+ }
135
+ }
136
+ }
137
+
138
+ SpillReaderStreamState :: Waiting ( _) => {
139
+ // Temporarily replace with `Done` to be able to pass the file to the task.
140
+ let SpillReaderStreamState :: Waiting ( mut reader) =
141
+ std:: mem:: replace ( & mut self . state , SpillReaderStreamState :: Done )
142
+ else {
143
+ unreachable ! ( )
144
+ } ;
145
+
146
+ let task = SpawnedTask :: spawn_blocking ( move || {
147
+ let next_batch = reader. next ( ) . transpose ( ) ?;
148
+
149
+ Ok ( ( reader, next_batch) )
150
+ } ) ;
151
+
152
+ self . state = SpillReaderStreamState :: ReadInProgress ( task) ;
153
+
154
+ // Poll again immediately so the inner task is polled and the waker is
155
+ // registered.
156
+ self . poll_next_inner ( cx)
157
+ }
158
+
159
+ SpillReaderStreamState :: Done => Poll :: Ready ( None ) ,
160
+ }
161
+ }
162
+ }
163
+
164
+ impl Stream for SpillReaderStream {
165
+ type Item = Result < RecordBatch > ;
166
+
167
+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
168
+ self . get_mut ( ) . poll_next_inner ( cx)
169
+ }
170
+ }
171
+
172
+ impl RecordBatchStream for SpillReaderStream {
173
+ fn schema ( & self ) -> SchemaRef {
174
+ Arc :: clone ( & self . schema )
46
175
}
47
- Ok ( ( ) )
48
176
}
49
177
50
178
/// Spill the `RecordBatch` to disk as smaller batches
@@ -205,6 +333,7 @@ mod tests {
205
333
use arrow:: record_batch:: RecordBatch ;
206
334
use datafusion_common:: Result ;
207
335
use datafusion_execution:: runtime_env:: RuntimeEnv ;
336
+ use futures:: StreamExt as _;
208
337
209
338
use std:: sync:: Arc ;
210
339
@@ -604,4 +733,42 @@ mod tests {
604
733
605
734
Ok ( ( ) )
606
735
}
736
+
737
+ #[ test]
738
+ fn test_reading_more_spills_than_tokio_blocking_threads ( ) -> Result < ( ) > {
739
+ tokio:: runtime:: Builder :: new_current_thread ( )
740
+ . enable_all ( )
741
+ . max_blocking_threads ( 1 )
742
+ . build ( )
743
+ . unwrap ( )
744
+ . block_on ( async {
745
+ let batch = build_table_i32 (
746
+ ( "a2" , & vec ! [ 0 , 1 , 2 ] ) ,
747
+ ( "b2" , & vec ! [ 3 , 4 , 5 ] ) ,
748
+ ( "c2" , & vec ! [ 4 , 5 , 6 ] ) ,
749
+ ) ;
750
+
751
+ let schema = batch. schema ( ) ;
752
+
753
+ // Construct SpillManager
754
+ let env = Arc :: new ( RuntimeEnv :: default ( ) ) ;
755
+ let metrics = SpillMetrics :: new ( & ExecutionPlanMetricsSet :: new ( ) , 0 ) ;
756
+ let spill_manager = SpillManager :: new ( env, metrics, Arc :: clone ( & schema) ) ;
757
+ let batches: [ _ ; 10 ] = std:: array:: from_fn ( |_| batch. clone ( ) ) ;
758
+
759
+ let spill_file_1 = spill_manager
760
+ . spill_record_batch_and_finish ( & batches, "Test1" ) ?
761
+ . unwrap ( ) ;
762
+ let spill_file_2 = spill_manager
763
+ . spill_record_batch_and_finish ( & batches, "Test2" ) ?
764
+ . unwrap ( ) ;
765
+
766
+ let mut stream_1 = spill_manager. read_spill_as_stream ( spill_file_1) ?;
767
+ let mut stream_2 = spill_manager. read_spill_as_stream ( spill_file_2) ?;
768
+ stream_1. next ( ) . await ;
769
+ stream_2. next ( ) . await ;
770
+
771
+ Ok ( ( ) )
772
+ } )
773
+ }
607
774
}
0 commit comments