Skip to content

Commit 11b587c

Browse files
run "load new crates" and "build crates from queue" in different threads
1 parent a31865e commit 11b587c

File tree

3 files changed

+178
-50
lines changed

3 files changed

+178
-50
lines changed

src/docbuilder/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ impl DocBuilder {
110110
Ok(())
111111
}
112112

113+
/// Checks for the lock file and returns whether it currently exists.
114+
pub fn is_locked(&self) -> bool {
115+
self.lock_path().exists()
116+
}
117+
113118
/// Returns a reference of options
114119
pub fn options(&self) -> &DocBuilderOptions {
115120
&self.options

src/docbuilder/queue.rs

+48-6
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ use crates_index_diff::{ChangeKind, Index};
1010
impl DocBuilder {
1111
/// Updates crates.io-index repository and adds new crates into build queue.
1212
/// Returns size of queue
13-
pub fn get_new_crates(&mut self) -> Result<i64> {
13+
pub fn get_new_crates(&mut self) -> Result<usize> {
1414
let conn = try!(connect_db());
1515
let index = try!(Index::from_path_or_cloned(&self.options.crates_io_index_path));
1616
let mut changes = try!(index.fetch_changes());
17+
let mut add_count: usize = 0;
1718

1819
// I belive this will fix ordering of queue if we get more than one crate from changes
1920
changes.reverse();
@@ -23,17 +24,20 @@ impl DocBuilder {
2324
&[&krate.name, &krate.version])
2425
.ok();
2526
debug!("{}-{} added into build queue", krate.name, krate.version);
27+
add_count += 1;
2628
}
2729

28-
let queue_count = conn.query("SELECT COUNT(*) FROM queue WHERE attempt < 5", &[])
30+
Ok(add_count)
31+
}
32+
33+
pub fn get_queue_count(&self) -> Result<i64> {
34+
let conn = try!(connect_db());
35+
Ok(conn.query("SELECT COUNT(*) FROM queue WHERE attempt < 5", &[])
2936
.unwrap()
3037
.get(0)
31-
.get(0);
32-
33-
Ok(queue_count)
38+
.get(0))
3439
}
3540

36-
3741
/// Builds packages from queue
3842
pub fn build_packages_queue(&mut self) -> Result<usize> {
3943
let conn = try!(connect_db());
@@ -67,6 +71,44 @@ impl DocBuilder {
6771

6872
Ok(build_count)
6973
}
74+
75+
/// Builds the top package from the queue. Returns whether the queue was empty.
76+
pub fn build_next_queue_package(&mut self) -> Result<bool> {
77+
let conn = try!(connect_db());
78+
79+
let query = try!(conn.query("SELECT id, name, version
80+
FROM queue
81+
WHERE attempt < 5
82+
ORDER BY attempt ASC, id ASC
83+
LIMIT 1",
84+
&[]));
85+
86+
if query.is_empty() {
87+
// nothing in the queue; bail
88+
return Ok(false);
89+
}
90+
91+
let id: i32 = query.get(0).get(0);
92+
let name: String = query.get(0).get(1);
93+
let version: String = query.get(0).get(2);
94+
95+
match self.build_package(&name[..], &version[..]) {
96+
Ok(_) => {
97+
let _ = conn.execute("DELETE FROM queue WHERE id = $1", &[&id]);
98+
}
99+
Err(e) => {
100+
// Increase attempt count
101+
let _ = conn.execute("UPDATE queue SET attempt = attempt + 1 WHERE id = $1",
102+
&[&id]);
103+
error!("Failed to build package {}-{} from queue: {}",
104+
name,
105+
version,
106+
e)
107+
}
108+
}
109+
110+
Ok(true)
111+
}
70112
}
71113

72114
#[cfg(test)]

src/utils/daemon.rs

+125-44
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55

66
use std::{env, thread};
7+
use std::panic::{catch_unwind, AssertUnwindSafe};
78
use std::process::exit;
89
use std::fs::File;
910
use std::io::Write;
@@ -44,80 +45,160 @@ pub fn start_daemon() {
4445
exit(0);
4546
}
4647

47-
4848
// check new crates every minute
4949
thread::spawn(move || {
50+
// space this out to prevent it from clashing against the queue-builder thread on launch
51+
thread::sleep(Duration::from_secs(30));
5052
loop {
53+
let opts = opts();
54+
let mut doc_builder = DocBuilder::new(opts);
55+
56+
debug!("Checking new crates");
57+
match doc_builder.get_new_crates() {
58+
Ok(n) => debug!("{} crates added to queue", n),
59+
Err(e) => error!("Failed to get new crates: {}", e),
60+
}
61+
5162
thread::sleep(Duration::from_secs(60));
63+
}
64+
});
65+
66+
// build new crates every minute
67+
thread::spawn(move || {
68+
let mut opts = opts();
69+
opts.skip_if_exists = true;
70+
let mut doc_builder = DocBuilder::new(opts);
71+
72+
/// Represents the current state of the builder thread.
73+
enum BuilderState {
74+
/// The builder thread has just started, and hasn't built any crates yet.
75+
Fresh,
76+
/// The builder has just seen an empty build queue.
77+
EmptyQueue,
78+
/// The builder has just seen the lock file.
79+
Locked,
80+
/// The builder has just finished building a crate. The enclosed count is the number of
81+
/// crates built since the caches have been refreshed.
82+
QueueInProgress(usize),
83+
}
5284

53-
let mut opts = opts();
54-
opts.skip_if_exists = true;
85+
let mut status = BuilderState::Fresh;
86+
87+
loop {
88+
if !status.is_in_progress() {
89+
thread::sleep(Duration::from_secs(60));
90+
}
5591

5692
// check lock file
57-
if opts.prefix.join("cratesfyi.lock").exists() {
93+
if doc_builder.is_locked() {
5894
warn!("Lock file exits, skipping building new crates");
95+
status = BuilderState::Locked;
5996
continue;
6097
}
6198

62-
let mut doc_builder = DocBuilder::new(opts);
99+
if status.count() > 10 {
100+
// periodically, we need to flush our caches and ping the hubs
101+
debug!("10 builds in a row; flushing caches");
102+
status = BuilderState::QueueInProgress(0);
63103

64-
debug!("Checking new crates");
65-
let queue_count = match doc_builder.get_new_crates() {
66-
Ok(size) => size,
67-
Err(e) => {
68-
error!("Failed to get new crates: {}", e);
69-
continue;
104+
match pubsubhubbub::ping_hubs() {
105+
Err(e) => error!("Failed to ping hub: {}", e),
106+
Ok(n) => debug!("Succesfully pinged {} hubs", n)
70107
}
71-
};
72108

73-
// Only build crates if there is any
74-
if queue_count == 0 {
75-
debug!("Queue is empty, going back to sleep");
76-
continue;
77-
}
109+
if let Err(e) = doc_builder.load_cache() {
110+
error!("Failed to load cache: {}", e);
111+
}
78112

79-
info!("Building {} crates from queue", queue_count);
113+
if let Err(e) = doc_builder.save_cache() {
114+
error!("Failed to save cache: {}", e);
115+
}
80116

81-
// update index
82-
if let Err(e) = update_sources() {
83-
error!("Failed to update sources: {}", e);
84-
continue;
117+
if let Err(e) = update_sources() {
118+
error!("Failed to update sources: {}", e);
119+
continue;
120+
}
85121
}
86122

87-
if let Err(e) = doc_builder.load_cache() {
88-
error!("Failed to load cache: {}", e);
89-
continue;
123+
// Only build crates if there are any to build
124+
debug!("Checking build queue");
125+
match doc_builder.get_queue_count() {
126+
Err(e) => {
127+
error!("Failed to read the number of crates in the queue: {}", e);
128+
continue;
129+
}
130+
Ok(0) => {
131+
if status.is_in_progress() {
132+
// ping the hubs before continuing
133+
match pubsubhubbub::ping_hubs() {
134+
Err(e) => error!("Failed to ping hub: {}", e),
135+
Ok(n) => debug!("Succesfully pinged {} hubs", n)
136+
}
137+
138+
if let Err(e) = doc_builder.save_cache() {
139+
error!("Failed to save cache: {}", e);
140+
}
141+
}
142+
debug!("Queue is empty, going back to sleep");
143+
status = BuilderState::EmptyQueue;
144+
continue;
145+
}
146+
Ok(queue_count) => {
147+
info!("Starting build with {} crates in queue (currently on a {} crate streak)",
148+
queue_count, status.count());
149+
}
90150
}
91151

152+
// if we're starting a new batch, reload our caches and sources
153+
if !status.is_in_progress() {
154+
if let Err(e) = doc_builder.load_cache() {
155+
error!("Failed to load cache: {}", e);
156+
continue;
157+
}
158+
159+
if let Err(e) = update_sources() {
160+
error!("Failed to update sources: {}", e);
161+
continue;
162+
}
163+
}
92164

93-
// Run build_packages_queue in it's own thread to catch panics
165+
// Run build_packages_queue under `catch_unwind` to catch panics
94166
// This only panicked twice in the last 6 months but its just a better
95167
// idea to do this.
96-
let res = thread::spawn(move || {
97-
match doc_builder.build_packages_queue() {
98-
Err(e) => error!("Failed build new crates: {}", e),
99-
Ok(n) => {
100-
if n > 0 {
101-
match pubsubhubbub::ping_hubs() {
102-
Err(e) => error!("Failed to ping hub: {}", e),
103-
Ok(n) => debug!("Succesfully pinged {} hubs", n)
104-
}
105-
}
106-
}
107-
}
108-
109-
if let Err(e) = doc_builder.save_cache() {
110-
error!("Failed to save cache: {}", e);
168+
let res = catch_unwind(AssertUnwindSafe(|| {
169+
match doc_builder.build_next_queue_package() {
170+
Err(e) => error!("Failed to build crate from queue: {}", e),
171+
Ok(crate_built) => if crate_built {
172+
status.increment();
111173
}
112174

113-
debug!("Finished building new crates, going back to sleep");
114-
})
115-
.join();
175+
}
176+
}));
116177

117178
if let Err(e) = res {
118179
error!("GRAVE ERROR Building new crates panicked: {:?}", e);
119180
}
120181
}
182+
183+
impl BuilderState {
184+
fn count(&self) -> usize {
185+
match *self {
186+
BuilderState::QueueInProgress(n) => n,
187+
_ => 0,
188+
}
189+
}
190+
191+
fn is_in_progress(&self) -> bool {
192+
match *self {
193+
BuilderState::QueueInProgress(_) => true,
194+
_ => false,
195+
}
196+
}
197+
198+
fn increment(&mut self) {
199+
*self = BuilderState::QueueInProgress(self.count() + 1);
200+
}
201+
}
121202
});
122203

123204

0 commit comments

Comments
 (0)