Skip to content

Commit 28a793b

Browse files
committed
feat: add memory executor
1 parent 47d5bb0 commit 28a793b

File tree

9 files changed

+200
-0
lines changed

9 files changed

+200
-0
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ memmap2 = "0.9.5"
5757
nix = { version = "0.29.0", features = ["fs", "time", "user"] }
5858
futures = "0.3.31"
5959
runner-shared = { path = "crates/runner-shared" }
60+
heaptrack = { path = "crates/heaptrack" }
61+
ipc-channel = "0.18"
6062
shellexpand = { version = "3.1.1", features = ["tilde"] }
6163
addr2line = "0.25"
6264
gimli = "0.32"

src/run/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ pub enum RunnerMode {
148148
Instrumentation,
149149
Simulation,
150150
Walltime,
151+
Memory,
151152
}
152153

153154
#[derive(ValueEnum, Clone, Debug, PartialEq)]

src/run/runner/helpers/env.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub fn get_base_injected_env(
1515
#[allow(deprecated)]
1616
RunnerMode::Instrumentation | RunnerMode::Simulation => "instrumentation",
1717
RunnerMode::Walltime => "walltime",
18+
RunnerMode::Memory => "memory",
1819
};
1920
HashMap::from([
2021
("PYTHONHASHSEED", "0".into()),

src/run/runner/interfaces.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub struct RunData {
1010
pub enum ExecutorName {
1111
Valgrind,
1212
WallTime,
13+
Memory,
1314
}
1415

1516
#[allow(clippy::to_string_trait_impl)]
@@ -18,6 +19,7 @@ impl ToString for ExecutorName {
1819
match self {
1920
ExecutorName::Valgrind => "valgrind".to_string(),
2021
ExecutorName::WallTime => "walltime".to_string(),
22+
ExecutorName::Memory => "memory".to_string(),
2123
}
2224
}
2325
}

src/run/runner/memory/executor.rs

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
use crate::prelude::*;
2+
use crate::run::instruments::mongo_tracer::MongoTracer;
3+
use crate::run::runner::executor::Executor;
4+
use crate::run::runner::helpers::command::CommandBuilder;
5+
use crate::run::runner::helpers::get_bench_command::get_bench_command;
6+
use crate::run::runner::helpers::run_command_with_log_pipe::run_command_with_log_pipe_and_callback;
7+
use crate::run::runner::helpers::run_with_sudo::wrap_with_sudo;
8+
use crate::run::runner::shared::fifo::RunnerFifo;
9+
use crate::run::runner::{ExecutorName, RunData};
10+
use crate::run::{check_system::SystemInfo, config::Config};
11+
use async_trait::async_trait;
12+
use heaptrack::HeaptrackIpcClient;
13+
use heaptrack::HeaptrackIpcServer;
14+
use ipc_channel::ipc;
15+
use runner_shared::benchmark_results::{BenchmarkResultExt, MarkerResult};
16+
use runner_shared::fifo::Command as FifoCommand;
17+
use runner_shared::fifo::IntegrationMode;
18+
use std::path::Path;
19+
use std::process::Command;
20+
use std::rc::Rc;
21+
22+
pub struct MemoryExecutor;
23+
24+
impl MemoryExecutor {
25+
fn build_heaptrack_command(
26+
config: &Config,
27+
run_data: &RunData,
28+
) -> Result<(HeaptrackIpcServer, CommandBuilder)> {
29+
// FIXME: We only support native languages for now
30+
31+
// Find heaptrack binary - check env variable or use default command name
32+
let heaptrack_path = std::env::var("CODSPEED_HEAPTRACK_BINARY")
33+
.unwrap_or_else(|_| "codspeed-heaptrack".to_string());
34+
35+
// Always use env to preserve LD_LIBRARY_PATH and other environment variables
36+
let mut cmd_builder = CommandBuilder::new("env");
37+
38+
// Preserve LD_LIBRARY_PATH from the current environment if it exists
39+
if let Ok(ld_library_path) = std::env::var("LD_LIBRARY_PATH") {
40+
cmd_builder.arg(format!("LD_LIBRARY_PATH={ld_library_path}"));
41+
}
42+
43+
cmd_builder.arg(&heaptrack_path);
44+
cmd_builder.arg("track");
45+
cmd_builder.arg(get_bench_command(config)?);
46+
cmd_builder.arg("--output");
47+
cmd_builder.arg(run_data.profile_folder.join("results"));
48+
49+
// Setup heaptrack IPC server
50+
let (ipc_server, server_name) = ipc::IpcOneShotServer::new()?;
51+
cmd_builder.arg("--ipc-server");
52+
cmd_builder.arg(server_name);
53+
54+
Ok((ipc_server, cmd_builder))
55+
}
56+
}
57+
58+
#[async_trait(?Send)]
59+
impl Executor for MemoryExecutor {
60+
fn name(&self) -> ExecutorName {
61+
ExecutorName::Memory
62+
}
63+
64+
async fn setup(
65+
&self,
66+
_system_info: &SystemInfo,
67+
_setup_cache_dir: Option<&Path>,
68+
) -> Result<()> {
69+
// Validate that the codspeed-heaptrack command is available
70+
let heaptrack_path = std::env::var("CODSPEED_HEAPTRACK_BINARY")
71+
.unwrap_or_else(|_| "codspeed-heaptrack".to_string());
72+
73+
info!("Validating heaptrack binary at path: {}", heaptrack_path);
74+
let output = Command::new(&heaptrack_path).arg("--version").output()?;
75+
if !output.status.success() {
76+
bail!("codspeed-heaptrack command is not available or failed to execute");
77+
}
78+
79+
Ok(())
80+
}
81+
82+
async fn run(
83+
&self,
84+
config: &Config,
85+
_system_info: &SystemInfo,
86+
run_data: &RunData,
87+
_mongo_tracer: &Option<MongoTracer>,
88+
) -> Result<()> {
89+
// Create the results/ directory inside the profile folder to avoid having heaptrack create it with wrong permissions
90+
std::fs::create_dir_all(run_data.profile_folder.join("results"))?;
91+
92+
let (ipc, cmd_builder) = Self::build_heaptrack_command(config, run_data)?;
93+
let cmd = wrap_with_sudo(cmd_builder)?.build();
94+
debug!("cmd: {cmd:?}");
95+
96+
let runner_fifo = RunnerFifo::new()?;
97+
let on_process_started = async |pid| -> anyhow::Result<()> {
98+
let marker_result = Self::handle_fifo(runner_fifo, pid, ipc).await?;
99+
100+
// Directly write to the profile folder, to avoid having to define another field
101+
marker_result
102+
.save_to(run_data.profile_folder.join("results"))
103+
.unwrap();
104+
105+
Ok(())
106+
};
107+
108+
let status = run_command_with_log_pipe_and_callback(cmd, on_process_started).await?;
109+
debug!("cmd exit status: {:?}", status);
110+
111+
if !status.success() {
112+
bail!("failed to execute memory tracker process: {status}");
113+
}
114+
115+
Ok(())
116+
}
117+
118+
async fn teardown(
119+
&self,
120+
_config: &Config,
121+
_system_info: &SystemInfo,
122+
_run_data: &RunData,
123+
) -> Result<()> {
124+
Ok(())
125+
}
126+
}
127+
128+
impl MemoryExecutor {
129+
async fn handle_fifo(
130+
mut runner_fifo: RunnerFifo,
131+
pid: u32,
132+
ipc: HeaptrackIpcServer,
133+
) -> anyhow::Result<MarkerResult> {
134+
debug!("handle_fifo called with PID {pid}");
135+
136+
// Accept the IPC connection from heaptrack and get the sender it sends us
137+
let (_, heaptrack_sender) = ipc.accept()?;
138+
let ipc_client = Rc::new(HeaptrackIpcClient::from_accepted(heaptrack_sender));
139+
140+
let ipc_client_health = Rc::clone(&ipc_client);
141+
let health_check = async move || {
142+
// Ping heaptrack via IPC to check if it's still responding
143+
match ipc_client_health.ping() {
144+
Ok(()) => Ok(true),
145+
Err(_) => Ok(false),
146+
}
147+
};
148+
149+
let on_cmd = async move |cmd: &FifoCommand| {
150+
match cmd {
151+
FifoCommand::StartBenchmark => {
152+
debug!("Enabling heaptrack via IPC");
153+
if let Err(e) = ipc_client.enable() {
154+
error!("Failed to enable heaptrack: {e}");
155+
return Ok(FifoCommand::Err);
156+
}
157+
}
158+
FifoCommand::StopBenchmark => {
159+
debug!("Disabling heaptrack via IPC");
160+
if let Err(e) = ipc_client.disable() {
161+
// There's a chance that heaptrack has already exited here, so just log as debug
162+
debug!("Failed to disable heaptrack: {e}");
163+
return Ok(FifoCommand::Err);
164+
}
165+
}
166+
FifoCommand::GetIntegrationMode => {
167+
return Ok(FifoCommand::IntegrationModeResponse(
168+
IntegrationMode::Analysis,
169+
));
170+
}
171+
_ => {
172+
warn!("Unhandled FIFO command: {cmd:?}");
173+
return Ok(FifoCommand::Err);
174+
}
175+
}
176+
177+
Ok(FifoCommand::Ack)
178+
};
179+
180+
let (marker_result, _) = runner_fifo
181+
.handle_fifo_messages(health_check, on_cmd)
182+
.await?;
183+
Ok(marker_result)
184+
}
185+
}

src/run/runner/memory/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod executor;

src/run/runner/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use super::{RunnerMode, config::Config};
77
mod executor;
88
mod helpers;
99
mod interfaces;
10+
mod memory;
1011
mod shared;
1112
#[cfg(test)]
1213
mod tests;
@@ -16,6 +17,7 @@ mod wall_time;
1617
use executor::Executor;
1718
use helpers::profile_folder::create_profile_folder;
1819
pub use interfaces::{ExecutorName, RunData};
20+
use memory::executor::MemoryExecutor;
1921
use valgrind::executor::ValgrindExecutor;
2022
use wall_time::executor::WallTimeExecutor;
2123

@@ -26,6 +28,7 @@ impl Display for RunnerMode {
2628
RunnerMode::Instrumentation => write!(f, "instrumentation"),
2729
RunnerMode::Simulation => write!(f, "simulation"),
2830
RunnerMode::Walltime => write!(f, "walltime"),
31+
RunnerMode::Memory => write!(f, "memory"),
2932
}
3033
}
3134
}
@@ -37,13 +40,15 @@ pub fn get_executor_from_mode(mode: &RunnerMode) -> Box<dyn Executor> {
3740
#[allow(deprecated)]
3841
RunnerMode::Instrumentation | RunnerMode::Simulation => Box::new(ValgrindExecutor),
3942
RunnerMode::Walltime => Box::new(WallTimeExecutor::new()),
43+
RunnerMode::Memory => Box::new(MemoryExecutor),
4044
}
4145
}
4246

4347
pub fn get_all_executors() -> Vec<Box<dyn Executor>> {
4448
vec![
4549
Box::new(ValgrindExecutor),
4650
Box::new(WallTimeExecutor::new()),
51+
Box::new(MemoryExecutor),
4752
]
4853
}
4954

src/run/uploader/upload.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ async fn create_profile_archive(
5858
) -> Result<ProfileArchive> {
5959
let time_start = std::time::Instant::now();
6060
let profile_archive = match executor_name {
61+
ExecutorName::Memory => todo!(),
6162
ExecutorName::Valgrind => {
6263
debug!("Creating compressed tar archive for Valgrind");
6364
let enc = GzipEncoder::new(Vec::new());

0 commit comments

Comments
 (0)