Skip to content

Commit

Permalink
Add new action state.
Browse files Browse the repository at this point in the history
  • Loading branch information
milesj committed Jan 23, 2025
1 parent 0ff8a0c commit 378d7ba
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 181 deletions.
134 changes: 134 additions & 0 deletions crates/remote/src/action_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use crate::fs_digest::*;
use bazel_remote_apis::build::bazel::remote::execution::v2::{
command, platform, Action, ActionResult, Command, Digest, ExecutedActionMetadata,
};
use miette::IntoDiagnostic;
use moon_action::Operation;
use moon_task::Task;
use std::collections::BTreeMap;
use std::path::Path;

/// Per-task state for the remote execution (RE) API caching flow: the
/// `Action`/`Command` pair describing the task, the `ActionResult` produced
/// after the task runs, and any blobs (stdout/stderr, output files) that
/// are pending upload to the remote cache.
pub struct ActionState<'task> {
    // The task this action was derived from; used to resolve output files.
    task: &'task Task,

    // RE API
    pub action: Action,
    pub action_result: Option<ActionResult>,
    pub command: Command,

    // To upload
    pub blobs: Vec<Blob>,
}

impl<'task> ActionState<'task> {
    /// Create a new action state for `task`, keyed by the pre-computed
    /// command `digest`. Builds the RE API `Action` (honoring the task's
    /// cache setting and OS constraints) and `Command` (arguments plus
    /// environment variables) up front; the result and blobs are populated
    /// later as the task executes.
    pub fn new(digest: Digest, task: &'task Task) -> Self {
        let mut action = Action {
            command_digest: Some(digest),
            // Tasks that opt out of caching must not be cached remotely either.
            do_not_cache: !task.options.cache,
            input_root_digest: None, // TODO?
            ..Default::default()
        };

        // https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/platform.md
        if let Some(os_list) = &task.options.os {
            let platform = action.platform.get_or_insert_default();

            for os in os_list {
                platform.properties.push(platform::Property {
                    name: "OSFamily".into(),
                    value: os.to_string(),
                });
            }
        }

        // Since we don't support (or plan to) remote execution,
        // then we can ignore all the working directory logic
        let mut command = Command {
            arguments: vec![task.command.clone()],
            output_paths: vec![], // TODO
            ..Default::default()
        };

        command.arguments.extend(task.args.clone());

        // Route through a BTreeMap so env vars are emitted in a stable,
        // sorted order, keeping the serialized command deterministic.
        for (name, value) in BTreeMap::from_iter(task.env.clone()) {
            command
                .environment_variables
                .push(command::EnvironmentVariable { name, value });
        }

        ActionState {
            task,
            action,
            action_result: None,
            command,
            blobs: vec![],
        }
    }

    /// Build an [`ActionResult`] from a finished task [`Operation`]:
    /// execution timestamps, exit code, and stdout/stderr captured as
    /// blobs queued for upload. Stores the result on `self`.
    pub fn create_action_result_from_operation(
        &mut self,
        operation: &Operation,
    ) -> miette::Result<()> {
        let mut result = ActionResult {
            execution_metadata: Some(ExecutedActionMetadata {
                worker: "moon".into(),
                execution_start_timestamp: create_timestamp_from_naive(operation.started_at),
                execution_completed_timestamp: operation
                    .finished_at
                    .and_then(create_timestamp_from_naive),
                ..Default::default()
            }),
            ..Default::default()
        };

        if let Some(exec) = operation.get_output() {
            result.exit_code = exec.exit_code.unwrap_or_default();

            // stdout/stderr are uploaded as blobs; the result only stores
            // their digests.
            if let Some(stderr) = &exec.stderr {
                let blob = Blob::new(stderr.as_bytes().to_owned());

                result.stderr_digest = Some(blob.digest.clone());
                self.blobs.push(blob);
            }

            if let Some(stdout) = &exec.stdout {
                let blob = Blob::new(stdout.as_bytes().to_owned());

                result.stdout_digest = Some(blob.digest.clone());
                self.blobs.push(blob);
            }
        }

        self.action_result = Some(result);

        Ok(())
    }

