Skip to content

Commit a514531

Browse files
committed
parallel phd! test run in 40 seconds or your money back
1 parent 6b5f2af commit a514531

File tree

5 files changed

+215
-113
lines changed

5 files changed

+215
-113
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

phd-tests/runner/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ anyhow.workspace = true
1414
backtrace.workspace = true
1515
camino.workspace = true
1616
clap = { workspace = true, features = ["derive"] }
17+
crossbeam-channel.workspace = true
1718
phd-framework.workspace = true
1819
phd-tests.workspace = true
1920
tokio = { workspace = true, features = ["full"] }

phd-tests/runner/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,12 @@ pub struct RunOptions {
156156
#[clap(long, default_value = "file")]
157157
pub server_logging_mode: ServerLogMode,
158158

159+
/// The parallelism with which to run PHD tests. If not provided, phd-runner
160+
/// will guess a reasonable number from the test environment's number of
161+
/// CPUs and available memory.
162+
#[clap(long, value_parser)]
163+
pub parallelism: Option<u16>,
164+
159165
/// The number of CPUs to assign to the guest in tests where the test is
160166
/// using the default machine configuration.
161167
#[clap(long, value_parser, default_value = "2")]

phd-tests/runner/src/execute.rs

Lines changed: 114 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// License, v. 2.0. If a copy of the MPL was not distributed with this
33
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
44

5-
use std::sync::Arc;
5+
use std::sync::{Arc, Mutex};
66
use std::time::{Duration, Instant};
77

88
use phd_tests::phd_testcase::{Framework, TestCase, TestOutcome};
@@ -37,21 +37,9 @@ pub struct ExecutionStats {
3737
pub failed_test_cases: Vec<&'static TestCase>,
3838
}
3939

40-
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
41-
enum Status {
42-
Ran(TestOutcome),
43-
NotRun,
44-
}
45-
46-
struct Execution {
47-
tc: &'static TestCase,
48-
status: Status,
49-
}
50-
5140
/// Executes a set of tests using the supplied test context.
5241
pub async fn run_tests_with_ctx(
53-
ctx: &Arc<Framework>,
54-
mut fixtures: TestFixtures,
42+
ctx: &mut Vec<(Arc<Framework>, TestFixtures)>,
5543
run_opts: &RunOptions,
5644
) -> ExecutionStats {
5745
let mut executions = Vec::new();
@@ -60,10 +48,10 @@ pub async fn run_tests_with_ctx(
6048
&run_opts.include_filter,
6149
&run_opts.exclude_filter,
6250
) {
63-
executions.push(Execution { tc, status: Status::NotRun });
51+
executions.push(tc);
6452
}
6553

66-
let mut stats = ExecutionStats {
54+
let stats = ExecutionStats {
6755
tests_passed: 0,
6856
tests_failed: 0,
6957
tests_skipped: 0,
@@ -77,90 +65,128 @@ pub async fn run_tests_with_ctx(
7765
return stats;
7866
}
7967

80-
fixtures.execution_setup().unwrap();
81-
let sigint_rx = set_sigint_handler();
82-
info!("Running {} test(s)", executions.len());
83-
let start_time = Instant::now();
84-
for execution in &mut executions {
85-
if *sigint_rx.borrow() {
86-
info!("Test run interrupted by SIGINT");
87-
break;
88-
}
68+
let stats = Arc::new(Mutex::new(stats));
8969

90-
info!("Starting test {}", execution.tc.fully_qualified_name());
70+
async fn run_tests(
71+
execution_rx: crossbeam_channel::Receiver<&'static TestCase>,
72+
test_ctx: Arc<Framework>,
73+
mut fixtures: TestFixtures,
74+
stats: Arc<Mutex<ExecutionStats>>,
75+
sigint_rx: watch::Receiver<bool>,
76+
) -> Result<(), ()> {
77+
fixtures.execution_setup().unwrap();
9178

92-
// Failure to run a setup fixture is fatal to the rest of the run, but
93-
// it's still possible to report results, so return gracefully instead
94-
// of panicking.
95-
if let Err(e) = fixtures.test_setup() {
96-
error!("Error running test setup fixture: {}", e);
97-
break;
98-
}
79+
loop {
80+
// Check for SIGINT only at the top of the loop: although
81+
// waiting for a new testcase is theoretically a blocking
82+
// operation, it won't block in a meaningful way for our use. The
83+
// recv() will return immediately because either there are more
84+
// testcases to run or the sender is closed. The only long
85+
// blocking operation to check against in this loop is the test
86+
// run itself.
87+
if *sigint_rx.borrow() {
88+
info!("Test run interrupted by SIGINT");
89+
break;
90+
}
9991

100-
stats.tests_not_run -= 1;
101-
let test_ctx = ctx.clone();
102-
let tc = execution.tc;
103-
let mut sigint_rx_task = sigint_rx.clone();
104-
let test_outcome = tokio::spawn(async move {
105-
tokio::select! {
106-
// Ensure interrupt signals are always handled instead of
107-
// continuing to run the test.
108-
biased;
109-
result = sigint_rx_task.changed() => {
110-
assert!(
111-
result.is_ok(),
112-
"SIGINT channel shouldn't drop while tests are running"
113-
);
114-
115-
TestOutcome::Failed(
116-
Some("test interrupted by SIGINT".to_string())
117-
)
92+
let tc = match execution_rx.recv() {
93+
Ok(tc) => tc,
94+
Err(_) => {
95+
// RecvError means the channel is closed, so we're all
96+
// done.
97+
break;
11898
}
119-
outcome = tc.run(test_ctx.as_ref()) => outcome
99+
};
100+
101+
info!("Starting test {}", tc.fully_qualified_name());
102+
103+
// Failure to run a setup fixture is fatal to the rest of the
104+
// run, but it's still possible to report results, so return
105+
// gracefully instead of panicking.
106+
if let Err(e) = fixtures.test_setup() {
107+
error!("Error running test setup fixture: {}", e);
108+
// TODO: set this on stats too
109+
break;
120110
}
121-
})
122-
.await
123-
.unwrap_or_else(|_| {
124-
TestOutcome::Failed(Some(
125-
"test task panicked, see test logs".to_string(),
126-
))
127-
});
128-
129-
info!(
130-
"test {} ... {}{}",
131-
execution.tc.fully_qualified_name(),
132-
match test_outcome {
133-
TestOutcome::Passed => "ok",
134-
TestOutcome::Failed(_) => "FAILED: ",
135-
TestOutcome::Skipped(_) => "skipped: ",
136-
},
137-
match &test_outcome {
138-
TestOutcome::Failed(Some(s))
139-
| TestOutcome::Skipped(Some(s)) => s,
140-
TestOutcome::Failed(None) | TestOutcome::Skipped(None) =>
141-
"[no message]",
142-
_ => "",
111+
112+
{
113+
let mut stats = stats.lock().unwrap();
114+
stats.tests_not_run -= 1;
143115
}
144-
);
145116

146-
match test_outcome {
147-
TestOutcome::Passed => stats.tests_passed += 1,
148-
TestOutcome::Failed(_) => {
149-
stats.tests_failed += 1;
150-
stats.failed_test_cases.push(execution.tc);
117+
let test_outcome = tc.run(test_ctx.as_ref()).await;
118+
119+
info!(
120+
"test {} ... {}{}",
121+
tc.fully_qualified_name(),
122+
match test_outcome {
123+
TestOutcome::Passed => "ok",
124+
TestOutcome::Failed(_) => "FAILED: ",
125+
TestOutcome::Skipped(_) => "skipped: ",
126+
},
127+
match &test_outcome {
128+
TestOutcome::Failed(Some(s))
129+
| TestOutcome::Skipped(Some(s)) => s,
130+
TestOutcome::Failed(None) | TestOutcome::Skipped(None) =>
131+
"[no message]",
132+
_ => "",
133+
}
134+
);
135+
136+
{
137+
let mut stats = stats.lock().unwrap();
138+
match test_outcome {
139+
TestOutcome::Passed => stats.tests_passed += 1,
140+
TestOutcome::Failed(_) => {
141+
stats.tests_failed += 1;
142+
stats.failed_test_cases.push(tc);
143+
}
144+
TestOutcome::Skipped(_) => stats.tests_skipped += 1,
145+
}
151146
}
152-
TestOutcome::Skipped(_) => stats.tests_skipped += 1,
153-
}
154147

155-
execution.status = Status::Ran(test_outcome);
156-
if let Err(e) = fixtures.test_cleanup().await {
157-
error!("Error running cleanup fixture: {}", e);
158-
break;
148+
if let Err(e) = fixtures.test_cleanup().await {
149+
error!("Error running cleanup fixture: {}", e);
150+
// TODO: set this on stats
151+
break;
152+
}
159153
}
154+
155+
fixtures.execution_cleanup().unwrap();
156+
157+
Ok(())
160158
}
161-
stats.duration = start_time.elapsed();
162159

163-
fixtures.execution_cleanup().unwrap();
160+
let sigint_rx = set_sigint_handler();
161+
info!("Running {} test(s)", executions.len());
162+
let start_time = Instant::now();
163+
164+
let (execution_tx, execution_rx) =
165+
crossbeam_channel::unbounded::<&'static TestCase>();
166+
167+
let mut test_runners = tokio::task::JoinSet::new();
168+
169+
for (ctx, fixtures) in ctx.drain(..) {
170+
test_runners.spawn(run_tests(
171+
execution_rx.clone(),
172+
ctx,
173+
fixtures,
174+
Arc::clone(&stats),
175+
sigint_rx.clone(),
176+
));
177+
}
178+
179+
for execution in &mut executions {
180+
execution_tx.send(execution).expect("ok");
181+
}
182+
std::mem::drop(execution_tx);
183+
184+
let _ = test_runners.join_all().await;
185+
186+
let mut stats =
187+
Mutex::into_inner(Arc::into_inner(stats).expect("only one ref"))
188+
.expect("lock not panicked");
189+
stats.duration = start_time.elapsed();
164190

165191
stats
166192
}

0 commit comments

Comments
 (0)