Skip to content

Commit 3f50d6b

Browse files
committed
use spawn_blocking in storage store_all & store_all_in_archive
1 parent 29dbb4c commit 3f50d6b

File tree

1 file changed

+96
-72
lines changed

1 file changed

+96
-72
lines changed

src/storage/mod.rs

Lines changed: 96 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -367,47 +367,64 @@ impl AsyncStorage {
367367
archive_path: &str,
368368
root_dir: &Path,
369369
) -> Result<(HashMap<PathBuf, String>, CompressionAlgorithm)> {
370-
let mut file_paths = HashMap::new();
371-
372-
// We are only using the `zip` library to create the archives and the matching
373-
// index-file. The ZIP format allows more compression formats, and these can even be mixed
374-
// in a single archive.
375-
//
376-
// Decompression happens by fetching only the part of the remote archive that contains
377-
// the compressed stream of the object we put into the archive.
378-
// For decompression we are sharing the compression algorithms defined in
379-
// `storage::compression`. So every new algorithm to be used inside ZIP archives
380-
// also has to be added as supported algorithm for storage compression, together
381-
// with a mapping in `storage::archive_index::Index::new_from_zip`.
382-
383-
let options =
384-
zip::write::FileOptions::default().compression_method(zip::CompressionMethod::Bzip2);
385-
386-
let mut zip = zip::ZipWriter::new(io::Cursor::new(Vec::new()));
387-
for file_path in get_file_list(root_dir)? {
388-
let mut file = fs::File::open(root_dir.join(&file_path))?;
389-
390-
zip.start_file(file_path.to_str().unwrap(), options)?;
391-
io::copy(&mut file, &mut zip)?;
392-
393-
let mime = detect_mime(&file_path);
394-
file_paths.insert(file_path, mime.to_string());
395-
}
396-
397-
let mut zip_content = zip.finish()?.into_inner();
398-
399-
let remote_index_path = format!("{}.index", &archive_path);
400-
401-
let local_index_path = self
402-
.config
403-
.local_archive_cache_path
404-
.join(&remote_index_path);
405-
fs::create_dir_all(local_index_path.parent().unwrap())?;
406-
archive_index::create(&mut io::Cursor::new(&mut zip_content), &local_index_path)?;
370+
let (zip_content, compressed_index_content, alg, remote_index_path, file_paths) =
371+
spawn_blocking({
372+
let archive_path = archive_path.to_owned();
373+
let root_dir = root_dir.to_owned();
374+
let local_archive_cache_path = self.config.local_archive_cache_path.clone();
375+
376+
move || {
377+
let mut file_paths = HashMap::new();
378+
379+
// We are only using the `zip` library to create the archives and the matching
380+
// index-file. The ZIP format allows more compression formats, and these can even be mixed
381+
// in a single archive.
382+
//
383+
// Decompression happens by fetching only the part of the remote archive that contains
384+
// the compressed stream of the object we put into the archive.
385+
// For decompression we are sharing the compression algorithms defined in
386+
// `storage::compression`. So every new algorithm to be used inside ZIP archives
387+
// also has to be added as supported algorithm for storage compression, together
388+
// with a mapping in `storage::archive_index::Index::new_from_zip`.
389+
390+
let options = zip::write::FileOptions::default()
391+
.compression_method(zip::CompressionMethod::Bzip2);
392+
393+
let mut zip = zip::ZipWriter::new(io::Cursor::new(Vec::new()));
394+
for file_path in get_file_list(&root_dir)? {
395+
let mut file = fs::File::open(&root_dir.join(&file_path))?;
396+
397+
zip.start_file(file_path.to_str().unwrap(), options)?;
398+
io::copy(&mut file, &mut zip)?;
399+
400+
let mime = detect_mime(&file_path);
401+
file_paths.insert(file_path, mime.to_string());
402+
}
407403

408-
let alg = CompressionAlgorithm::default();
409-
let compressed_index_content =
410-
compress(BufReader::new(fs::File::open(&local_index_path)?), alg)?;
404+
let mut zip_content = zip.finish()?.into_inner();
405+
406+
let remote_index_path = format!("{}.index", &archive_path);
407+
408+
let local_index_path = local_archive_cache_path.join(&remote_index_path);
409+
fs::create_dir_all(local_index_path.parent().unwrap())?;
410+
archive_index::create(
411+
&mut io::Cursor::new(&mut zip_content),
412+
&local_index_path,
413+
)?;
414+
415+
let alg = CompressionAlgorithm::default();
416+
let compressed_index_content =
417+
compress(BufReader::new(fs::File::open(&local_index_path)?), alg)?;
418+
Ok((
419+
zip_content,
420+
compressed_index_content,
421+
alg,
422+
remote_index_path,
423+
file_paths,
424+
))
425+
}
426+
})
427+
.await?;
411428

412429
self.store_inner(vec![
413430
Blob {
@@ -439,38 +456,45 @@ impl AsyncStorage {
439456
prefix: &Path,
440457
root_dir: &Path,
441458
) -> Result<(HashMap<PathBuf, String>, HashSet<CompressionAlgorithm>)> {
442-
let mut file_paths_and_mimes = HashMap::new();
443-
let mut algs = HashSet::with_capacity(1);
444-
445-
let blobs: Vec<_> = get_file_list(root_dir)?
446-
.into_iter()
447-
.filter_map(|file_path| {
448-
// Some files have insufficient permissions
449-
// (like .lock file created by cargo in documentation directory).
450-
// Skip these files.
451-
fs::File::open(root_dir.join(&file_path))
452-
.ok()
453-
.map(|file| (file_path, file))
454-
})
455-
.map(|(file_path, file)| -> Result<_> {
456-
let alg = CompressionAlgorithm::default();
457-
let content = compress(file, alg)?;
458-
let bucket_path = prefix.join(&file_path).to_slash().unwrap().to_string();
459-
460-
let mime = detect_mime(&file_path);
461-
file_paths_and_mimes.insert(file_path, mime.to_string());
462-
algs.insert(alg);
463-
464-
Ok(Blob {
465-
path: bucket_path,
466-
mime: mime.to_string(),
467-
content,
468-
compression: Some(alg),
469-
// this field is ignored by the backend
470-
date_updated: Utc::now(),
471-
})
472-
})
473-
.collect::<Result<Vec<_>>>()?;
459+
let (blobs, file_paths_and_mimes, algs) = spawn_blocking({
460+
let prefix = prefix.to_owned();
461+
let root_dir = root_dir.to_owned();
462+
move || {
463+
let mut file_paths_and_mimes = HashMap::new();
464+
let mut algs = HashSet::with_capacity(1);
465+
let blobs: Vec<_> = get_file_list(&root_dir)?
466+
.into_iter()
467+
.filter_map(|file_path| {
468+
// Some files have insufficient permissions
469+
// (like .lock file created by cargo in documentation directory).
470+
// Skip these files.
471+
fs::File::open(root_dir.join(&file_path))
472+
.ok()
473+
.map(|file| (file_path, file))
474+
})
475+
.map(|(file_path, file)| -> Result<_> {
476+
let alg = CompressionAlgorithm::default();
477+
let content = compress(file, alg)?;
478+
let bucket_path = prefix.join(&file_path).to_slash().unwrap().to_string();
479+
480+
let mime = detect_mime(&file_path);
481+
file_paths_and_mimes.insert(file_path, mime.to_string());
482+
algs.insert(alg);
483+
484+
Ok(Blob {
485+
path: bucket_path,
486+
mime: mime.to_string(),
487+
content,
488+
compression: Some(alg),
489+
// this field is ignored by the backend
490+
date_updated: Utc::now(),
491+
})
492+
})
493+
.collect::<Result<Vec<_>>>()?;
494+
Ok((blobs, file_paths_and_mimes, algs))
495+
}
496+
})
497+
.await?;
474498

475499
self.store_inner(blobs).await?;
476500
Ok((file_paths_and_mimes, algs))

0 commit comments

Comments
 (0)