Skip to content

feat: add --download-threads / -t arg to control download threads #413

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions i18n/en-US/oma.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,4 @@ path-not-exist = Path { $path } does not exist.
not-allow-oma-pending-piped = oma cannot run with piped output in interactive mode.
space-warn = You seem to be running out of space. Consider using { $cmd } to purge the package cache, which will free up { $size } in storage space; Please also consider cleaning up your rarely used software or personal data.
space-warn-with-zero = You seem to be running out of space, Please consider cleaning up your rarely used software or personal data.
wrong-thread-count = The specified number of threads { $count } is not allowed (min: 1, max: 255).
1 change: 1 addition & 0 deletions i18n/zh-CN/oma.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,4 @@ path-not-exist = 路径 { $path } 不存在。
not-allow-oma-pending-piped = 不允许搭配管道使用 oma 的交互模式界面。
space-warn = 检测到您的存储空间告急,您可以使用 { $cmd } 清除软件安装包缓存,可释放 { $size } 存储空间;也请考虑删除不必要的软件包或个人数据。
space-warn-with-zero = 检测到您的存储空间告急,请考虑删除不必要的软件包或个人数据。
wrong-thread-count = 您指定的线程数 { $count } 不合法(最小:1,最大:255)。
54 changes: 40 additions & 14 deletions oma-fetch/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use async_compression::futures::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdDecoder};
use bon::Builder;
use bon::bon;
use futures::{AsyncRead, TryStreamExt, io::BufReader};
use reqwest::{
Client, Method, RequestBuilder,
Expand All @@ -29,22 +29,16 @@ use crate::{DownloadEntry, DownloadSourceType};
const READ_FILE_BUFSIZE: usize = 65536;
const DOWNLOAD_BUFSIZE: usize = 8192;

#[derive(Snafu, Debug)]
#[snafu(display("source list is empty"))]
pub struct EmptySource {
file_name: String,
#[derive(Debug, Snafu)]
pub enum BuilderError {
#[snafu(display("Download task {file_name} sources is empty"))]
EmptySource { file_name: String },
#[snafu(display("Not allow set illegal download threads: {count}"))]
IllegalDownloadThread { count: usize },
}

#[derive(Builder)]
pub(crate) struct SingleDownloader<'a> {
client: &'a Client,
#[builder(with = |entry: &'a DownloadEntry| -> Result<_, EmptySource> {
if entry.source.is_empty() {
return Err(EmptySource { file_name: entry.filename.to_string() });
} else {
return Ok(entry);
}
})]
pub entry: &'a DownloadEntry,
progress: (usize, usize),
retry_times: usize,
Expand Down Expand Up @@ -99,7 +93,39 @@ pub enum SingleDownloadError {
ChecksumMismatch,
}

impl SingleDownloader<'_> {
#[bon]
impl<'a> SingleDownloader<'a> {
#[builder]
pub(crate) fn new(
client: &'a Client,
entry: &'a DownloadEntry,
progress: (usize, usize),
retry_times: usize,
msg: Option<String>,
download_list_index: usize,
file_type: CompressFile,
set_permission: Option<u32>,
timeout: Duration,
) -> Result<SingleDownloader<'a>, BuilderError> {
if entry.source.is_empty() {
return Err(BuilderError::EmptySource {
file_name: entry.filename.to_string(),
});
}

Ok(Self {
client,
entry,
progress,
retry_times,
msg,
download_list_index,
file_type,
set_permission,
timeout,
})
}

