-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmithril_turbo_downloader.rs
391 lines (334 loc) · 14.5 KB
/
mithril_turbo_downloader.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
//! Turbo Downloads for Mithril Snapshots.
use std::{
cmp,
ffi::OsStr,
io::{BufReader, Read},
path::{Path, PathBuf},
// process::Stdio,
sync::{
atomic::{AtomicU64, Ordering},
Arc, OnceLock,
},
};
use anyhow::{anyhow, bail};
use async_trait::async_trait;
use catalyst_types::{conversion::from_saturating, mmap_file::MemoryMapFile};
use dashmap::DashSet;
use memx::memcmp;
use mithril_client::{
common::CompressionAlgorithm, snapshot_downloader::SnapshotDownloader, MithrilResult,
};
use tar::{Archive, EntryType};
use tokio::{fs::create_dir_all, task::spawn_blocking};
use tracing::{debug, error};
use zstd::Decoder;
use crate::{
mithril_snapshot_config::MithrilSnapshotConfig,
mithril_snapshot_data::latest_mithril_snapshot_data,
stats::{self},
turbo_downloader::ParallelDownloadProcessor,
};
/// Shared state for a snapshot downloader that accelerates downloads by
/// streaming the archive through an in-process `ParallelDownloadProcessor`
/// and deduplicating extracted files against the previous snapshot.
pub struct Inner {
    /// Configuration for the snapshot sync.
    cfg: MithrilSnapshotConfig,
    /// Set of `.chunk` files recorded as new or changed during this download
    /// (populated by the `changed_file!`/`new_file!` macros).
    new_chunks: Arc<DashSet<PathBuf>>,
    /// The number of files that were new in this download.
    new_files: AtomicU64,
    /// The number of files that changed in this download.
    chg_files: AtomicU64,
    /// The total number of files in the download.
    tot_files: AtomicU64,
    /// The total size of the files extracted in the download.
    ext_size: AtomicU64,
    /// The total size of the files we deduplicated.
    dedup_size: AtomicU64,
    /// The download processor for the current file download.
    /// Set once by `probe`, read by `dl_and_dedup`.
    dl_handler: std::sync::OnceLock<ParallelDownloadProcessor>,
}
/// Record a file against the "changed" statistics: increments `chg_files` and,
/// if the file is a `.chunk` file, records its absolute path in `new_chunks`.
///
/// NOTE(review): the original comment said this fires "every time the file is
/// different from previous", but its only call site in `dl_and_dedup` is the
/// dedup-success path (content identical to the previous snapshot) — confirm
/// the intended counter semantics.
/// `$rel_file` and `$new_size` are currently unused by this macro.
macro_rules! changed_file {
    ($self:ident, $rel_file:ident, $abs_file:ident, $new_size:ident) => {
        $self.chg_files.fetch_add(1, Ordering::SeqCst);
        if $abs_file.extension() == Some(OsStr::new("chunk")) {
            $self.new_chunks.insert($abs_file);
        }
    };
}
/// Record a file that could not be deduplicated (it was extracted or written
/// as new data): increments `new_files` and, if the file is a `.chunk` file,
/// records its absolute path in `new_chunks`.
/// `$rel_file` and `$new_size` are currently unused by this macro.
macro_rules! new_file {
    ($self:ident, $rel_file:ident, $abs_file:ident, $new_size:ident) => {
        $self.new_files.fetch_add(1, Ordering::SeqCst);
        if $abs_file.extension() == Some(OsStr::new("chunk")) {
            $self.new_chunks.insert($abs_file);
        }
    };
}
impl Inner {
    /// Synchronous Download and Dedup archive.
    ///
    /// Stream Downloads and Decompresses files, and deduplicates them as they are
    /// extracted from the embedded tar archive.
    ///
    /// Per Entry:
    /// If the file is NOT to be deduplicated, OR a previous file with the same name and
    /// size does not exist, then just extract it where it's supposed to go.
    ///
    /// To Dedup, the original file is memory-mapped.
    /// The new file is extracted to an in-memory buffer.
    /// If they compare the same, the original file is `HardLinked` to the new file name.
    /// Otherwise the new file buffer is saved to disk with the new file name.
    ///
    /// # Errors
    ///
    /// Fails if the archive stream cannot be read, an extracted file cannot be
    /// written, or the download processor was never set (see `probe`).
    fn dl_and_dedup(&self, _location: &str, _target_dir: &Path) -> MithrilResult<()> {
        // Tar reader layered over zstd decompression of the parallel download stream.
        let mut archive = self.create_archive_extractor()?;

        // Iterate the files in the archive.
        let entries = match archive.entries() {
            Ok(entries) => entries,
            Err(error) => bail!("Failed to get entries from the archive: {error}"),
        };

        // Destination for this download, and the previous snapshot (if any) to
        // deduplicate against.
        let tmp_dir = self.cfg.tmp_path();
        let latest_snapshot = latest_mithril_snapshot_data(self.cfg.chain);

        for entry in entries {
            let mut entry = match entry {
                Ok(entry) => entry,
                Err(error) => bail!("Failed to get an entry from the archive: {error}"),
            };
            let rel_file = entry.path()?.to_path_buf();
            let entry_size = entry.size();

            // debug!(chain = %self.cfg.chain, "DeDup : Extracting {}:{} loc {location} target {}",
            // rel_file.to_string_lossy(), entry_size, target_dir.to_string_lossy());

            // Check if we need to extract this path or not (skips dirs, absolute
            // paths and non-regular files).
            if !self.check_for_extract(&rel_file, entry.header().entry_type()) {
                continue;
            }

            // Count total files processed.
            self.tot_files.fetch_add(1, Ordering::SeqCst);

            // Absolute path this entry will be extracted to.
            let mut abs_file = tmp_dir.clone();
            abs_file.push(rel_file.clone());

            // Path of the same file within the previous snapshot, if it exists.
            let mut prev_file = latest_snapshot.id().path_if_exists();
            if let Some(prev_file) = &mut prev_file {
                prev_file.push(rel_file.clone());
            }

            // debug!(chain = %self.cfg.chain, "DeDup : tmp_dir {} abs_file {} prev_file
            // {prev_file:?}", tmp_dir.to_string_lossy(), abs_file.to_string_lossy() );

            self.ext_size.fetch_add(entry_size, Ordering::SeqCst);

            // Try and deduplicate the file if we can, otherwise just extract it.
            if let Ok(prev_mmap) = Self::can_deduplicate(&rel_file, entry_size, prev_file.as_ref())
            {
                // Read the whole entry into memory so it can be byte-compared with
                // the previous snapshot's memory-mapped copy.
                let expected_file_size = from_saturating(entry_size);
                let mut buf: Vec<u8> = Vec::with_capacity(expected_file_size);
                if entry.read_to_end(&mut buf)? != expected_file_size {
                    bail!(
                        "Failed to read file {} of size {} got {}",
                        rel_file.display(),
                        entry_size,
                        buf.len()
                    );
                }
                // Got the full file and it's the expected size. Is it different?
                if memcmp(prev_mmap.as_slice(), buf.as_slice()) == cmp::Ordering::Equal {
                    // Same, so let's Hardlink it, and throw away the temp buffer.

                    // Make sure our big mmap gets dropped before touching the files.
                    drop(prev_mmap);

                    // File is the same, so dedup it.
                    if self.cfg.dedup_tmp(&abs_file, &latest_snapshot).is_ok() {
                        self.dedup_size.fetch_add(entry_size, Ordering::SeqCst);
                        changed_file!(self, rel_file, abs_file, entry_size);
                        drop(buf);
                        continue;
                    }
                }

                // Content differed (or hard-linking failed): persist the in-memory
                // buffer as a new file.
                if let Err(error) = std::fs::write(&abs_file, buf) {
                    error!(chain = %self.cfg.chain, "Failed to write file {} got {}", abs_file.display(), error);
                    bail!("Failed to write file {} got {}", abs_file.display(), error);
                }
            } else {
                // No dedup, just extract it into the tmp directory as-is.
                entry.unpack_in(&tmp_dir)?;
                debug!(chain = %self.cfg.chain, "DeDup: Extracted file {rel_file:?}:{entry_size}");
            }

            new_file!(self, rel_file, abs_file, entry_size);
        }

        // Record how many bytes the parallel downloader actually transferred.
        let Some(dl_handler) = self.dl_handler.get() else {
            bail!("Failed to get the Parallel Download processor!");
        };

        debug!(chain = %self.cfg.chain, "Download {} bytes", dl_handler.dl_size());
        stats::mithril_dl_finished(self.cfg.chain, Some(dl_handler.dl_size()));

        Ok(())
    }