    /// Digest the task's output files (relative to `workspace_root`) and
    /// attach them to the pending action result, queueing their content
    /// blobs for upload. No-op on the result if none has been created yet.
    pub fn compute_outputs(&mut self, workspace_root: &Path) -> miette::Result<()> {
        let mut outputs = OutputDigests::default();

        for path in self.task.get_output_files(workspace_root, true)? {
            outputs.insert_relative_path(path, workspace_root)?;
        }

        if let Some(result) = &mut self.action_result {
            result.output_files = outputs.files;
            result.output_symlinks = outputs.symlinks;
            result.output_directories = outputs.dirs;
            self.blobs.extend(outputs.blobs);
        }

        Ok(())
    }

    /// Serialize the command for upload as a blob.
    // NOTE(review): this serializes with bincode, not the protobuf wire
    // format, so the cached bytes are moon-specific and not interoperable
    // with other RE API clients reading the same cache — confirm intentional.
    pub fn get_command_as_bytes(&self) -> miette::Result<Vec<u8>> {
        bincode::serialize(&self.command).into_diagnostic()
    }

    /// Take ownership of the action result and all pending blobs for
    /// upload, leaving this state empty. Returns [`None`] if no action
    /// result has been created.
    pub fn prepare_for_upload(&mut self) -> Option<(ActionResult, Vec<Blob>)> {
        self.action_result
            .take()
            // `mem::take` moves the blobs out and leaves an empty vec,
            // equivalent to (but clearer than) drain(0..).collect().
            .map(|result| (result, std::mem::take(&mut self.blobs)))
    }
}
14 changes: 0 additions & 14 deletions crates/remote/src/fs_digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,20 +157,6 @@ impl OutputDigests {
}
}

#[instrument]
pub fn compute_digests_for_outputs(
paths: Vec<WorkspaceRelativePathBuf>,
workspace_root: &Path,
) -> miette::Result<OutputDigests> {
let mut result = OutputDigests::default();

for path in paths {
result.insert_relative_path(path, workspace_root)?;
}

Ok(result)
}

