|
1 | 1 | //! This module contains all code sporting `gitoxide` for operations on `git` repositories and it mirrors
|
2 | 2 | //! `utils` closely for now. One day it can be renamed into `utils` once `git2` isn't required anymore.
|
3 | 3 |
|
4 |
| -use crate::util::{network, Progress}; |
| 4 | +use crate::util::{human_readable_bytes, network, MetricsCounter, Progress}; |
5 | 5 | use crate::{CargoResult, Config};
|
6 | 6 | use git_repository as git;
|
7 |
| -use std::sync::atomic::AtomicBool; |
| 7 | +use std::sync::atomic::{AtomicBool, Ordering}; |
8 | 8 | use std::sync::Arc;
|
9 |
| -use std::time::Duration; |
| 9 | +use std::time::{Duration, Instant}; |
10 | 10 |
|
11 | 11 | /// For the time being, `repo_path` makes it easy to instantiate a gitoxide repo just for fetching.
|
12 | 12 | /// In future this may change to be the gitoxide repository itself.
|
@@ -49,22 +49,78 @@ pub fn with_retry_and_progress(
|
49 | 49 | .ok()
|
50 | 50 | .unwrap_or_default()
|
51 | 51 | .auto_deregister();
|
52 |
| - let should_interrupt = AtomicBool::new(false); |
53 | 52 | let mut progress_bar = Progress::new("Fetch", config);
|
54 | 53 | std::thread::scope(move |s| {
|
55 | 54 | s.spawn({
|
56 | 55 | let root = Arc::downgrade(&progress_root);
|
57 | 56 | move || -> CargoResult<()> {
|
| 57 | + const READ_PACK_BYTES: [u8; 4] = *b"BWRB"; |
| 58 | + const DELTA_INDEX_OBJECTS: [u8; 4] = *b"IWIO"; |
| 59 | + const RESOLVE_OBJECTS: [u8; 4] = *b"IWRO"; |
| 60 | + |
| 61 | + // We choose `N=10` here to make a `300ms * 10slots ~= 3000ms` |
| 62 | + // sliding window for tracking the data transfer rate (in bytes/s). |
| 63 | + let mut last_update = Instant::now(); |
| 64 | + let mut counter = MetricsCounter::<10>::new(0, last_update); |
| 65 | + |
58 | 66 | let mut tasks = Vec::with_capacity(10);
|
59 | 67 | while let Some(root) = root.upgrade() {
|
60 | 68 | root.sorted_snapshot(&mut tasks);
|
61 |
| - progress_bar.tick(0, 10, "TBD")?; |
62 |
| - // dbg!(&tasks); |
| 69 | + let counters = tasks |
| 70 | + .iter() |
| 71 | + .find_map(|(_, t)| { |
| 72 | + (t.id == READ_PACK_BYTES).then(|| t.progress.as_ref().expect("set")) |
| 73 | + }) |
| 74 | + .and_then(|read| { |
| 75 | + tasks.iter().find_map(|(_, t)| { |
| 76 | + (t.id == DELTA_INDEX_OBJECTS) |
| 77 | + .then(|| (t.progress.as_ref().expect("set"), Some(read))) |
| 78 | + }) |
| 79 | + }) |
| 80 | + .or_else(|| { |
| 81 | + tasks.iter().find_map(|(_, t)| { |
| 82 | + (t.id == RESOLVE_OBJECTS) |
| 83 | + .then(|| (t.progress.as_ref().expect("set"), None)) |
| 84 | + }) |
| 85 | + }); |
| 86 | + let progress_info = match counters { |
| 87 | + Some((objs, None)) => { |
| 88 | + // Resolving deltas. |
| 89 | + let objects = objs.step.load(Ordering::Relaxed); |
| 90 | + let total_objects = objs.done_at.expect("known amount of objects"); |
| 91 | + Some(( |
| 92 | + objects, |
| 93 | + total_objects, |
| 94 | + format!(", ({}/{}) resolving deltas", objects, total_objects), |
| 95 | + )) |
| 96 | + } |
| 97 | + Some((objs, Some(read_pack))) => { |
| 98 | + // Receiving objects. |
| 99 | + let objects = objs.step.load(Ordering::Relaxed); |
| 100 | + let total_objects = objs.done_at.expect("known amount of objects"); |
| 101 | + let received_bytes = read_pack.step.load(Ordering::Relaxed); |
| 102 | + |
| 103 | + let now = Instant::now(); |
| 104 | + counter.add(received_bytes, now); |
| 105 | + last_update = now; |
| 106 | + let (rate, unit) = human_readable_bytes(counter.rate() as u64); |
| 107 | + Some((objects, total_objects, format!(", {:.2}{}/s", rate, unit))) |
| 108 | + } |
| 109 | + None => { |
| 110 | + counter = MetricsCounter::<10>::new(0, last_update); |
| 111 | + None |
| 112 | + } |
| 113 | + }; |
| 114 | + if let Some((objects, total_objects, msg)) = progress_info { |
| 115 | + progress_bar.tick(objects, total_objects, &msg)?; |
| 116 | + } |
63 | 117 | std::thread::sleep(Duration::from_millis(300));
|
64 | 118 | }
|
65 | 119 | Ok(())
|
66 | 120 | }
|
67 | 121 | });
|
68 |
| - network::with_retry(config, || cb(&repo, &should_interrupt, &mut progress)) |
| 122 | + network::with_retry(config, || { |
| 123 | + cb(&repo, &git::interrupt::IS_INTERRUPTED, &mut progress) |
| 124 | + }) |
69 | 125 | })
|
70 | 126 | }
|
0 commit comments