    /// Create a TAR archive extractor from the downloading file and a zstd decompressor.
    ///
    /// # Errors
    ///
    /// Fails if the download processor has not been set yet (it is stored by
    /// `probe`), or the ZSTD decoder cannot be created over it.
    fn create_archive_extractor(
        &self,
    ) -> MithrilResult<Archive<Decoder<'static, BufReader<BufReader<ParallelDownloadProcessor>>>>>
    {
        let Some(dl_handler) = self.dl_handler.get() else {
            bail!("Failed to get the Parallel Download processor!");
        };
        let buf_reader = BufReader::new(dl_handler.clone());
        let decoder = match zstd::Decoder::new(buf_reader) {
            Ok(decoder) => decoder,
            Err(error) => bail!("Failed to create ZSTD decoder: {error}"),
        };

        Ok(tar::Archive::new(decoder))
    }

    /// Check if we are supposed to extract this file from the archive or not.
    ///
    /// Returns `false` (and logs where relevant) for absolute paths, directory
    /// entries, and anything that is not a regular file.
    fn check_for_extract(&self, path: &Path, extract_type: EntryType) -> bool {
        // Refuse absolute paths: extraction must stay inside the target tree.
        if path.is_absolute() {
            error!(chain = %self.cfg.chain, "DeDup : Cannot extract an absolute path: {:?}", path);
            return false;
        }

        if extract_type.is_dir() {
            // We don't do anything with just a path, so skip it.
            return false;
        }

        if !extract_type.is_file() {
            error!(chain = %self.cfg.chain, "DeDup : Cannot extract a non-file: {:?}:{:?}", path, extract_type);
            return false;
        }

        true
    }

