1
1
use crate :: error:: Result ;
2
2
use crate :: storage:: { FileRange , compression:: CompressionAlgorithm } ;
3
3
use anyhow:: { Context as _, bail} ;
4
- use rusqlite:: { Connection , OpenFlags , OptionalExtension } ;
4
+ use itertools:: Itertools as _;
5
+ use sqlx:: { Acquire as _, QueryBuilder , Row as _, Sqlite } ;
5
6
use std:: { fs, io, path:: Path } ;
6
7
use tracing:: instrument;
7
8
@@ -20,97 +21,158 @@ impl FileInfo {
20
21
}
21
22
}
22
23
24
+ /// crates a new empty SQLite database, and returns a configured connection
25
+ /// pool to connect to the DB.
26
+ /// Any existing DB at the given path will be deleted first.
27
+ async fn sqlite_create < P : AsRef < Path > > ( path : P ) -> Result < sqlx:: SqlitePool > {
28
+ let path = path. as_ref ( ) ;
29
+ if path. exists ( ) {
30
+ fs:: remove_file ( path) ?;
31
+ }
32
+
33
+ sqlx:: SqlitePool :: connect_with (
34
+ sqlx:: sqlite:: SqliteConnectOptions :: new ( )
35
+ . filename ( path)
36
+ . read_only ( false )
37
+ . pragma ( "synchronous" , "full" )
38
+ . create_if_missing ( true ) ,
39
+ )
40
+ . await
41
+ . map_err ( Into :: into)
42
+ }
43
+
44
+ /// open existing SQLite database, return a configured connection poll
45
+ /// to connect to the DB.
46
+ /// Will error when the database doesn't exist at that path.
47
+ async fn sqlite_open < P : AsRef < Path > > ( path : P ) -> Result < sqlx:: SqlitePool > {
48
+ sqlx:: SqlitePool :: connect_with (
49
+ sqlx:: sqlite:: SqliteConnectOptions :: new ( )
50
+ . filename ( path)
51
+ . read_only ( true )
52
+ . pragma ( "synchronous" , "off" ) // not needed for readonly db
53
+ . serialized ( false ) // same as OPEN_NOMUTEX
54
+ . create_if_missing ( false ) ,
55
+ )
56
+ . await
57
+ . map_err ( Into :: into)
58
+ }
59
+
23
60
/// create an archive index based on a zipfile.
24
61
///
25
62
/// Will delete the destination file if it already exists.
26
63
#[ instrument( skip( zipfile) ) ]
27
- pub ( crate ) fn create < R : io:: Read + io:: Seek , P : AsRef < Path > + std:: fmt:: Debug > (
64
+ pub ( crate ) async fn create < R : io:: Read + io:: Seek , P : AsRef < Path > + std:: fmt:: Debug > (
28
65
zipfile : & mut R ,
29
66
destination : P ,
30
67
) -> Result < ( ) > {
31
- let destination = destination. as_ref ( ) ;
32
- if destination. exists ( ) {
33
- fs:: remove_file ( destination) ?;
34
- }
68
+ let pool = sqlite_create ( destination) . await ?;
69
+ let mut conn = pool. acquire ( ) . await ?;
70
+ let mut tx = conn. begin ( ) . await ?;
35
71
36
- let conn = rusqlite:: Connection :: open ( destination) ?;
37
- conn. execute ( "PRAGMA synchronous = FULL" , ( ) ) ?;
38
- conn. execute ( "BEGIN" , ( ) ) ?;
39
- conn. execute (
40
- "
72
+ sqlx:: query (
73
+ r#"
41
74
CREATE TABLE files (
42
75
id INTEGER PRIMARY KEY,
43
76
path TEXT UNIQUE,
44
77
start INTEGER,
45
78
end INTEGER,
46
79
compression INTEGER
47
80
);
48
- " ,
49
- ( ) ,
50
- ) ?;
81
+ "# ,
82
+ )
83
+ . execute ( & mut * tx)
84
+ . await ?;
51
85
52
86
let mut archive = zip:: ZipArchive :: new ( zipfile) ?;
53
87
let compression_bzip = CompressionAlgorithm :: Bzip2 as i32 ;
54
88
55
- for i in 0 ..archive. len ( ) {
56
- let zf = archive. by_index ( i) ?;
57
-
58
- conn. execute (
59
- "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)" ,
60
- (
61
- zf. name ( ) ,
62
- zf. data_start ( ) ,
63
- zf. data_start ( ) + zf. compressed_size ( ) - 1 ,
64
- match zf. compression ( ) {
65
- zip:: CompressionMethod :: Bzip2 => compression_bzip,
66
- c => bail ! ( "unsupported compression algorithm {} in zip-file" , c) ,
67
- } ,
68
- ) ,
69
- ) ?;
89
+ const CHUNKS : usize = 1000 ;
90
+ for chunk in & ( 0 ..archive. len ( ) ) . chunks ( CHUNKS ) {
91
+ for i in chunk {
92
+ let mut insert_stmt =
93
+ QueryBuilder :: < Sqlite > :: new ( "INSERT INTO files (path, start, end, compression) " ) ;
94
+
95
+ let entry = archive. by_index ( i) ?;
96
+
97
+ let start = entry. data_start ( ) as i64 ;
98
+ let end = ( entry. data_start ( ) + entry. compressed_size ( ) - 1 ) as i64 ;
99
+ let compression_raw = match entry. compression ( ) {
100
+ zip:: CompressionMethod :: Bzip2 => compression_bzip,
101
+ c => bail ! ( "unsupported compression algorithm {} in zip-file" , c) ,
102
+ } ;
103
+
104
+ insert_stmt. push_values ( [ ( ) ] , |mut b, _| {
105
+ b. push_bind ( entry. name ( ) )
106
+ . push_bind ( start)
107
+ . push_bind ( end)
108
+ . push_bind ( compression_raw) ;
109
+ } ) ;
110
+ insert_stmt
111
+ . build ( )
112
+ . persistent ( false )
113
+ . execute ( & mut * tx)
114
+ . await ?;
115
+ }
70
116
}
71
- conn. execute ( "CREATE INDEX idx_files_path ON files (path);" , ( ) ) ?;
72
- conn. execute ( "END" , ( ) ) ?;
73
- conn. execute ( "VACUUM" , ( ) ) ?;
117
+
118
+ sqlx:: query ( "CREATE INDEX idx_files_path ON files (path);" )
119
+ . execute ( & mut * tx)
120
+ . await ?;
121
+
122
+ // Commit the transaction before VACUUM (VACUUM cannot run inside a transaction)
123
+ tx. commit ( ) . await ?;
124
+
125
+ // VACUUM outside the transaction
126
+ sqlx:: query ( "VACUUM" ) . execute ( & mut * conn) . await ?;
127
+
74
128
Ok ( ( ) )
75
129
}
76
130
77
- fn find_in_sqlite_index ( conn : & Connection , search_for : & str ) -> Result < Option < FileInfo > > {
78
- let mut stmt = conn. prepare (
131
+ async fn find_in_sqlite_index < ' e , E > ( executor : E , search_for : & str ) -> Result < Option < FileInfo > >
132
+ where
133
+ E : sqlx:: Executor < ' e , Database = sqlx:: Sqlite > ,
134
+ {
135
+ let row = sqlx:: query (
79
136
"
80
137
SELECT start, end, compression
81
138
FROM files
82
139
WHERE path = ?
83
140
" ,
84
- ) ?;
85
-
86
- stmt. query_row ( ( search_for, ) , |row| {
87
- let compression: i32 = row. get ( 2 ) ?;
88
-
89
- Ok ( FileInfo {
90
- range : row. get ( 0 ) ?..=row. get ( 1 ) ?,
91
- compression : compression. try_into ( ) . map_err ( |value| {
92
- rusqlite:: Error :: FromSqlConversionFailure (
93
- 2 ,
94
- rusqlite:: types:: Type :: Integer ,
95
- format ! ( "invalid compression algorithm '{value}' in database" ) . into ( ) ,
96
- )
141
+ )
142
+ . bind ( search_for)
143
+ . fetch_optional ( executor)
144
+ . await
145
+ . context ( "error fetching SQLite data" ) ?;
146
+
147
+ if let Some ( row) = row {
148
+ let start: u64 = row. try_get ( 0 ) ?;
149
+ let end: u64 = row. try_get ( 1 ) ?;
150
+ let compression_raw: i32 = row. try_get ( 2 ) ?;
151
+
152
+ Ok ( Some ( FileInfo {
153
+ range : start..=end,
154
+ compression : compression_raw. try_into ( ) . map_err ( |value| {
155
+ anyhow:: anyhow!( format!(
156
+ "invalid compression algorithm '{value}' in database"
157
+ ) )
97
158
} ) ?,
98
- } )
99
- } )
100
- . optional ( )
101
- . context ( "error fetching SQLite data" )
159
+ } ) )
160
+ } else {
161
+ Ok ( None )
162
+ }
102
163
}
103
164
104
165
#[ instrument]
105
- pub ( crate ) fn find_in_file < P : AsRef < Path > + std:: fmt:: Debug > (
166
+ pub ( crate ) async fn find_in_file < P : AsRef < Path > + std:: fmt:: Debug > (
106
167
archive_index_path : P ,
107
168
search_for : & str ,
108
169
) -> Result < Option < FileInfo > > {
109
- let connection = Connection :: open_with_flags (
110
- archive_index_path,
111
- OpenFlags :: SQLITE_OPEN_READ_ONLY | OpenFlags :: SQLITE_OPEN_NO_MUTEX ,
112
- ) ?;
113
- find_in_sqlite_index ( & connection, search_for)
170
+ let path = archive_index_path. as_ref ( ) ;
171
+
172
+ let pool = sqlite_open ( path) . await ?;
173
+ let mut conn = pool. acquire ( ) . await ?;
174
+
175
+ find_in_sqlite_index ( & mut * conn, search_for) . await
114
176
}
115
177
116
178
#[ cfg( test) ]
@@ -138,43 +200,38 @@ mod tests {
138
200
tf
139
201
}
140
202
141
- #[ test]
142
- fn index_create_save_load_sqlite ( ) {
203
+ #[ tokio :: test]
204
+ async fn index_create_save_load_sqlite ( ) -> Result < ( ) > {
143
205
let mut tf = create_test_archive ( 1 ) ;
144
206
145
207
let tempfile = tempfile:: NamedTempFile :: new ( ) . unwrap ( ) . into_temp_path ( ) ;
146
- create ( & mut tf, & tempfile) . unwrap ( ) ;
208
+ create ( & mut tf, & tempfile) . await ? ;
147
209
148
- let fi = find_in_file ( & tempfile, "testfile0" ) . unwrap ( ) . unwrap ( ) ;
210
+ let fi = find_in_file ( & tempfile, "testfile0" ) . await ? . unwrap ( ) ;
149
211
150
212
assert_eq ! ( fi. range, FileRange :: new( 39 , 459 ) ) ;
151
213
assert_eq ! ( fi. compression, CompressionAlgorithm :: Bzip2 ) ;
152
214
153
- assert ! (
154
- find_in_file( & tempfile, "some_other_file" , )
155
- . unwrap( )
156
- . is_none( )
157
- ) ;
215
+ assert ! ( find_in_file( & tempfile, "some_other_file" , ) . await ?. is_none( ) ) ;
216
+ Ok ( ( ) )
158
217
}
159
218
160
- #[ test]
161
- fn archive_with_more_than_65k_files ( ) {
219
+ #[ tokio :: test]
220
+ async fn archive_with_more_than_65k_files ( ) -> Result < ( ) > {
162
221
let mut tf = create_test_archive ( 100_000 ) ;
163
222
164
- let tempfile = tempfile:: NamedTempFile :: new ( ) . unwrap ( ) . into_temp_path ( ) ;
165
- create ( & mut tf, & tempfile) . unwrap ( ) ;
166
-
167
- let connection = Connection :: open_with_flags (
168
- tempfile,
169
- OpenFlags :: SQLITE_OPEN_READ_ONLY | OpenFlags :: SQLITE_OPEN_NO_MUTEX ,
170
- )
171
- . unwrap ( ) ;
172
- let mut stmt = connection. prepare ( "SELECT count(*) FROM files" ) . unwrap ( ) ;
173
-
174
- let count = stmt
175
- . query_row ( [ ] , |row| Ok ( row. get :: < _ , usize > ( 0 ) ) )
176
- . unwrap ( )
177
- . unwrap ( ) ;
178
- assert_eq ! ( count, 100_000 ) ;
223
+ let tempfile = tempfile:: NamedTempFile :: new ( ) ?. into_temp_path ( ) ;
224
+ create ( & mut tf, & tempfile) . await ?;
225
+
226
+ let pool = sqlite_open ( & tempfile) . await ?;
227
+ let mut conn = pool. acquire ( ) . await ?;
228
+
229
+ let row = sqlx:: query ( "SELECT count(*) FROM files" )
230
+ . fetch_one ( & mut * conn)
231
+ . await ?;
232
+
233
+ assert_eq ! ( row. get:: <i64 , _>( 0 ) , 100_000 ) ;
234
+
235
+ Ok ( ( ) )
179
236
}
180
237
}
0 commit comments