20
20
use std:: {
21
21
fs:: { remove_file, File } ,
22
22
io:: BufReader ,
23
- path:: PathBuf ,
23
+ path:: { Path , PathBuf } ,
24
24
sync:: Arc ,
25
25
} ;
26
26
27
27
use arrow_array:: { RecordBatch , TimestampMillisecondArray } ;
28
28
use arrow_ipc:: reader:: FileReader ;
29
- use arrow_schema:: Schema ;
29
+ use arrow_schema:: { ArrowError , Schema , SchemaRef } ;
30
30
use itertools:: kmerge_by;
31
31
use tracing:: error;
32
32
@@ -35,28 +35,58 @@ use crate::{
35
35
utils:: arrow:: { adapt_batch, reverse} ,
36
36
} ;
37
37
38
+ #[ derive( Debug ) ]
39
+ pub struct ReverseReader {
40
+ inner : FileReader < BufReader < File > > ,
41
+ idx : usize ,
42
+ }
43
+
44
+ impl ReverseReader {
45
+ fn try_new ( path : impl AsRef < Path > ) -> Result < Self , ArrowError > {
46
+ let inner = FileReader :: try_new ( BufReader :: new ( File :: open ( path) . unwrap ( ) ) , None ) ?;
47
+ let idx = inner. num_batches ( ) ;
48
+
49
+ Ok ( Self { inner, idx } )
50
+ }
51
+
52
+ fn schema ( & self ) -> SchemaRef {
53
+ self . inner . schema ( )
54
+ }
55
+ }
56
+
57
+ impl Iterator for ReverseReader {
58
+ type Item = Result < RecordBatch , ArrowError > ;
59
+
60
+ fn next ( & mut self ) -> Option < Self :: Item > {
61
+ if self . idx <= 0 {
62
+ return None ;
63
+ }
64
+
65
+ self . idx -= 1 ;
66
+ if let Err ( e) = self . inner . set_index ( self . idx ) {
67
+ return Some ( Err ( e) ) ;
68
+ }
69
+
70
+ self . inner . next ( )
71
+ }
72
+ }
73
+
38
74
#[ derive( Debug ) ]
39
75
pub struct MergedRecordReader {
40
- pub readers : Vec < FileReader < BufReader < File > > > ,
76
+ pub readers : Vec < ReverseReader > ,
41
77
}
42
78
43
79
impl MergedRecordReader {
44
- pub fn new ( files : & [ PathBuf ] ) -> Self {
45
- let mut readers = Vec :: with_capacity ( files . len ( ) ) ;
80
+ pub fn new ( paths : & [ PathBuf ] ) -> Self {
81
+ let mut readers = Vec :: with_capacity ( paths . len ( ) ) ;
46
82
47
- for file in files {
83
+ for path in paths {
48
84
//remove empty files before reading
49
- if file . metadata ( ) . unwrap ( ) . len ( ) == 0 {
50
- error ! ( "Invalid file detected, removing it: {:?}" , file ) ;
51
- remove_file ( file ) . unwrap ( ) ;
85
+ if path . metadata ( ) . unwrap ( ) . len ( ) == 0 {
86
+ error ! ( "Invalid file detected, removing it: {path :?}" ) ;
87
+ remove_file ( path ) . unwrap ( ) ;
52
88
} else {
53
- let Ok ( reader) =
54
- FileReader :: try_new ( BufReader :: new ( File :: open ( file) . unwrap ( ) ) , None )
55
- else {
56
- error ! ( "Invalid file detected, ignoring it: {:?}" , file) ;
57
- continue ;
58
- } ;
59
-
89
+ let reader = ReverseReader :: try_new ( path) . unwrap ( ) ;
60
90
readers. push ( reader) ;
61
91
}
62
92
}
0 commit comments