    /// Check if a given path from the archive is able to be deduplicated.
    ///
    /// On success, returns the previous snapshot's copy of the file memory-mapped
    /// and ready for comparison.
    ///
    /// # Errors
    ///
    /// Fails (meaning: "just extract the file normally") whenever any
    /// precondition for deduplication is not met.
    fn can_deduplicate(
        rel_file: &Path, file_size: u64, prev_file: Option<&PathBuf>,
    ) -> MithrilResult<MemoryMapFile> {
        // Can't dedup if the current file is not de-dupable (must be immutable)
        if rel_file.starts_with("immutable") {
            // Can't dedup if we don't have a previous file to dedup against.
            if let Some(prev_file) = prev_file {
                if let Some(current_size) = get_file_size_sync(prev_file) {
                    // If the current file is not exactly the same as the previous file size, we
                    // can't dedup.
                    if file_size == current_size {
                        if let Ok(pref_file_loaded) = Self::mmap_open_sync(prev_file) {
                            // Double-check the mapped size matches before handing it out.
                            if pref_file_loaded.size() == file_size {
                                return Ok(pref_file_loaded);
                            }
                        }
                    }
                }
            }
        }

        bail!("Can not deduplicate.");
    }

    /// Open a file using mmap for performance.
    ///
    /// Logs the failure itself, so callers may treat an `Err` simply as
    /// "deduplication unavailable" without losing the diagnostic.
    fn mmap_open_sync(path: &Path) -> MithrilResult<MemoryMapFile> {
        match MemoryMapFile::try_from(path) {
            Ok(mmap_file) => Ok(mmap_file),
            Err(error) => {
                error!(error=%error, file=%path.to_string_lossy(), "Failed to open file");
                Err(error.into())
            },
        }
    }
}
/// A snapshot downloader that accelerates downloads by fetching the archive
/// with multiple in-process parallel connections (`ParallelDownloadProcessor`)
/// and deduplicating files against the previous snapshot during extraction.
pub struct MithrilTurboDownloader {
    /// Shared inner state, `Arc`-wrapped so it can be moved into blocking tasks.
    inner: Arc<Inner>,
}
impl MithrilTurboDownloader {
/// Constructs a new `HttpSnapshotDownloader`.
pub fn new(cfg: MithrilSnapshotConfig) -> Self {
// Test if the HTTP Client can properly be created.
let dl_config = cfg.dl_config.clone().unwrap_or_default();
let cfg = cfg.with_dl_config(dl_config);
Self {
inner: Arc::new(Inner {
cfg,
new_chunks: Arc::new(DashSet::new()),
new_files: AtomicU64::new(0),
chg_files: AtomicU64::new(0),
tot_files: AtomicU64::new(0),
ext_size: AtomicU64::new(0),
dedup_size: AtomicU64::new(0),
dl_handler: OnceLock::new(),
}),
}
}
/// Take the hashmap for the previous download.
pub fn get_new_chunks(&self) -> Arc<DashSet<PathBuf>> {
self.inner.new_chunks.clone()
}
/// Create directories required to exist for download to succeed.
async fn create_directories(&self, target_dir: &Path) -> MithrilResult<()> {
if let Err(error) = create_dir_all(target_dir).await {
let msg = format!(
"Target directory {} could not be created: {}",
target_dir.to_string_lossy(),
error
);
Err(anyhow!(msg.clone()).context(msg))?;
}
Ok(())
}
/// Parallel Download, Extract and Dedup the Mithril Archive.
async fn dl_and_dedup(&self, location: &str, target_dir: &Path) -> MithrilResult<()> {
// Get a copy of the inner data to use in the sync download task.
let inner = self.inner.clone();
let location = location.to_owned();
let target_dir = target_dir.to_owned();
// This is fully synchronous IO, so do it on a sync thread.
let result = spawn_blocking(move || {
stats::start_thread(
inner.cfg.chain,
stats::thread::name::MITHRIL_DL_DEDUP,
false,
);
let result = inner.dl_and_dedup(&location, &target_dir);
stats::stop_thread(inner.cfg.chain, stats::thread::name::MITHRIL_DL_DEDUP);
result
})
.await;
if let Ok(result) = result {
return result;
}
stats::mithril_dl_finished(self.inner.cfg.chain, None);
bail!("Download and Dedup task failed");
}
}
/// Get the size of a particular file. None = failed to get size (doesn't matter why).
fn get_file_size_sync(file: &Path) -> Option<u64> {
    // Any metadata failure (missing file, permissions, ...) simply maps to None.
    file.metadata().map(|meta| meta.len()).ok()
}
#[async_trait]
impl SnapshotDownloader for MithrilTurboDownloader {
    /// Download a snapshot archive and unpack (with dedup) into `target_dir`.
    ///
    /// The compression algorithm, download id and snapshot size arguments are
    /// unused: decompression is always zstd and sizes come from the stream.
    async fn download_unpack(
        &self, location: &str, target_dir: &Path, _compression_algorithm: CompressionAlgorithm,
        _download_id: &str, _snapshot_size: u64,
    ) -> MithrilResult<()> {
        self.create_directories(target_dir).await?;

        // DL Start stats set after DL actually started inside the probe call.
        self.dl_and_dedup(location, target_dir).await?;

        let tot_files = self.inner.tot_files.load(Ordering::SeqCst);
        let chg_files = self.inner.chg_files.load(Ordering::SeqCst);
        let new_files = self.inner.new_files.load(Ordering::SeqCst);

        // NOTE(review): in `dl_and_dedup` every file counted in `tot_files` also
        // increments exactly one of `chg_files`/`new_files`, so this difference
        // looks like it is always zero — confirm what `mithril_extract_finished`
        // expects for this argument.
        stats::mithril_extract_finished(
            self.inner.cfg.chain,
            Some(self.inner.ext_size.load(Ordering::SeqCst)),
            self.inner.dedup_size.load(Ordering::SeqCst),
            tot_files - (chg_files + new_files),
            chg_files,
            new_files,
        );

        debug!("Download and Unpack finished='{location}' to '{target_dir:?}'.");

        Ok(())
    }

    /// Probe the snapshot location and start the parallel download.
    ///
    /// Records the extract/download start stats and stores the created
    /// `ParallelDownloadProcessor` for `download_unpack` to consume.
    async fn probe(&self, location: &str) -> MithrilResult<()> {
        debug!("Probe Snapshot location='{location}'.");

        let dl_config = self.inner.cfg.dl_config.clone().unwrap_or_default();
        let dl_processor =
            ParallelDownloadProcessor::new(location, dl_config, self.inner.cfg.chain).await?;

        // Decompress and extract and de-dupe each file in the archive.
        stats::mithril_extract_started(self.inner.cfg.chain);

        // We also immediately start downloading now.
        stats::mithril_dl_started(self.inner.cfg.chain);

        // Save the DownloadProcessor in the inner struct for use to process the downloaded data.
        // `OnceLock::set` only fails if it was already set (e.g. probe called twice).
        if let Err(_error) = self.inner.dl_handler.set(dl_processor) {
            bail!("Failed to set the inner dl_handler. Must already be set?");
        }

        Ok(())
    }
}