Skip to content

add "priority" to the build queue, and decouple builds from reading new crates #344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions src/bin/cratesfyi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use std::path::PathBuf;

use clap::{Arg, App, SubCommand};
use cratesfyi::{DocBuilder, DocBuilderOptions, db};
use cratesfyi::utils::build_doc;
use cratesfyi::utils::{build_doc, add_crate_to_queue};
use cratesfyi::start_web_server;
use cratesfyi::db::add_path_into_database;
use cratesfyi::db::{add_path_into_database, connect_db};


pub fn main() {
Expand Down Expand Up @@ -130,6 +130,23 @@ pub fn main() {
chart")
.subcommand(SubCommand::with_name("update-search-index"))
.about("Updates search index"))
.subcommand(SubCommand::with_name("queue")
.about("Interactions with the build queue")
.subcommand(SubCommand::with_name("add")
.about("Add a crate to the build queue")
.arg(Arg::with_name("CRATE_NAME")
.index(1)
.required(true)
.help("Name of crate to build"))
.arg(Arg::with_name("CRATE_VERSION")
.index(2)
.required(true)
.help("Version of crate to build"))
.arg(Arg::with_name("BUILD_PRIORITY")
.short("p")
.long("priority")
.help("Priority of build (default: 5) (new crate builds get priority 0)")
.takes_value(true))))
.get_matches();


Expand Down Expand Up @@ -227,6 +244,17 @@ pub fn main() {
start_web_server(Some(matches.value_of("SOCKET_ADDR").unwrap_or("0.0.0.0:3000")));
} else if let Some(_) = matches.subcommand_matches("daemon") {
cratesfyi::utils::start_daemon();
} else if let Some(matches) = matches.subcommand_matches("queue") {
if let Some(matches) = matches.subcommand_matches("add") {
let priority = matches.value_of("BUILD_PRIORITY").unwrap_or("5");
let priority: i32 = priority.parse().expect("--priority was not a number");
let conn = connect_db().expect("Could not connect to database");

add_crate_to_queue(&conn,
matches.value_of("CRATE_NAME").unwrap(),
matches.value_of("CRATE_VERSION").unwrap(),
priority).expect("Could not add crate to queue");
}
} else {
println!("{}", matches.usage());
}
Expand Down
10 changes: 10 additions & 0 deletions src/db/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ pub fn migrate(version: Option<Version>) -> CratesfyiResult<()> {
"DROP TABLE authors, author_rels, keyword_rels, keywords, owner_rels,
owners, releases, crates, builds, queue, files, config;"
),
migration!(
// version
2,
// description
"Added priority column to build queue",
// upgrade query
"ALTER TABLE queue ADD COLUMN priority INT DEFAULT 0;",
// downgrade query
"ALTER TABLE queue DROP COLUMN priority;"
),
];

