install: add progress-fd for install #1431


Draft · wants to merge 3 commits into base: main

85 changes: 84 additions & 1 deletion crates/lib/src/cli.rs
@@ -34,13 +34,14 @@ use crate::spec::ImageReference;
use crate::utils::sigpolicy_from_opt;

/// Shared progress options
#[derive(Debug, Parser, PartialEq, Eq)]
#[derive(Debug, Parser, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub(crate) struct ProgressOptions {
/// File descriptor number which must refer to an open pipe (anonymous or named).
///
/// Interactive progress will be written to this file descriptor as "JSON lines"
/// format, where each value is separated by a newline.
#[clap(long, hide = true)]
#[serde(default)]
pub(crate) progress_fd: Option<RawProgressFd>,
}

@@ -1422,4 +1423,86 @@ mod tests {
]));
assert_eq!(args.as_slice(), ["container", "image", "pull"]);
}

#[test]
fn test_progress_fd_install_parsing() {
// Test install to-disk with progress-fd
let opts = Opt::try_parse_from([
"bootc",
"install",
"to-disk",
"--progress-fd",
"3",
"/dev/sda",
])
.unwrap();

if let Opt::Install(crate::cli::InstallOpts::ToDisk(install_opts)) = opts {
assert_eq!(install_opts.progress.progress_fd.unwrap().get_raw_fd(), 3);
} else {
panic!("Expected install to-disk command");
}

// Test install to-filesystem with progress-fd
let opts = Opt::try_parse_from([
"bootc",
"install",
"to-filesystem",
"--progress-fd",
"4",
"/mnt/root",
])
.unwrap();

if let Opt::Install(crate::cli::InstallOpts::ToFilesystem(install_opts)) = opts {
assert_eq!(install_opts.progress.progress_fd.unwrap().get_raw_fd(), 4);
} else {
panic!("Expected install to-filesystem command");
}

// Test install to-existing-root with progress-fd
let opts =
Opt::try_parse_from(["bootc", "install", "to-existing-root", "--progress-fd", "5"])
.unwrap();

if let Opt::Install(crate::cli::InstallOpts::ToExistingRoot(install_opts)) = opts {
assert_eq!(install_opts.progress.progress_fd.unwrap().get_raw_fd(), 5);
} else {
panic!("Expected install to-existing-root command");
}
}

#[test]
fn test_progress_fd_validation() {
// Test that invalid FD values are rejected
let result = Opt::try_parse_from([
"bootc",
"install",
"to-disk",
"--progress-fd",
"1",
"/dev/sda",
]);
assert!(result.is_err());

let result = Opt::try_parse_from([
"bootc",
"install",
"to-disk",
"--progress-fd",
"2",
"/dev/sda",
]);
assert!(result.is_err());

let result = Opt::try_parse_from([
"bootc",
"install",
"to-disk",
"--progress-fd",
"invalid",
"/dev/sda",
]);
assert!(result.is_err());
}
}
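
The tests above only exercise CLI parsing. On the consuming side, a parent process has to hand bootc an already-open pipe on the requested descriptor and then read JSON lines from the other end. The sketch below shows one way that wiring could look; it is a minimal illustration, not part of this PR, and it assumes the `libc` crate for the raw `pipe`/`dup2` calls, fd 3 as the target descriptor (matching the tests above), and `/dev/sda` as a placeholder device.

```rust
// Sketch of a parent process invoking `bootc install to-disk --progress-fd 3`
// and reading the JSON-lines progress stream. Illustrative only; assumes the
// `libc` crate and a Unix host.
use std::io::{BufRead, BufReader};
use std::os::fd::FromRawFd;
use std::os::unix::process::CommandExt;
use std::process::Command;

fn main() -> std::io::Result<()> {
    // Create an anonymous pipe: the child writes progress events to the write end.
    let mut fds = [0i32; 2];
    if unsafe { libc::pipe(fds.as_mut_ptr()) } != 0 {
        return Err(std::io::Error::last_os_error());
    }
    let (read_fd, write_fd) = (fds[0], fds[1]);

    let mut cmd = Command::new("bootc");
    cmd.args(["install", "to-disk", "--progress-fd", "3", "/dev/sda"]);

    // After fork, make the pipe's write end appear as fd 3 in the child.
    let hook = move || {
        if unsafe { libc::dup2(write_fd, 3) } < 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(())
    };
    unsafe { cmd.pre_exec(hook) };

    let mut child = cmd.spawn()?;
    // Drop our copy of the write end so we see EOF when the child exits.
    unsafe { libc::close(write_fd) };

    // Each line on the read end is one JSON progress event.
    let reader = BufReader::new(unsafe { std::fs::File::from_raw_fd(read_fd) });
    for line in reader.lines() {
        println!("progress event: {}", line?);
    }
    child.wait()?;
    Ok(())
}
```

Note that the validation test above rejects descriptors 1 and 2 (stdout/stderr), so a descriptor of 3 or higher is required.
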
103 changes: 47 additions & 56 deletions crates/lib/src/deploy.rs
@@ -21,7 +21,9 @@ use ostree_ext::ostree::{self, Sysroot};
use ostree_ext::sysroot::SysrootLock;
use ostree_ext::tokio_util::spawn_blocking_cancellable_flatten;

use crate::progress_aggregator::ProgressAggregatorBuilder;
use crate::progress_jsonl::{Event, ProgressWriter, SubTaskBytes, SubTaskStep};
use crate::progress_renderer::ProgressFilter;
use crate::spec::ImageReference;
use crate::spec::{BootOrder, HostSpec};
use crate::status::labels_of_config;
@@ -138,7 +140,7 @@ fn prefix_of_progress(p: &ImportProgress) -> &'static str {
}
}

/// Write container fetch progress to standard output.
/// Write container fetch progress using JSON-first architecture.
async fn handle_layer_progress_print(
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
@@ -152,35 +154,26 @@ async fn handle_layer_progress_print(
) -> ProgressWriter {
let start = std::time::Instant::now();
let mut total_read = 0u64;
let bar = indicatif::MultiProgress::new();
if quiet {
bar.set_draw_target(indicatif::ProgressDrawTarget::hidden());
}
let layers_bar = bar.add(indicatif::ProgressBar::new(
n_layers_to_fetch.try_into().unwrap(),
));
let byte_bar = bar.add(indicatif::ProgressBar::new(0));
// let byte_bar = indicatif::ProgressBar::new(0);
// byte_bar.set_draw_target(indicatif::ProgressDrawTarget::hidden());
layers_bar.set_style(
indicatif::ProgressStyle::default_bar()
.template("{prefix} {bar} {pos}/{len} {wide_msg}")
.unwrap(),
);
let taskname = "Fetching layers";
layers_bar.set_prefix(taskname);
layers_bar.set_message("");
byte_bar.set_prefix("Fetching");
byte_bar.set_style(
indicatif::ProgressStyle::default_bar()
.template(
" └ {prefix} {bar} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) {wide_msg}",
)
.unwrap()
);

// Create JSON-first progress aggregator for pulling tasks
let visual_filter = if quiet {
None
} else {
Some(ProgressFilter::TasksMatching(vec!["pulling".to_string()]))
};

let mut aggregator = {
let mut builder = ProgressAggregatorBuilder::new().with_json(prog.clone());
if let Some(filter) = visual_filter {
builder = builder.with_visual(filter);
}
builder.build()
};

let mut subtasks = vec![];
let mut subtask: SubTaskBytes = Default::default();
let mut current_layer_step = 0u64;

loop {
tokio::select! {
// Always handle layer changes first.
@@ -192,12 +185,6 @@
let short_digest = &layer.digest().digest()[0..21];
let layer_size = layer.size();
if l.is_starting() {
// Reset the progress bar
byte_bar.reset_elapsed();
byte_bar.reset_eta();
byte_bar.set_length(layer_size);
byte_bar.set_message(format!("{layer_type} {short_digest}"));

subtask = SubTaskBytes {
subtask: layer_type.into(),
description: format!("{layer_type}: {short_digest}").clone().into(),
@@ -207,24 +194,26 @@
bytes_total: layer_size,
};
} else {
byte_bar.set_position(layer_size);
layers_bar.inc(1);
total_read = total_read.saturating_add(layer_size);
current_layer_step += 1;
// Emit an event where bytes == total to signal completion.
subtask.bytes = layer_size;
subtasks.push(subtask.clone());
prog.send(Event::ProgressBytes {

// Send progress event via JSON-first aggregator
let event = Event::ProgressBytes {
task: "pulling".into(),
description: format!("Pulling Image: {digest}").into(),
id: (*digest).into(),
bytes_cached: bytes_total - bytes_to_download,
bytes: total_read,
bytes_total: bytes_to_download,
steps_cached: (layers_total - n_layers_to_fetch) as u64,
steps: layers_bar.position(),
steps: current_layer_step,
steps_total: n_layers_to_fetch as u64,
subtasks: subtasks.clone(),
}).await;
};
let _ = aggregator.send_event(event).await;
}
} else {
// If the receiver is disconnected, then we're done
@@ -241,40 +230,42 @@
bytes.as_ref().cloned()
};
if let Some(bytes) = bytes {
byte_bar.set_position(bytes.fetched);
subtask.bytes = byte_bar.position();
prog.send_lossy(Event::ProgressBytes {
subtask.bytes = bytes.fetched;

// Send lossy progress event via JSON-first aggregator
let event = Event::ProgressBytes {
task: "pulling".into(),
description: format!("Pulling Image: {digest}").into(),
id: (*digest).into(),
bytes_cached: bytes_total - bytes_to_download,
bytes: total_read + byte_bar.position(),
bytes: total_read + bytes.fetched,
bytes_total: bytes_to_download,
steps_cached: (layers_total - n_layers_to_fetch) as u64,
steps: layers_bar.position(),
steps: current_layer_step,
steps_total: n_layers_to_fetch as u64,
subtasks: subtasks.clone().into_iter().chain([subtask.clone()]).collect(),
}).await;
};
let _ = aggregator.send_event(event).await;
}
}
}
}
byte_bar.finish_and_clear();
layers_bar.finish_and_clear();
if let Err(e) = bar.clear() {
tracing::warn!("clearing bar: {e}");
}

// Finish progress aggregator
aggregator.finish();

let end = std::time::Instant::now();
let elapsed = end.duration_since(start);
let persec = total_read as f64 / elapsed.as_secs_f64();
let persec = indicatif::HumanBytes(persec as u64);
if let Err(e) = bar.println(&format!(
"Fetched layers: {} in {} ({}/s)",
indicatif::HumanBytes(total_read),
indicatif::HumanDuration(elapsed),
persec,
)) {
tracing::warn!("writing to stdout: {e}");

if !quiet {
println!(
"Fetched layers: {} in {} ({}/s)",
indicatif::HumanBytes(total_read),
indicatif::HumanDuration(elapsed),
persec,
);
}

// Since the progress notifier closed, we know import has started
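
On the reading side of the pipe, each line can be deserialized back into a progress event. The types below are a minimal consumer-side sketch of the `ProgressBytes` payload constructed above, listing only the fields visible in this diff and assuming `serde`/`serde_json` as dependencies; the exact wire names, casing, and event tagging are defined by `progress_jsonl.rs`, so treat the field spelling here as an assumption to verify rather than the authoritative schema.

```rust
// Hypothetical consumer-side mirror of the ProgressBytes events written to the
// progress fd. Only fields visible in this diff are listed; serde ignores any
// extra fields, and progress_jsonl.rs remains the authoritative definition.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct SubTaskBytesView {
    subtask: String,
    description: String,
    #[serde(default)]
    bytes: u64,
    #[serde(default)]
    bytes_total: u64,
}

#[derive(Debug, Deserialize)]
struct ProgressBytesView {
    task: String,        // e.g. "pulling"
    description: String, // e.g. "Pulling Image: <digest>"
    id: String,
    bytes_cached: u64,
    bytes: u64,
    bytes_total: u64,
    steps_cached: u64,
    steps: u64,
    steps_total: u64,
    #[serde(default)]
    subtasks: Vec<SubTaskBytesView>,
}

/// Parse one JSON line read from the progress pipe.
fn parse_progress_line(line: &str) -> serde_json::Result<ProgressBytesView> {
    serde_json::from_str(line)
}
```

Because serde skips unknown fields by default, a consumer written this way keeps working if the emitter later adds fields to the event.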