Skip to content

Commit d657388

Browse files
authored
Merge pull request #413 from AOSC-Dev/add-download-threads-arg
feat: add `--download-threads` / `-t` arg to control download threads
2 parents b255414 + 80ea3e1 commit d657388

File tree

18 files changed

+155
-44
lines changed

18 files changed

+155
-44
lines changed

i18n/en-US/oma.ftl

+1
Original file line numberDiff line numberDiff line change
@@ -280,3 +280,4 @@ path-not-exist = Path { $path } does not exist.
280280
not-allow-oma-pending-piped = oma cannot run with piped output in interactive mode.
281281
space-warn = You seem to be running out of space. Consider using { $cmd } to purge the package cache, which will free up { $size } in storage space; Please also consider cleaning up your rarely used software or personal data.
282282
space-warn-with-zero = You seem to be running out of space, Please consider cleaning up your rarely used software or personal data.
283+
wrong-thread-count = The specified number of threads { $count } is not allowed (min: 1, max: 255).

i18n/zh-CN/oma.ftl

+1
Original file line numberDiff line numberDiff line change
@@ -269,3 +269,4 @@ path-not-exist = 路径 { $path } 不存在。
269269
not-allow-oma-pending-piped = 不允许搭配管道使用 oma 的交互模式界面。
270270
space-warn = 检测到您的存储空间告急,您可以使用 { $cmd } 清除软件安装包缓存,可释放 { $size } 存储空间;也请考虑删除不必要的软件包或个人数据。
271271
space-warn-with-zero = 检测到您的存储空间告急,请考虑删除不必要的软件包或个人数据。
272+
wrong-thread-count = 您指定的线程数 { $count } 不合法(最小:1,最大:255)。

oma-fetch/src/download.rs

+40-14
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88
};
99

1010
use async_compression::futures::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdDecoder};
11-
use bon::Builder;
11+
use bon::bon;
1212
use futures::{AsyncRead, TryStreamExt, io::BufReader};
1313
use reqwest::{
1414
Client, Method, RequestBuilder,
@@ -29,22 +29,16 @@ use crate::{DownloadEntry, DownloadSourceType};
2929
const READ_FILE_BUFSIZE: usize = 65536;
3030
const DOWNLOAD_BUFSIZE: usize = 8192;
3131

32-
#[derive(Snafu, Debug)]
33-
#[snafu(display("source list is empty"))]
34-
pub struct EmptySource {
35-
file_name: String,
32+
#[derive(Debug, Snafu)]
33+
pub enum BuilderError {
34+
#[snafu(display("Download task {file_name} sources is empty"))]
35+
EmptySource { file_name: String },
36+
#[snafu(display("Not allow set illegal download threads: {count}"))]
37+
IllegalDownloadThread { count: usize },
3638
}
3739

38-
#[derive(Builder)]
3940
pub(crate) struct SingleDownloader<'a> {
4041
client: &'a Client,
41-
#[builder(with = |entry: &'a DownloadEntry| -> Result<_, EmptySource> {
42-
if entry.source.is_empty() {
43-
return Err(EmptySource { file_name: entry.filename.to_string() });
44-
} else {
45-
return Ok(entry);
46-
}
47-
})]
4842
pub entry: &'a DownloadEntry,
4943
progress: (usize, usize),
5044
retry_times: usize,
@@ -99,7 +93,39 @@ pub enum SingleDownloadError {
9993
ChecksumMismatch,
10094
}
10195

102-
impl SingleDownloader<'_> {
96+
#[bon]
97+
impl<'a> SingleDownloader<'a> {
98+
#[builder]
99+
pub(crate) fn new(
100+
client: &'a Client,
101+
entry: &'a DownloadEntry,
102+
progress: (usize, usize),
103+
retry_times: usize,
104+
msg: Option<String>,
105+
download_list_index: usize,
106+
file_type: CompressFile,
107+
set_permission: Option<u32>,
108+
timeout: Duration,
109+
) -> Result<SingleDownloader<'a>, BuilderError> {
110+
if entry.source.is_empty() {
111+
return Err(BuilderError::EmptySource {
112+
file_name: entry.filename.to_string(),
113+
});
114+
}
115+
116+
Ok(Self {
117+
client,
118+
entry,
119+
progress,
120+
retry_times,
121+
msg,
122+
download_list_index,
123+
file_type,
124+
set_permission,
125+
timeout,
126+
})
127+
}
128+
103129
pub(crate) async fn try_download(self, callback: &impl AsyncFn(Event)) -> DownloadResult {
104130
let mut sources = self.entry.source.clone();
105131
assert!(!sources.is_empty());

oma-fetch/src/lib.rs

+11-5
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ use std::{cmp::Ordering, path::PathBuf, time::Duration};
22

33
use bon::{Builder, builder};
44
use checksum::Checksum;
5-
use download::{EmptySource, SingleDownloader, SuccessSummary};
5+
use download::{BuilderError, SingleDownloader, SuccessSummary};
66
use futures::StreamExt;
77

88
use reqwest::{Client, Method, RequestBuilder};
99
use tracing::debug;
1010

1111
pub mod checksum;
12-
mod download;
12+
pub mod download;
1313
pub use crate::download::SingleDownloadError;
1414

1515
pub use reqwest;
@@ -206,7 +206,13 @@ impl DownloadManager<'_> {
206206
pub async fn start_download(
207207
&self,
208208
callback: impl AsyncFn(Event),
209-
) -> Result<Summary, EmptySource> {
209+
) -> Result<Summary, BuilderError> {
210+
if self.threads == 0 || self.threads > 255 {
211+
return Err(BuilderError::IllegalDownloadThread {
212+
count: self.threads,
213+
});
214+
}
215+
210216
let mut tasks = Vec::new();
211217
let mut list = vec![];
212218
for (i, c) in self.download_list.iter().enumerate() {
@@ -215,13 +221,13 @@ impl DownloadManager<'_> {
215221
.client(self.client)
216222
.maybe_msg(msg)
217223
.download_list_index(i)
218-
.entry(c)?
224+
.entry(c)
219225
.progress((i + 1, self.download_list.len()))
220226
.retry_times(self.retry_times)
221227
.file_type(c.file_type)
222228
.maybe_set_permission(self.set_permission)
223229
.timeout(self.timeout)
224-
.build();
230+
.build()?;
225231

226232
list.push(single);
227233
}

oma-refresh/src/db.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use oma_apt_sources_lists::SourcesListError;
2525
use oma_fetch::{
2626
CompressFile, DownloadEntry, DownloadManager, DownloadSource, DownloadSourceType,
2727
checksum::{Checksum, ChecksumError},
28+
download::BuilderError,
2829
reqwest::{
2930
Client, Response,
3031
header::{CONTENT_LENGTH, HeaderValue},
@@ -100,6 +101,10 @@ pub enum RefreshError {
100101
SourceListsEmpty,
101102
#[error("Failed to operate file: {0}")]
102103
OperateFile(PathBuf, std::io::Error),
104+
#[error("thread count is not illegal: {0}")]
105+
WrongThreadCount(usize),
106+
#[error("Failed to build download manager")]
107+
DownloadManagerBuilderError(BuilderError),
103108
}
104109

105110
type Result<T> = std::result::Result<T, RefreshError>;
@@ -189,6 +194,10 @@ pub enum Event {
189194

190195
impl<'a> OmaRefresh<'a> {
191196
pub async fn start(mut self, callback: impl AsyncFn(Event)) -> Result<()> {
197+
if self.threads == 0 || self.threads > 255 {
198+
return Err(RefreshError::WrongThreadCount(self.threads));
199+
}
200+
192201
let arch = dpkg_arch(&self.source)?;
193202
let sourcelist = scan_sources_lists(&self.source, &arch, self.apt_config, &callback)
194203
.await
@@ -272,7 +281,7 @@ impl<'a> OmaRefresh<'a> {
272281
callback(Event::DownloadEvent(event)).await;
273282
})
274283
.await
275-
.unwrap();
284+
.map_err(RefreshError::DownloadManagerBuilderError)?;
276285

277286
if !res.is_download_success() {
278287
return Err(RefreshError::DownloadFailed(None));

src/error.rs

+15
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use oma_console::writer::{Writeln, Writer};
88
use oma_contents::OmaContentsError;
99
use oma_fetch::SingleDownloadError;
1010
use oma_fetch::checksum::ChecksumError;
11+
use oma_fetch::download::BuilderError;
1112
use oma_history::HistoryError;
1213

1314
#[cfg(feature = "aosc")]
@@ -437,6 +438,20 @@ impl From<RefreshError> for OutputError {
437438
description: fl!("failed-to-operate-path", p = path.display().to_string()),
438439
source: Some(Box::new(error)),
439440
},
441+
RefreshError::WrongThreadCount(count) => Self {
442+
description: fl!("wrong-thread-count", count = count),
443+
source: None,
444+
},
445+
RefreshError::DownloadManagerBuilderError(builder_error) => match builder_error {
446+
BuilderError::EmptySource { file_name } => Self {
447+
description: format!("BUG: task {} should is not empty", file_name),
448+
source: None,
449+
},
450+
BuilderError::IllegalDownloadThread { count } => Self {
451+
description: fl!("wrong-thread-count", count = count),
452+
source: None,
453+
},
454+
},
440455
}
441456
}
442457
}

src/main.rs

+3
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ pub struct GlobalOptions {
124124
#[arg(long, global = true, env = "OMA_NO_BELL", value_parser = FalseyValueParser::new()
125125
)]
126126
no_bell: bool,
127+
/// Setup download threads (default as 4)
128+
#[arg(long, short = 't', global = true, env = "OMA_DOWNLOAD_THREADS")]
129+
download_threads: Option<usize>,
127130
}
128131

129132
fn main() {

src/subcommand/download.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ pub struct Download {
2929
/// Run oma in "dry-run" mode. Useful for testing changes and operations without making changes to the system
3030
#[arg(from_global)]
3131
dry_run: bool,
32+
/// Setup download threads (default as 4)
33+
#[arg(from_global)]
34+
download_threads: Option<usize>,
3235
}
3336

3437
impl CliExecuter for Download {
@@ -37,6 +40,7 @@ impl CliExecuter for Download {
3740
packages,
3841
path,
3942
dry_run,
43+
download_threads,
4044
} = self;
4145

4246
let path = path.canonicalize().map_err(|e| OutputError {
@@ -73,7 +77,7 @@ impl CliExecuter for Download {
7377
&HTTP_CLIENT,
7478
pkgs,
7579
DownloadConfig {
76-
network_thread: Some(config.network_thread()),
80+
network_thread: Some(download_threads.unwrap_or_else(|| config.network_thread())),
7781
download_dir: Some(&path),
7882
auth: auth_config("/").as_ref(),
7983
},

src/subcommand/fix_broken.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ pub struct FixBroken {
4444
/// Set apt options
4545
#[arg(from_global)]
4646
apt_options: Vec<String>,
47+
/// Setup download threads (default as 4)
48+
#[arg(from_global)]
49+
download_threads: Option<usize>,
4750
}
4851

4952
impl CliExecuter for FixBroken {
@@ -62,6 +65,7 @@ impl CliExecuter for FixBroken {
6265
sysroot,
6366
apt_options,
6467
no_fix_dpkg_status,
68+
download_threads,
6569
} = self;
6670

6771
let mut _fds = None;
@@ -97,7 +101,7 @@ impl CliExecuter for FixBroken {
97101
.autoremove(autoremove)
98102
.remove_config(remove_config)
99103
.maybe_auth_config(auth_config)
100-
.network_thread(config.network_thread())
104+
.network_thread(download_threads.unwrap_or_else(|| config.network_thread()))
101105
.build()
102106
.run()
103107
}

src/subcommand/history.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ pub struct Undo {
103103
/// Set apt options
104104
#[arg(from_global)]
105105
apt_options: Vec<String>,
106+
/// Setup download threads (default as 4)
107+
#[arg(from_global)]
108+
download_threads: Option<usize>,
106109
}
107110

108111
impl CliExecuter for Undo {
@@ -122,6 +125,7 @@ impl CliExecuter for Undo {
122125
sysroot,
123126
apt_options,
124127
no_fix_dpkg_status,
128+
download_threads,
125129
} = self;
126130

127131
let _fds = if !no_check_dbus && !config.no_check_dbus() && !dry_run {
@@ -243,7 +247,7 @@ impl CliExecuter for Undo {
243247
.yes(false)
244248
.remove_config(remove_config)
245249
.autoremove(autoremove)
246-
.network_thread(config.network_thread())
250+
.network_thread(download_threads.unwrap_or_else(|| config.network_thread()))
247251
.maybe_auth_config(auth_config)
248252
.build()
249253
.run()?;

src/subcommand/install.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ pub struct Install {
9292
/// Remove package(s) also remove configuration file(s), like apt purge
9393
#[arg(long, visible_alias = "purge")]
9494
remove_config: bool,
95+
/// Setup download threads (default as 4)
96+
#[arg(from_global)]
97+
download_threads: Option<usize>,
9598
}
9699

97100
impl CliExecuter for Install {
@@ -119,6 +122,7 @@ impl CliExecuter for Install {
119122
autoremove,
120123
remove_config,
121124
no_fix_dpkg_status,
125+
download_threads,
122126
} = self;
123127

124128
if !dry_run {
@@ -144,7 +148,7 @@ impl CliExecuter for Install {
144148
.client(&HTTP_CLIENT)
145149
.dry_run(dry_run)
146150
.no_progress(no_progress)
147-
.network_thread(config.network_thread())
151+
.network_thread(download_threads.unwrap_or_else(|| config.network_thread()))
148152
.sysroot(&sysroot)
149153
.config(&apt_config)
150154
.maybe_auth_config(auth_config);
@@ -219,7 +223,7 @@ impl CliExecuter for Install {
219223
.yes(yes)
220224
.remove_config(remove_config)
221225
.autoremove(autoremove)
222-
.network_thread(config.network_thread())
226+
.network_thread(download_threads.unwrap_or_else(|| config.network_thread()))
223227
.maybe_auth_config(auth_config)
224228
.fix_dpkg_status(!no_fix_dpkg_status)
225229
.build()

0 commit comments

Comments
 (0)