for migration in migrations {
Expand Down
5 changes: 5 additions & 0 deletions src/docbuilder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ impl DocBuilder {
Ok(())
}

/// Reports whether the builder's lock file is present right now.
pub fn is_locked(&self) -> bool {
    let lock_file = self.lock_path();
    lock_file.exists()
}

/// Returns a reference of options
pub fn options(&self) -> &DocBuilderOptions {
&self.options
Expand Down
61 changes: 51 additions & 10 deletions src/docbuilder/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,38 @@ use super::DocBuilder;
use db::connect_db;
use error::Result;
use crates_index_diff::{ChangeKind, Index};
use utils::add_crate_to_queue;


impl DocBuilder {
/// Updates crates.io-index repository and adds new crates into build queue.
/// Returns the number of crates it attempted to add to the queue
pub fn get_new_crates(&mut self) -> Result<i64> {
pub fn get_new_crates(&mut self) -> Result<usize> {
let conn = try!(connect_db());
let index = try!(Index::from_path_or_cloned(&self.options.crates_io_index_path));
let mut changes = try!(index.fetch_changes());
let mut add_count: usize = 0;

// I believe this will fix the ordering of the queue if we get more than one crate from changes
changes.reverse();

for krate in changes.iter().filter(|k| k.kind != ChangeKind::Yanked) {
conn.execute("INSERT INTO queue (name, version) VALUES ($1, $2)",
&[&krate.name, &krate.version])
.ok();
add_crate_to_queue(&conn, &krate.name, &krate.version, 0).ok();
debug!("{}-{} added into build queue", krate.name, krate.version);
add_count += 1;
}

let queue_count = conn.query("SELECT COUNT(*) FROM queue WHERE attempt < 5", &[])
Ok(add_count)
}

pub fn get_queue_count(&self) -> Result<i64> {
let conn = try!(connect_db());
Ok(conn.query("SELECT COUNT(*) FROM queue WHERE attempt < 5", &[])
.unwrap()
.get(0)
.get(0);

Ok(queue_count)
.get(0))
}


/// Builds packages from queue
pub fn build_packages_queue(&mut self) -> Result<usize> {
let conn = try!(connect_db());
Expand All @@ -42,7 +45,7 @@ impl DocBuilder {
for row in &try!(conn.query("SELECT id, name, version
FROM queue
WHERE attempt < 5
ORDER BY id ASC",
ORDER BY priority ASC, attempt ASC, id ASC",
&[])) {
let id: i32 = row.get(0);
let name: String = row.get(1);
Expand All @@ -67,6 +70,44 @@ impl DocBuilder {

Ok(build_count)
}

/// Builds the top package from the queue. Returns whether the queue was empty.
pub fn build_next_queue_package(&mut self) -> Result<bool> {
let conn = try!(connect_db());

let query = try!(conn.query("SELECT id, name, version
FROM queue
WHERE attempt < 5
ORDER BY priority ASC, attempt ASC, id ASC
LIMIT 1",
&[]));

if query.is_empty() {
// nothing in the queue; bail
return Ok(false);
}

let id: i32 = query.get(0).get(0);
let name: String = query.get(0).get(1);
let version: String = query.get(0).get(2);

match self.build_package(&name[..], &version[..]) {
Ok(_) => {
let _ = conn.execute("DELETE FROM queue WHERE id = $1", &[&id]);
}
Err(e) => {
// Increase attempt count
let _ = conn.execute("UPDATE queue SET attempt = attempt + 1 WHERE id = $1",
&[&id]);
error!("Failed to build package {}-{} from queue: {}",
name,
version,
e)
}
}

Ok(true)
}
}

#[cfg(test)]
Expand Down
168 changes: 124 additions & 44 deletions src/utils/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


use std::{env, thread};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::process::exit;
use std::fs::File;
use std::io::Write;
Expand Down Expand Up @@ -44,80 +45,159 @@ pub fn start_daemon() {
exit(0);
}


// check new crates every minute
thread::spawn(move || {
// space this out to prevent it from clashing against the queue-builder thread on launch
thread::sleep(Duration::from_secs(30));
loop {
let opts = opts();
let mut doc_builder = DocBuilder::new(opts);

debug!("Checking new crates");
match doc_builder.get_new_crates() {
Ok(n) => debug!("{} crates added to queue", n),
Err(e) => error!("Failed to get new crates: {}", e),
}

thread::sleep(Duration::from_secs(60));
}
});

// build new crates every minute
thread::spawn(move || {
let opts = opts();
let mut doc_builder = DocBuilder::new(opts);

/// Represents the current state of the builder thread.
enum BuilderState {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I initially wrote this enum, I had thought that the logic would be more complicated than it wound up being. This can probably be reduced to an Option<usize>, but I left this state enum in to make the usage code easier to read.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: I tried to reduce this to a usize and then to an Option<usize>, and proceeded to introduce bugs into the implementation. I think I'll keep this state enum as-is, since it provides a more readable implementation, IMO. (It'll take up the same space on the stack as an Option<usize> regardless.)

/// The builder thread has just started, and hasn't built any crates yet.
Fresh,
/// The builder has just seen an empty build queue.
EmptyQueue,
/// The builder has just seen the lock file.
Locked,
/// The builder has just finished building a crate. The enclosed count is the number of
/// crates built since the caches have been refreshed.
QueueInProgress(usize),
}

let mut opts = opts();
opts.skip_if_exists = true;
let mut status = BuilderState::Fresh;

loop {
if !status.is_in_progress() {
thread::sleep(Duration::from_secs(60));
}

// check lock file
if opts.prefix.join("cratesfyi.lock").exists() {
if doc_builder.is_locked() {
warn!("Lock file exits, skipping building new crates");
status = BuilderState::Locked;
continue;
}

let mut doc_builder = DocBuilder::new(opts);
if status.count() > 10 {
// periodically, we need to flush our caches and ping the hubs
debug!("10 builds in a row; flushing caches");
status = BuilderState::QueueInProgress(0);

debug!("Checking new crates");
let queue_count = match doc_builder.get_new_crates() {
Ok(size) => size,
Err(e) => {
error!("Failed to get new crates: {}", e);
continue;
match pubsubhubbub::ping_hubs() {
Err(e) => error!("Failed to ping hub: {}", e),
Ok(n) => debug!("Succesfully pinged {} hubs", n)
}
};

// Only build crates if there is any
if queue_count == 0 {
debug!("Queue is empty, going back to sleep");
continue;
}
if let Err(e) = doc_builder.load_cache() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This load/save dance is here in case the on-disk cache has been updated since the builder thread last saved it. Since it's represented in memory as a BTreeSet, reloading everything doesn't create any duplicates in the cache when it's saved to disk later.

error!("Failed to load cache: {}", e);
}

info!("Building {} crates from queue", queue_count);
if let Err(e) = doc_builder.save_cache() {
error!("Failed to save cache: {}", e);
}

// update index
if let Err(e) = update_sources() {
error!("Failed to update sources: {}", e);
continue;
if let Err(e) = update_sources() {
error!("Failed to update sources: {}", e);
continue;
}
}

if let Err(e) = doc_builder.load_cache() {
error!("Failed to load cache: {}", e);
continue;
// Only build crates if there are any to build
debug!("Checking build queue");
match doc_builder.get_queue_count() {
Err(e) => {
error!("Failed to read the number of crates in the queue: {}", e);
continue;
}
Ok(0) => {
if status.is_in_progress() {
// ping the hubs before continuing
match pubsubhubbub::ping_hubs() {
Err(e) => error!("Failed to ping hub: {}", e),
Ok(n) => debug!("Succesfully pinged {} hubs", n)
}

if let Err(e) = doc_builder.save_cache() {
error!("Failed to save cache: {}", e);
}
}
debug!("Queue is empty, going back to sleep");
status = BuilderState::EmptyQueue;
continue;
}
Ok(queue_count) => {
info!("Starting build with {} crates in queue (currently on a {} crate streak)",
queue_count, status.count());
}
}

// if we're starting a new batch, reload our caches and sources
if !status.is_in_progress() {
if let Err(e) = doc_builder.load_cache() {
error!("Failed to load cache: {}", e);
continue;
}

if let Err(e) = update_sources() {
error!("Failed to update sources: {}", e);
continue;
}
}

// Run build_packages_queue in it's own thread to catch panics
// Run build_packages_queue under `catch_unwind` to catch panics
// This only panicked twice in the last 6 months but its just a better
// idea to do this.
let res = thread::spawn(move || {
match doc_builder.build_packages_queue() {
Err(e) => error!("Failed build new crates: {}", e),
Ok(n) => {
if n > 0 {
match pubsubhubbub::ping_hubs() {
Err(e) => error!("Failed to ping hub: {}", e),
Ok(n) => debug!("Succesfully pinged {} hubs", n)
}
}
}
}

if let Err(e) = doc_builder.save_cache() {
error!("Failed to save cache: {}", e);
let res = catch_unwind(AssertUnwindSafe(|| {
match doc_builder.build_next_queue_package() {
Err(e) => error!("Failed to build crate from queue: {}", e),
Ok(crate_built) => if crate_built {
status.increment();
}

debug!("Finished building new crates, going back to sleep");
})
.join();
}
}));

if let Err(e) = res {
error!("GRAVE ERROR Building new crates panicked: {:?}", e);
}
}

impl BuilderState {
    /// Number carried by `QueueInProgress`; every other state reports zero.
    fn count(&self) -> usize {
        if let BuilderState::QueueInProgress(built) = *self {
            built
        } else {
            0
        }
    }

    /// Whether the state is `QueueInProgress`.
    fn is_in_progress(&self) -> bool {
        if let BuilderState::QueueInProgress(_) = *self {
            true
        } else {
            false
        }
    }

    /// Switches to `QueueInProgress` with a count one higher than the current `count()`.
    fn increment(&mut self) {
        let next = self.count() + 1;
        *self = BuilderState::QueueInProgress(next);
    }
}
});


Expand Down
Loading