pub(crate) async fn try_download(self, callback: &impl AsyncFn(Event)) -> DownloadResult {
let mut sources = self.entry.source.clone();
assert!(!sources.is_empty());
Expand Down
16 changes: 11 additions & 5 deletions oma-fetch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use std::{cmp::Ordering, path::PathBuf, time::Duration};

use bon::{Builder, builder};
use checksum::Checksum;
use download::{EmptySource, SingleDownloader, SuccessSummary};
use download::{BuilderError, SingleDownloader, SuccessSummary};
use futures::StreamExt;

use reqwest::{Client, Method, RequestBuilder};
use tracing::debug;

pub mod checksum;
mod download;
pub mod download;
pub use crate::download::SingleDownloadError;

pub use reqwest;
Expand Down Expand Up @@ -206,7 +206,13 @@ impl DownloadManager<'_> {
pub async fn start_download(
&self,
callback: impl AsyncFn(Event),
) -> Result<Summary, EmptySource> {
) -> Result<Summary, BuilderError> {
if self.threads == 0 || self.threads > 255 {
return Err(BuilderError::IllegalDownloadThread {
count: self.threads,
});
}

let mut tasks = Vec::new();
let mut list = vec![];
for (i, c) in self.download_list.iter().enumerate() {
Expand All @@ -215,13 +221,13 @@ impl DownloadManager<'_> {
.client(self.client)
.maybe_msg(msg)
.download_list_index(i)
.entry(c)?
.entry(c)
.progress((i + 1, self.download_list.len()))
.retry_times(self.retry_times)
.file_type(c.file_type)
.maybe_set_permission(self.set_permission)
.timeout(self.timeout)
.build();
.build()?;

list.push(single);
}
Expand Down
11 changes: 10 additions & 1 deletion oma-refresh/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use oma_apt_sources_lists::SourcesListError;
use oma_fetch::{
CompressFile, DownloadEntry, DownloadManager, DownloadSource, DownloadSourceType,
checksum::{Checksum, ChecksumError},
download::BuilderError,
reqwest::{
Client, Response,
header::{CONTENT_LENGTH, HeaderValue},
Expand Down Expand Up @@ -100,6 +101,10 @@ pub enum RefreshError {
SourceListsEmpty,
#[error("Failed to operate file: {0}")]
OperateFile(PathBuf, std::io::Error),
#[error("thread count is not illegal: {0}")]
WrongThreadCount(usize),
#[error("Failed to build download manager")]
DownloadManagerBuilderError(BuilderError),
}

type Result<T> = std::result::Result<T, RefreshError>;
Expand Down Expand Up @@ -192,6 +197,10 @@ pub enum Event {

impl<'a> OmaRefresh<'a> {
pub async fn start(mut self, callback: impl AsyncFn(Event)) -> Result<()> {
if self.threads == 0 || self.threads > 255 {
return Err(RefreshError::WrongThreadCount(self.threads));
}

let arch = dpkg_arch(&self.source)?;
let sourcelist = scan_sources_lists(&self.source, &arch, self.apt_config, &callback)
.await
Expand Down Expand Up @@ -275,7 +284,7 @@ impl<'a> OmaRefresh<'a> {
callback(Event::DownloadEvent(event)).await;
})
.await
.unwrap();
.map_err(RefreshError::DownloadManagerBuilderError)?;

if !res.is_download_success() {
return Err(RefreshError::DownloadFailed(None));
Expand Down
15 changes: 15 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use oma_console::writer::{Writeln, Writer};
use oma_contents::OmaContentsError;
use oma_fetch::SingleDownloadError;
use oma_fetch::checksum::ChecksumError;
use oma_fetch::download::BuilderError;
use oma_history::HistoryError;

#[cfg(feature = "aosc")]
Expand Down Expand Up @@ -437,6 +438,20 @@ impl From<RefreshError> for OutputError {
description: fl!("failed-to-operate-path", p = path.display().to_string()),
source: Some(Box::new(error)),
},
RefreshError::WrongThreadCount(count) => Self {
description: fl!("wrong-thread-count", count = count),
source: None,
},
RefreshError::DownloadManagerBuilderError(builder_error) => match builder_error {
BuilderError::EmptySource { file_name } => Self {
description: format!("BUG: task {} should is not empty", file_name),
source: None,
},
BuilderError::IllegalDownloadThread { count } => Self {
description: fl!("wrong-thread-count", count = count),
source: None,
},
},
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ pub struct GlobalOptions {
#[arg(long, global = true, env = "OMA_NO_BELL", value_parser = FalseyValueParser::new()
)]
no_bell: bool,
/// Setup download threads (default as 4)
#[arg(long, short = 't', global = true, env = "OMA_DOWNLOAD_THREADS")]
download_threads: Option<usize>,
}

fn main() {
Expand Down
6 changes: 5 additions & 1 deletion src/subcommand/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub struct Download {
/// Run oma in "dry-run" mode. Useful for testing changes and operations without making changes to the system
#[arg(from_global)]
dry_run: bool,
/// Setup download threads (default as 4)
#[arg(from_global)]
download_threads: Option<usize>,
}

impl CliExecuter for Download {
Expand All @@ -35,6 +38,7 @@ impl CliExecuter for Download {
packages,
path,
dry_run,
download_threads,
} = self;

let path = path.canonicalize().map_err(|e| OutputError {
Expand Down Expand Up @@ -71,7 +75,7 @@ impl CliExecuter for Download {
&HTTP_CLIENT,
pkgs,
DownloadConfig {
network_thread: Some(config.network_thread()),
network_thread: Some(download_threads.unwrap_or_else(|| config.network_thread())),
download_dir: Some(&path),
auth: auth_config("/").as_ref(),
},
Expand Down
6 changes: 5 additions & 1 deletion src/subcommand/fix_broken.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub struct FixBroken {
/// Set apt options
#[arg(from_global)]
apt_options: Vec<String>,
/// Setup download threads (default as 4)
#[arg(from_global)]
download_threads: Option<usize>,
}

impl CliExecuter for FixBroken {
Expand All @@ -62,6 +65,7 @@ impl CliExecuter for FixBroken {
sysroot,
apt_options,
no_fix_dpkg_status,
download_threads,
} = self;

let mut _fds = None;
Expand Down Expand Up @@ -97,7 +101,7 @@ impl CliExecuter for FixBroken {
.autoremove(autoremove)
.remove_config(remove_config)
.maybe_auth_config(auth_config)
.network_thread(config.network_thread())
.network_thread(download_threads.unwrap_or_else(|| config.network_thread()))
.build()
.run()
}
Expand Down
6 changes: 5 additions & 1 deletion src/subcommand/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ pub struct Undo {
/// Set apt options
#[arg(from_global)]
apt_options: Vec<String>,
/// Setup download threads (default as 4)
#[arg(from_global)]
download_threads: Option<usize>,
}

impl CliExecuter for Undo {
Expand All @@ -122,6 +125,7 @@ impl CliExecuter for Undo {
sysroot,
apt_options,
no_fix_dpkg_status,
download_threads,
} = self;

let _fds = if !no_check_dbus && !config.no_check_dbus() && !dry_run {
Expand Down Expand Up @@ -243,7 +247,7 @@ impl CliExecuter for Undo {
.yes(false)
.remove_config(remove_config)
.autoremove(autoremove)
.network_thread(config.network_thread())
.network_thread(download_threads.unwrap_or_else(|| config.network_thread()))
.maybe_auth_config(auth_config)
.build()
.run()?;
Expand Down
8 changes: 6 additions & 2 deletions src/subcommand/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ pub struct Install {
/// Remove package(s) also remove configuration file(s), like apt purge
#[arg(long, visible_alias = "purge")]
remove_config: bool,
/// Setup download threads (default as 4)
#[arg(from_global)]
download_threads: Option<usize>,
}

impl CliExecuter for Install {
Expand Down Expand Up @@ -116,6 +119,7 @@ impl CliExecuter for Install {
autoremove,
remove_config,
no_fix_dpkg_status,
download_threads,
} = self;

if !dry_run {
Expand All @@ -141,7 +145,7 @@ impl CliExecuter for Install {
.client(&HTTP_CLIENT)
.dry_run(dry_run)
.no_progress(no_progress)
.network_thread(config.network_thread())
.network_thread(download_threads.unwrap_or_else(|| config.network_thread()))
.sysroot(&sysroot)
.config(&apt_config)
.maybe_auth_config(auth_config);
Expand Down Expand Up @@ -216,7 +220,7 @@ impl CliExecuter for Install {
.yes(yes)
.remove_config(remove_config)
.autoremove(autoremove)
.network_thread(config.network_thread())
.network_thread(download_threads.unwrap_or_else(|| config.network_thread()))
.maybe_auth_config(auth_config)
.fix_dpkg_status(!no_fix_dpkg_status)
.build()
Expand Down
Loading
Loading