@@ -23,28 +23,161 @@ pub(crate) mod spill_manager;
 use std::fs::File;
 use std::io::BufReader;
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::ptr::NonNull;
+use std::sync::Arc;
+use std::task::{Context, Poll};

 use arrow::array::ArrayData;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
 use arrow::record_batch::RecordBatch;
-use tokio::sync::mpsc::Sender;
-
-use datafusion_common::{exec_datafusion_err, HashSet, Result};
-
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
-    let file = BufReader::new(File::open(path)?);
-    // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
-    // with validated schemas and buffers. Skip redundant validation during read
-    // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written.
-    let reader = unsafe { StreamReader::try_new(file, None)?.with_skip_validation(true) };
-    for batch in reader {
-        sender
-            .blocking_send(batch.map_err(Into::into))
-            .map_err(|e| exec_datafusion_err!("{e}"))?;
+
+use datafusion_common::{exec_datafusion_err, DataFusionError, HashSet, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::RecordBatchStream;
+use futures::{FutureExt as _, Stream};
+
+/// Stream that reads spill files from disk where each batch is read in a spawned blocking task
+/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
+///
+/// A simpler solution would be spawning a long-running blocking task for each
+/// file read (instead of each batch). This approach does not work because when
+/// the number of concurrent reads exceeds the Tokio thread pool limit,
+/// deadlocks can occur and block progress.
+struct SpillReaderStream {
+    schema: SchemaRef,
+    state: SpillReaderStreamState,
+}
+
+/// When we poll for the next batch, we will get back both the batch and the reader,
+/// so we can call `next` again.
+type NextRecordBatchResult = Result<(StreamReader<BufReader<File>>, Option<RecordBatch>)>;
+
+enum SpillReaderStreamState {
+    /// Initial state: the stream was not initialized yet
+    /// and the file was not opened
+    Uninitialized(RefCountedTempFile),
+
+    /// A read is in progress in a spawned blocking task for which we hold the handle.
+    ReadInProgress(SpawnedTask<NextRecordBatchResult>),
+
+    /// A read has finished and we wait for being polled again in order to start reading the next batch.
+    Waiting(StreamReader<BufReader<File>>),
+
+    /// The stream has finished, successfully or not.
+    Done,
+}
+
+impl SpillReaderStream {
+    fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
+        Self {
+            schema,
+            state: SpillReaderStreamState::Uninitialized(spill_file),
+        }
+    }
+
+    fn poll_next_inner(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        match &mut self.state {
+            SpillReaderStreamState::Uninitialized(_) => {
+                // Temporarily replace with `Done` to be able to pass the file to the task.
+                let SpillReaderStreamState::Uninitialized(spill_file) =
+                    std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
+                else {
+                    unreachable!()
+                };
+
+                let task = SpawnedTask::spawn_blocking(move || {
+                    let file = BufReader::new(File::open(spill_file.path())?);
+                    // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
+                    // with validated schemas and buffers. Skip redundant validation during read
+                    // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written.
+                    let mut reader = unsafe {
+                        StreamReader::try_new(file, None)?.with_skip_validation(true)
+                    };
+
+                    let next_batch = reader.next().transpose()?;
+
+                    Ok((reader, next_batch))
+                });
+
+                self.state = SpillReaderStreamState::ReadInProgress(task);
+
+                // Poll again immediately so the inner task is polled and the waker is
+                // registered.
+                self.poll_next_inner(cx)
+            }
+
+            SpillReaderStreamState::ReadInProgress(task) => {
+                let result = futures::ready!(task.poll_unpin(cx))
+                    .unwrap_or_else(|err| Err(DataFusionError::External(Box::new(err))));
+
+                match result {
+                    Ok((reader, batch)) => {
+                        match batch {
+                            Some(batch) => {
+                                self.state = SpillReaderStreamState::Waiting(reader);
+
+                                Poll::Ready(Some(Ok(batch)))
+                            }
+                            None => {
+                                // Stream is done
+                                self.state = SpillReaderStreamState::Done;
+
+                                Poll::Ready(None)
+                            }
+                        }
+                    }
+                    Err(err) => {
+                        self.state = SpillReaderStreamState::Done;
+
+                        Poll::Ready(Some(Err(err)))
+                    }
+                }
+            }
+
+            SpillReaderStreamState::Waiting(_) => {
+                // Temporarily replace with `Done` to be able to pass the file to the task.
+                let SpillReaderStreamState::Waiting(mut reader) =
+                    std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
+                else {
+                    unreachable!()
+                };
+
+                let task = SpawnedTask::spawn_blocking(move || {
+                    let next_batch = reader.next().transpose()?;
+
+                    Ok((reader, next_batch))
+                });
+
+                self.state = SpillReaderStreamState::ReadInProgress(task);
+
+                // Poll again immediately so the inner task is polled and the waker is
+                // registered.
+                self.poll_next_inner(cx)
+            }
+
+            SpillReaderStreamState::Done => Poll::Ready(None),
+        }
+    }
+}
+
+impl Stream for SpillReaderStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.get_mut().poll_next_inner(cx)
+    }
+}
+
+impl RecordBatchStream for SpillReaderStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
     }
-    Ok(())
 }

 /// Spill the `RecordBatch` to disk as smaller batches
@@ -205,6 +338,7 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use datafusion_common::Result;
     use datafusion_execution::runtime_env::RuntimeEnv;
+    use futures::StreamExt as _;

     use std::sync::Arc;

@@ -604,4 +738,42 @@ mod tests {

         Ok(())
     }
+
+    #[test]
+    fn test_reading_more_spills_than_tokio_blocking_threads() -> Result<()> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .max_blocking_threads(1)
+            .build()
+            .unwrap()
+            .block_on(async {
+                let batch = build_table_i32(
+                    ("a2", &vec![0, 1, 2]),
+                    ("b2", &vec![3, 4, 5]),
+                    ("c2", &vec![4, 5, 6]),
+                );
+
+                let schema = batch.schema();
+
+                // Construct SpillManager
+                let env = Arc::new(RuntimeEnv::default());
+                let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+                let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema));
+                let batches: [_; 10] = std::array::from_fn(|_| batch.clone());
+
+                let spill_file_1 = spill_manager
+                    .spill_record_batch_and_finish(&batches, "Test1")?
+                    .unwrap();
+                let spill_file_2 = spill_manager
+                    .spill_record_batch_and_finish(&batches, "Test2")?
+                    .unwrap();
+
+                let mut stream_1 = spill_manager.read_spill_as_stream(spill_file_1)?;
+                let mut stream_2 = spill_manager.read_spill_as_stream(spill_file_2)?;
+                stream_1.next().await;
+                stream_2.next().await;
+
+                Ok(())
+            })
+    }
 }
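
The doc comment on `SpillReaderStream` notes that the stream does no buffering and points callers to `crate::common::spawn_buffered`. As a rough illustration only (not code from this commit): a caller inside the crate might combine `read_spill_as_stream` with `spawn_buffered` roughly as sketched below. The helper name `consume_spill` and the buffer size of 2 are made up for the example; `SpillManager`, `RefCountedTempFile`, and `read_spill_as_stream` are the items shown in the diff above.

```rust
// Hypothetical usage sketch, assuming `crate::common::spawn_buffered(stream, n)`
// keeps its current shape (SendableRecordBatchStream in, buffered stream out).
use futures::StreamExt as _;

async fn consume_spill(
    spill_manager: &SpillManager,
    spill_file: RefCountedTempFile,
) -> Result<()> {
    // `read_spill_as_stream` yields batches one at a time via SpillReaderStream.
    let stream = spill_manager.read_spill_as_stream(spill_file)?;
    // Pre-fetch up to 2 batches on a separate task so the consumer does not
    // wait on a disk read for every batch.
    let mut buffered = crate::common::spawn_buffered(stream, 2);
    while let Some(batch) = buffered.next().await {
        let batch = batch?;
        // ... process `batch` here ...
        let _ = batch.num_rows();
    }
    Ok(())
}
```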