fn apply_node_properties(path: &Path, props: &NodeProperties) -> miette::Result<()> {
if let Some(mtime) = &props.mtime {
let modified = Duration::new(mtime.seconds as u64, mtime.nanos as u32);
Expand Down
2 changes: 2 additions & 0 deletions crates/remote/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod action_state;
mod compression;
mod fs_digest;
mod grpc_remote_client;
Expand All @@ -8,6 +9,7 @@ mod remote_client;
mod remote_error;
mod remote_service;

pub use action_state::*;
pub use bazel_remote_apis::build::bazel::remote::execution::v2::Digest;
pub use fs_digest::*;
pub use remote_error::*;
Expand Down
137 changes: 34 additions & 103 deletions crates/remote/src/remote_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::compression::*;
use crate::fs_digest::*;
use crate::grpc_remote_client::GrpcRemoteClient;
// use crate::http_remote_client::HttpRemoteClient;
use crate::action_state::ActionState;
use crate::remote_client::RemoteClient;
use bazel_remote_apis::build::bazel::remote::execution::v2::{
command, digest_function, platform::Property, Action, ActionResult, Command, Digest,
Expand Down Expand Up @@ -39,6 +40,10 @@ impl RemoteService {
INSTANCE.get().cloned()
}

    /// Return true if the remote service singleton has been initialized
    /// via a successful `connect` call.
    pub fn is_enabled() -> bool {
        INSTANCE.get().is_some()
    }

#[instrument]
pub async fn connect(config: &RemoteConfig, workspace_root: &Path) -> miette::Result<()> {
if is_ci() && config.is_localhost() {
Expand Down Expand Up @@ -199,22 +204,29 @@ impl RemoteService {
Ok(false)
}

#[instrument(skip(self, task))]
pub async fn save_task(&self, digest: &Digest, task: &Task) -> miette::Result<()> {
#[instrument(skip(self, state))]
pub async fn save_action(&self, state: &ActionState<'_>) -> miette::Result<()> {
if !self.cache_enabled {
return Ok(());
}

let Some(digest) = &state.action.command_digest else {
return Ok(());
};

let missing = self
.client
.find_missing_blobs(vec![digest.to_owned()])
.await?;

if missing.contains(digest) {
let _action = self.create_action(digest, task)?;
let command = self.create_command(task)?;
debug!(hash = &digest.hash, "Caching action and command");

self.client
.batch_update_blobs(
digest,
vec![Blob {
bytes: bincode::serialize(&command).unwrap(),
bytes: state.get_command_as_bytes()?,
digest: digest.to_owned(),
}],
)
Expand All @@ -224,78 +236,35 @@ impl RemoteService {
Ok(())
}

#[instrument(skip(self, operation))]
pub async fn save_operation(
&self,
digest: &Digest,
operation: &Operation,
) -> miette::Result<()> {
if !self.cache_enabled || operation.has_failed() {
#[instrument(skip(self, state))]
pub async fn save_action_result(&self, state: &mut ActionState<'_>) -> miette::Result<()> {
if !self.cache_enabled {
return Ok(());
}

let operation_label = operation.label().to_owned();

debug!(
hash = &digest.hash,
"Caching {} operation",
color::muted_light(&operation_label)
);

let result = self.create_action_result_from_operation(operation, None)?;
let digest = digest.to_owned();
let client = Arc::clone(&self.client);

self.upload_requests
.write()
.await
.push(tokio::spawn(async move {
if let Err(error) = client.update_action_result(&digest, result).await {
warn!(
hash = &digest.hash,
"Failed to cache {} operation: {}",
color::muted_light(operation_label),
color::muted_light(error.to_string()),
);
}
}));

Ok(())
}

#[instrument(skip(self, operation, outputs))]
pub async fn save_operation_with_outputs(
&self,
digest: &Digest,
operation: &Operation,
mut outputs: OutputDigests,
) -> miette::Result<()> {
if !self.cache_enabled || operation.has_failed() {
let Some((mut result, blobs)) = state.prepare_for_upload() else {
return Ok(());
}

let operation_label = operation.label().to_owned();
};

debug!(
hash = &digest.hash,
"Caching {} operation with outputs",
color::muted_light(&operation_label)
);
let Some(digest) = &state.action.command_digest else {
return Ok(());
};

let mut result = self.create_action_result_from_operation(operation, Some(&mut outputs))?;
result.output_files = outputs.files;
result.output_symlinks = outputs.symlinks;
result.output_directories = outputs.dirs;
if blobs.is_empty() {
debug!(hash = &digest.hash, "Caching action result");
} else {
debug!(hash = &digest.hash, "Caching action result with blobs");
}

let digest = digest.to_owned();
let client = Arc::clone(&self.client);
let digest = digest.to_owned();
let max_size = self.get_max_batch_size();

self.upload_requests
.write()
.await
.push(tokio::spawn(async move {
if !outputs.blobs.is_empty() {
if !blobs.is_empty() {
if let Some(metadata) = &mut result.execution_metadata {
metadata.output_upload_start_timestamp =
create_timestamp(SystemTime::now());
Expand All @@ -304,7 +273,7 @@ impl RemoteService {
let upload_result = batch_upload_blobs(
client.clone(),
digest.clone(),
outputs.blobs,
blobs,
max_size as usize,
)
.await;
Expand All @@ -322,8 +291,7 @@ impl RemoteService {
if let Err(error) = client.update_action_result(&digest, result).await {
warn!(
hash = &digest.hash,
"Failed to cache {} operation: {}",
color::muted_light(operation_label),
"Failed to cache action result: {}",
color::muted_light(error.to_string()),
);
}
Expand Down Expand Up @@ -406,43 +374,6 @@ impl RemoteService {
}
}

fn create_command(&self, task: &Task) -> miette::Result<Command> {
let mut command = Command {
arguments: vec![task.command.clone()],
..Default::default()
};

command.arguments.extend(task.args.clone());

for (name, value) in BTreeMap::from_iter(task.env.clone()) {
command
.environment_variables
.push(command::EnvironmentVariable { name, value });
}

Ok(command)
}

fn create_action(&self, digest: &Digest, task: &Task) -> miette::Result<Action> {
let mut action = Action {
command_digest: Some(digest.to_owned()),
..Default::default()
};

if let Some(os_list) = &task.options.os {
let platform = action.platform.get_or_insert_default();

for os in os_list {
platform.properties.push(Property {
name: "OSFamily".into(),
value: os.to_string(),
});
}
}

Ok(action)
}

fn create_action_result_from_operation(
&self,
operation: &Operation,
Expand Down
Loading

0 comments on commit 378d7ba

Please sign in to comment.