Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Toggle event processing through the filesystem #30

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 213 additions & 26 deletions ci-bench-runner/Cargo.lock
2 changes: 2 additions & 0 deletions ci-bench-runner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ hex = "0.4.3"
hmac = "0.12.1"
hyper = { version = "0.14.27", default-features = false }
jsonwebtoken = "9.1.0"
notify = "6.1.1"
octocrab = "0.32.0"
sentry = { version = "0.31.7", features = ["tracing", "ureq", "rustls"], default-features = false }
sentry-tracing = "0.31.7"
@@ -29,5 +30,6 @@ uuid = { version = "1.4.1", features = ["v4", "serde"] }

[dev-dependencies]
ctor = "0.2.5"
libc = "0.2.151"
reqwest = { version = "0.11.22", default-features = false, features = ["rustls-tls-webpki-roots"] }
wiremock = "0.5.19"
119 changes: 114 additions & 5 deletions ci-bench-runner/src/event_queue.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::Context;
use axum::body::Bytes;
use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@@ -25,6 +27,11 @@ pub struct EventQueue {
active_job_id: Arc<Mutex<Option<Uuid>>>,
/// A sender indicating that a new event has been enqueued
event_enqueued_tx: UnboundedSender<()>,
/// Keeps track of whether incoming events should be processed.
///
/// Note: when event processing gets disabled, we still let the currently active job run to
/// completion.
process_events_toggler: ProcessEventsToggler,
/// Database handle, used to persist events and recover in case of crashes
db: Db,
/// Bencher.dev client
@@ -42,22 +49,24 @@ impl EventQueue {
db: Db,
bench_runner: Arc<dyn BenchRunner>,
octocrab: CachedOctocrab,
) -> Self {
) -> anyhow::Result<Self> {
let (worker_tx, event_enqueued_rx) = tokio::sync::mpsc::unbounded_channel();

let queue = Self {
active_job_id: Arc::new(Mutex::new(None)),
event_enqueued_tx: worker_tx,
process_events_toggler: ProcessEventsToggler::new()
.context("failed to initialize ProcessEventsToggler")?,
db,
bencher_dev: config.bencher.clone().map(BencherDev::new),
};

queue.start_and_supervise_queue_processing(
Ok(queue.start_and_supervise_queue_processing(
event_enqueued_rx,
config,
bench_runner,
octocrab,
)
))
}

/// Starts and supervises the background queue processing task
@@ -71,11 +80,14 @@ impl EventQueue {
let active_job_id = self.active_job_id.clone();
let queue = self.clone();
let event_enqueued_rx = Arc::new(tokio::sync::Mutex::new(event_enqueued_rx));
let toggler = self.process_events_toggler.clone();

tokio::spawn(async move {
loop {
let background_task = queue.process_queued_events_in_background(
event_enqueued_rx.clone(),
config.clone(),
toggler.clone(),
bench_runner.clone(),
octocrab.clone(),
);
@@ -118,6 +130,7 @@ impl EventQueue {
&self,
event_enqueued_rx: Arc<tokio::sync::Mutex<UnboundedReceiver<()>>>,
config: Arc<AppConfig>,
mut toggler: ProcessEventsToggler,
bench_runner: Arc<dyn BenchRunner>,
octocrab: CachedOctocrab,
) -> JoinHandle<anyhow::Result<()>> {
@@ -140,7 +153,10 @@ impl EventQueue {
break;
}

// Now get it from the database
// Postpone event processing if requested
toggler.wait_for_processing_enabled().await;

// Get the next event from the database
let event = db.next_queued_event().await?;

let Some(github_event) = AllowedEvent::from_event_string(&event.event) else {
@@ -222,6 +238,16 @@ impl EventQueue {
Ok(Some(event_id))
}

/// Returns the id of the active job, or `None` when no job is active.
///
/// # Panics
///
/// Panics if the mutex guarding the active job id has been poisoned.
pub fn active_job_id(&self) -> Option<Uuid> {
    let active_job_id = self.active_job_id.lock().unwrap();
    *active_job_id
}

/// Reports whether event processing is currently enabled, as tracked by the queue's
/// `ProcessEventsToggler`.
pub fn event_processing_enabled(&self) -> bool {
    let toggler = &self.process_events_toggler;
    toggler.processing_enabled()
}

/// Returns a user-facing view of the given job id, or `None` if the job could not be found
pub async fn job_view(&self, job_id: Uuid) -> anyhow::Result<Option<JobView>> {
let Some(job) = self.db.maybe_job(job_id).await? else {
@@ -320,3 +346,86 @@ pub enum JobStatus {
Success,
Failure,
}

/// Watches the filesystem to toggle event processing.
///
/// Event processing is enabled by default, but can be disabled by creating a file called `pause`
/// in the program's working directory. If the file is present upon startup or gets created while
/// the application runs, processing will be disabled until the file is deleted.
///
/// Clones share the same watcher (through the `Arc`) and each get their own receiver on the same
/// watch channel, so every clone observes the same toggle state.
struct ProcessEventsToggler {
/// Filesystem watcher, shared between all clones of the toggler.
///
/// NOTE(review): the field is never read directly; presumably it is held only to keep the
/// watcher (and with it the callback owning the sending half of the channel) alive — confirm
/// against notify's drop behavior.
watcher: Arc<RecommendedWatcher>,
/// Receiver tracking the current state of the toggle (`true` = enabled, `false` = disabled)
processing_enabled_rx: tokio::sync::watch::Receiver<bool>,
}

impl ProcessEventsToggler {
    /// Starts watching the current working directory for the `pause` file and derives the
    /// initial toggle state from the filesystem.
    ///
    /// The watch is registered *before* the initial state is read, so a `pause` file created
    /// while the toggler is starting up cannot be missed.
    ///
    /// # Errors
    ///
    /// Returns an error if the working directory cannot be canonicalized, or if the filesystem
    /// watcher cannot be created or attached to the working directory.
    fn new() -> anyhow::Result<Self> {
        let watched_path = Path::new(".");
        let pause_file_name = Path::new("pause");

        // Resolve the pause file to an absolute path once, so that later existence checks
        // (performed on the watcher's thread) do not depend on the working directory at the
        // time the event arrives.
        let watched_dir = watched_path.canonicalize()?;
        let pause_file_path = watched_dir.join(pause_file_name);

        // Start out enabled; the real state is read from the filesystem once the watch is active.
        // The sender is shared between the watcher callback and the initial sync below.
        let (processing_enabled_tx, processing_enabled_rx) = tokio::sync::watch::channel(true);
        let processing_enabled_tx = Arc::new(processing_enabled_tx);

        let callback_tx = Arc::clone(&processing_enabled_tx);
        let callback_pause_file = pause_file_path.clone();
        let mut watcher =
            notify::recommended_watcher(move |r: notify::Result<notify::Event>| match r {
                Ok(event) => {
                    // Besides plain creation/removal, the pause file can appear or vanish
                    // through a rename (e.g. `mv pause pause.bak`), which is reported as a
                    // `Modify(Name(_))` event.
                    let may_toggle = matches!(
                        event.kind,
                        EventKind::Create(_)
                            | EventKind::Remove(_)
                            | EventKind::Modify(notify::event::ModifyKind::Name(_))
                    );
                    let touches_pause_file = event
                        .paths
                        .iter()
                        .any(|p| p.file_name() == Some(pause_file_name.as_os_str()));

                    if may_toggle && touches_pause_file {
                        Self::sync_with_pause_file(&callback_tx, &callback_pause_file);
                    }
                }
                Err(e) => error!("error watching pause file: {:?}", e),
            })?;

        info!(
            "watching for file creation/deletion under {}",
            watched_dir.display()
        );
        watcher.watch(watched_path, RecursiveMode::NonRecursive)?;

        // Read the initial state only now that the watch is registered: any change happening
        // from here on triggers the callback, which re-syncs with the filesystem.
        Self::sync_with_pause_file(&processing_enabled_tx, &pause_file_path);

        Ok(ProcessEventsToggler {
            watcher: Arc::new(watcher),
            processing_enabled_rx,
        })
    }

    /// Sets the toggle to match the filesystem: disabled if the pause file exists, enabled
    /// otherwise (including when its existence cannot be determined).
    ///
    /// The existence check runs while the channel's value is locked, so concurrent syncs cannot
    /// overwrite a newer observation with a stale one. Receivers are only notified, and a log
    /// line is only emitted, when the state actually changes.
    fn sync_with_pause_file(tx: &tokio::sync::watch::Sender<bool>, pause_file_path: &Path) {
        tx.send_if_modified(|enabled| {
            let should_enable = pause_file_path.try_exists().ok() != Some(true);
            if *enabled == should_enable {
                return false;
            }

            *enabled = should_enable;
            if should_enable {
                info!("event processing enabled");
            } else {
                info!("event processing disabled");
            }
            true
        });
    }

    /// Returns whether event processing is currently enabled.
    fn processing_enabled(&self) -> bool {
        *self.processing_enabled_rx.borrow()
    }

    /// Returns whether event processing is currently disabled.
    fn processing_disabled(&self) -> bool {
        !self.processing_enabled()
    }

    /// Completes once event processing is enabled; returns immediately if it already is.
    ///
    /// Processing may get disabled again right after this returns, so callers must not assume
    /// the toggle is still enabled afterwards.
    async fn wait_for_processing_enabled(&mut self) {
        if self.processing_disabled() {
            info!("event handling postponed until processing gets enabled");
        }

        // No re-check after the wait: the state may legitimately flip back to disabled in the
        // meantime, and asserting on it would panic spuriously.
        self.processing_enabled_rx
            .wait_for(|&enabled| enabled)
            .await
            .expect("the sender lives in the watcher callback, which `self.watcher` keeps alive");
    }
}

impl Clone for ProcessEventsToggler {
fn clone(&self) -> Self {
Self {
watcher: self.watcher.clone(),
processing_enabled_rx: self.processing_enabled_rx.clone(),
}
}
}
6 changes: 4 additions & 2 deletions ci-bench-runner/src/lib.rs
Original file line number Diff line number Diff line change
@@ -104,7 +104,7 @@ pub async fn server(
// Set up dependencies
let octocrab = CachedOctocrab::new(&config).await?;
let db = Db::with_connection(sqlite);
let event_queue = EventQueue::new(config.clone(), db.clone(), bench_runner, octocrab);
let event_queue = EventQueue::new(config.clone(), db.clone(), bench_runner, octocrab)?;

// Create the application's state, accessible when handling requests
let state = Arc::new(AppState {
@@ -134,10 +134,12 @@ pub async fn server(
}

/// Returns git commit information about the binary that is currently deployed
async fn get_server_info() -> Json<serde_json::Value> {
async fn get_server_info(State(state): State<Arc<AppState>>) -> Json<serde_json::Value> {
Json(json!({
"git_commit_sha": env!("GIT_HEAD_SHA").to_string(),
"git_commit_message": env!("GIT_HEAD_COMMIT_MESSAGE").to_string(),
"active_job_id": state.event_queue.active_job_id(),
"event_processing_enabled": state.event_queue.event_processing_enabled(),
}))
}

85 changes: 85 additions & 0 deletions ci-bench-runner/src/test/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fs;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -258,6 +259,79 @@ async fn test_issue_comment_happy_path() {
mock_github.server.verify().await;
}

/// Checks that a webhook event received while the `pause` file exists is not processed until
/// the file is removed, and that `/info` reports the toggle state throughout.
#[cfg(target_os = "linux")]
#[tokio::test]
async fn test_issue_comment_postponed_processing() {
    let tempdir = tempfile::tempdir().unwrap();
    let client = reqwest::Client::default();

    // This is necessary to create a "pause" file without interfering with other tests
    // See https://stackoverflow.com/a/73867506/2110623 for details on `unshare`
    //
    // SAFETY: `unshare` only takes a flags integer and changes kernel-side attributes of the
    // calling thread; it does not touch any memory visible to Rust.
    let unshare_result = unsafe { libc::unshare(libc::CLONE_FS) };
    // If unsharing failed, changing the working directory below would affect the whole test
    // process (and thereby every other test), so bail out instead.
    assert_eq!(
        unshare_result,
        0,
        "unshare(CLONE_FS) failed: {}",
        std::io::Error::last_os_error()
    );
    std::env::set_current_dir(tempdir.path()).unwrap();

    // Mock HTTP responses from GitHub
    let mock_github = MockGitHub::start().await;
    let _get_pr = mock_github.mock_get_pr().await;
    let _post_comment = mock_github.mock_post_comment().await;
    let update_status = mock_github.mock_post_status().await;

    // Run the job server
    let server = TestServer::start(&mock_github).await;

    // Sanity check: event processing is enabled
    let response = get_info(&client, &server.base_url).await;
    assert_eq!(
        response.get("event_processing_enabled"),
        Some(&serde_json::Value::Bool(true))
    );

    // Disable event processing
    let pause_file_path = tempdir.path().join("pause");
    File::create(&pause_file_path).unwrap();

    // Post the webhook event
    let event = webhook::comment("@rustls-benchmarking bench", "created", "OWNER");
    post_webhook(
        &client,
        &server.base_url,
        &server.config.webhook_secret,
        event,
        "issue_comment",
    )
    .await;

    // Ensure there is no active job after one second and event processing is disabled
    tokio::time::sleep(Duration::from_secs(1)).await;
    let response = get_info(&client, &server.base_url).await;
    assert_eq!(
        response.get("active_job_id"),
        Some(&serde_json::Value::Null)
    );
    assert_eq!(
        response.get("event_processing_enabled"),
        Some(&serde_json::Value::Bool(false))
    );

    // Re-enable event processing
    fs::remove_file(&pause_file_path).unwrap();

    // Wait for our mock endpoints to have been called. A timeout is deliberately ignored here:
    // the verification below reports unmet expectations.
    tokio::time::timeout(Duration::from_secs(5), update_status.wait_until_satisfied())
        .await
        .ok();

    // Assert that the mocks were used and report any errors
    mock_github.server.verify().await;

    // Sanity check: event processing is enabled
    let response = get_info(&client, &server.base_url).await;
    assert_eq!(
        response.get("event_processing_enabled"),
        Some(&serde_json::Value::Bool(true))
    );
}

#[tokio::test]
async fn test_pr_opened_happy_path_with_comment_reuse() {
// Mock HTTP responses from GitHub
@@ -685,6 +759,17 @@ async fn post_webhook(
assert_eq!(response.status(), StatusCode::OK);
}

/// Fetches `{base_url}/info` and returns the response body parsed as JSON.
///
/// Panics if the request fails or the body is not valid JSON.
async fn get_info(client: &reqwest::Client, base_url: &str) -> serde_json::Value {
    let info_url = format!("{base_url}/info");
    let response = client.get(info_url).send().await.unwrap();
    response.json::<serde_json::Value>().await.unwrap()
}

async fn ensure_webhook_handled(server: &TestServer) {
tokio::time::sleep(Duration::from_secs(1)).await;
let events = server.db.queued_events().await.unwrap();
4 changes: 4 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
@@ -64,6 +64,10 @@ The following features are supported:
body. This can be used as a fallback mechanism when the triggers mentioned above are not enough.
- Report comparison results in a comment to the relevant PR, reusing the same comment when new
results are available.
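- Pause event processing by creating a file called `pause` in the application's working directory.
  Processing resumes once the file is deleted. A job that is already running when processing
  gets paused still runs to completion.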
- Show information about the application through the `/info` endpoint. Includes the hash of the
deployed commit, the id of the active job (if any) and whether event processing is currently
enabled.

Interesting ideas for later: