Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor FsRepository's renders #1159

Draft
wants to merge 4 commits into
base: no-hard-links-for-dir-renders
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions crates/spfs-cli/cmd-render/src/cmd_render.rs
Original file line number Diff line number Diff line change
@@ -4,9 +4,10 @@

use clap::builder::TypedValueParser;
use clap::Parser;
use miette::{Context, Result};
use miette::{Context, IntoDiagnostic, Result};
use spfs::prelude::*;
use spfs::storage::fallback::FallbackProxy;
use spfs::storage::fs::{MaybeRenderStore, RenderStore};
use spfs::{graph, Error, RenderResult};
use spfs_cli_common as cli;
use spfs_cli_common::CommandName;
@@ -74,7 +75,11 @@ impl CmdRender {

let rendered = match &self.target {
Some(target) => self.render_to_dir(fallback, env_spec, target).await?,
None => self.render_to_repo(fallback, env_spec).await?,
None => {
// This path requires a repository that supports renders.
let fallback: FallbackProxy<RenderStore> = fallback.try_into().into_diagnostic()?;
self.render_to_repo(fallback, env_spec).await?
}
};

tracing::debug!("render(s) completed successfully");
@@ -85,7 +90,7 @@ impl CmdRender {

async fn render_to_dir(
&self,
repo: FallbackProxy,
repo: FallbackProxy<MaybeRenderStore>,
env_spec: spfs::tracking::EnvSpec,
target: &std::path::Path,
) -> Result<RenderResult> {
@@ -134,7 +139,7 @@ impl CmdRender {

async fn render_to_repo(
&self,
repo: FallbackProxy,
repo: FallbackProxy<RenderStore>,
env_spec: spfs::tracking::EnvSpec,
) -> Result<RenderResult> {
let mut stack = graph::Stack::default();
4 changes: 2 additions & 2 deletions crates/spfs-cli/common/src/args.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use miette::{Error, IntoDiagnostic, Result, WrapErr};
#[cfg(feature = "sentry")]
use once_cell::sync::OnceCell;
use spfs::io::Pluralize;
use spfs::storage::LocalRepository;
use spfs::storage::LocalPayloads;
use tracing_subscriber::prelude::*;

const SPFS_LOG: &str = "SPFS_LOG";
@@ -138,7 +138,7 @@ impl Render {
reporter: Reporter,
) -> spfs::storage::fs::Renderer<'repo, Repo, Reporter>
where
Repo: spfs::storage::Repository + LocalRepository,
Repo: spfs::storage::Repository + LocalPayloads,
Reporter: spfs::storage::fs::RenderReporter,
{
spfs::storage::fs::Renderer::new(repo)
3 changes: 2 additions & 1 deletion crates/spfs-cli/main/src/cmd_init.rs
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ use std::path::PathBuf;

use clap::{Args, Subcommand};
use miette::Result;
use spfs::storage::fs::NoRenderStore;

/// Create an empty filesystem repository
#[derive(Debug, Args)]
@@ -36,7 +37,7 @@ impl InitSubcommand {
pub async fn run(&self, _config: &spfs::Config) -> Result<i32> {
match self {
Self::Repo { path } => {
spfs::storage::fs::MaybeOpenFsRepository::create(&path).await?;
spfs::storage::fs::MaybeOpenFsRepository::<NoRenderStore>::create(&path).await?;
Ok(0)
}
}
6 changes: 5 additions & 1 deletion crates/spfs-cli/main/src/cmd_search.rs
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
use clap::Args;
use miette::Result;
use spfs::prelude::*;
use spfs::storage::fs::NoRenderStore;
use tokio_stream::StreamExt;

/// Search for available tags by substring
@@ -29,7 +30,10 @@ impl CmdSearch {
};
repos.push(remote);
}
repos.insert(0, config.get_local_repository().await?.into());
repos.insert(
0,
config.get_local_repository::<NoRenderStore>().await?.into(),
);
for repo in repos.into_iter() {
let mut tag_streams = repo.iter_tags();
while let Some(tag) = tag_streams.next().await {
23 changes: 19 additions & 4 deletions crates/spfs-vfs/src/fuse.rs
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ use fuser::{
Request,
};
use spfs::prelude::*;
use spfs::storage::LocalRepository;
use spfs::storage::LocalPayloads;
#[cfg(feature = "fuse-backend-abi-7-31")]
use spfs::tracking::BlobRead;
use spfs::tracking::{Entry, EntryKind, EnvSpec, Manifest};
@@ -376,9 +376,13 @@ impl Filesystem {
#[allow(unused_mut)]
let mut flags = FOPEN_KEEP_CACHE;
for repo in self.repos.iter() {
match &**repo {
spfs::storage::RepositoryHandle::FS(fs_repo) => {
let Ok(fs_repo) = fs_repo.opened().await else {
// XXX: Using a macro here for an easy fix but it would be nicer
// if there was a way to borrow the RepositoryHandle as a
// `&MaybeOpenFsRepository<NoRenderStore>` since this code
// doesn't need to access renders.
macro_rules! read_fs {
($fs_repo:ident) => {
let Ok(fs_repo) = $fs_repo.opened().await else {
reply.error(libc::ENOENT);
return;
};
@@ -393,6 +397,17 @@ impl Filesystem {
}
Err(err) => err!(reply, err),
}
};
}
match &**repo {
spfs::storage::RepositoryHandle::FSWithMaybeRenders(fs_repo) => {
read_fs!(fs_repo);
}
spfs::storage::RepositoryHandle::FSWithRenders(fs_repo) => {
read_fs!(fs_repo);
}
spfs::storage::RepositoryHandle::FSWithoutRenders(fs_repo) => {
read_fs!(fs_repo);
}
#[cfg(feature = "fuse-backend-abi-7-31")]
repo => match repo.open_payload(*digest).await {
23 changes: 19 additions & 4 deletions crates/spfs-vfs/src/winfsp/mount.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ use std::sync::Arc;
use dashmap::DashMap;
use libc::c_void;
use spfs::prelude::*;
use spfs::storage::LocalRepository;
use spfs::storage::LocalPayloads;
use spfs::tracking::{Entry, EntryKind};
use spfs::OsError;
use tokio::io::AsyncReadExt;
@@ -281,9 +281,13 @@ impl winfsp::filesystem::FileSystemContext for Mount {
let digest = entry.object;
self.rt.spawn(async move {
for repo in repos.into_iter() {
match &*repo {
spfs::storage::RepositoryHandle::FS(fs_repo) => {
let Ok(fs_repo) = fs_repo.opened().await else {
// XXX: Using a macro here for an easy fix but it would be nicer
// if there was a way to borrow the RepositoryHandle as a
// `&MaybeOpenFsRepository<NoRenderStore>` since this code
// doesn't need to access renders.
macro_rules! read_fs {
($fs_repo:ident) => {
let Ok(fs_repo) = $fs_repo.opened().await else {
let _ =
send.send(Err(winfsp::FspError::IO(std::io::ErrorKind::NotFound)));
return;
@@ -299,6 +303,17 @@ impl winfsp::filesystem::FileSystemContext for Mount {
}
Err(err) => err!(send, err),
}
};
}
match &*repo {
spfs::storage::RepositoryHandle::FSWithMaybeRenders(fs_repo) => {
read_fs!(fs_repo);
}
spfs::storage::RepositoryHandle::FSWithRenders(fs_repo) => {
read_fs!(fs_repo);
}
spfs::storage::RepositoryHandle::FSWithoutRenders(fs_repo) => {
read_fs!(fs_repo);
}
repo => match repo.open_payload(digest).await {
Ok((stream, _)) => {
9 changes: 6 additions & 3 deletions crates/spfs/benches/spfs_bench.rs
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ use std::time::Duration;

use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use spfs::prelude::*;
use spfs::storage::fs::NoRenderStore;

pub fn commit_benchmark(c: &mut Criterion) {
const NUM_FILES: usize = 1024;
@@ -44,9 +45,11 @@ pub fn commit_benchmark(c: &mut Criterion) {
.expect("create a temp directory for spfs repo");
let repo: Arc<RepositoryHandle> = Arc::new(
tokio_runtime
.block_on(spfs::storage::fs::MaybeOpenFsRepository::create(
repo_path.path().join("repo"),
))
.block_on(
spfs::storage::fs::MaybeOpenFsRepository::<NoRenderStore>::create(
repo_path.path().join("repo"),
),
)
.expect("create spfs repo")
.into(),
);
5 changes: 3 additions & 2 deletions crates/spfs/src/bootstrap_test.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ use super::build_shell_initialized_command;
use crate::fixtures::*;
use crate::resolve::which;
use crate::runtime;
use crate::storage::fs::RenderStore;

#[rstest(
shell,
@@ -37,7 +38,7 @@ async fn test_shell_initialization_startup_scripts(
};
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::MaybeOpenFsRepository::create(&root)
crate::storage::fs::MaybeOpenFsRepository::<RenderStore>::create(&root)
.await
.unwrap(),
);
@@ -113,7 +114,7 @@ async fn test_shell_initialization_no_startup_scripts(shell: &str, tmpdir: tempf
};
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::MaybeOpenFsRepository::create(&root)
crate::storage::fs::MaybeOpenFsRepository::<RenderStore>::create(&root)
.await
.unwrap(),
);
66 changes: 52 additions & 14 deletions crates/spfs/src/clean.rs
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ use std::fmt::Write;
use std::future::ready;
#[cfg(unix)]
use std::os::linux::fs::MetadataExt;
use std::path::Path;

use chrono::{DateTime, Duration, Local, Utc};
use colored::Colorize;
@@ -18,7 +19,7 @@ use progress_bar_derive_macro::ProgressBar;
use super::prune::PruneParameters;
use crate::prelude::*;
use crate::runtime::makedirs_with_perms;
use crate::storage::fs::FsRepositoryOps;
use crate::storage::fs::{FsRepositoryOps, MaybeOpenFsRepository};
use crate::{encoding, graph, storage, tracking, Error, Result};

#[cfg(test)]
@@ -568,11 +569,19 @@ where
/// This function should only be called once the discovery of all attached
/// objects has completed successfully and with no errors. Otherwise, it may
/// remove data that is still being used
async unsafe fn remove_unvisited_renders_and_proxies(&self) -> Result<CleanResult> {
async unsafe fn remove_unvisited_renders_and_proxies_on_repo<RS>(
&self,
repo: &MaybeOpenFsRepository<RS>,
) -> Result<CleanResult>
where
RS: storage::DefaultRenderStoreCreationPolicy
+ storage::RenderStoreForUser<RenderStore = RS>
+ storage::TryRenderStore
+ Send
+ Sync
+ 'static,
{
let mut result = CleanResult::default();
let storage::RepositoryHandle::FS(repo) = self.repo else {
return Ok(result);
};
let repo = repo.opened().await?;

result += match self
@@ -593,11 +602,16 @@ where
// therefore failing the whole clean attempt before any work is
// performed. The missing proxy directory is likely a symptom of
// some other problem elsewhere.
if !sub_repo.has_renders() {
#[cfg(feature = "sentry")]
tracing::error!(target: "sentry", %username, "Skipping clean of user's renders (NoRenderStorage)");
continue;
}
//
// TODO: simulate this scenario in a test and make it not error if
// the proxy directory is missing; still want to clean any renders
// belonging to the user.
//
//if !sub_repo.has_renders() {
// #[cfg(feature = "sentry")]
// tracing::error!(target: "sentry", %username, "Skipping clean of user's renders (NoRenderStorage)");
// continue;
//}

result += self
.remove_unvisited_renders_and_proxies_for_storage(Some(username.clone()), sub_repo)
@@ -606,6 +620,28 @@ where
Ok(result)
}

/// # Safety
/// This function should only be called once the discovery of all attached
/// objects has completed successfully and with no errors. Otherwise, it may
/// remove data that is still being used
async unsafe fn remove_unvisited_renders_and_proxies(&self) -> Result<CleanResult> {
match self.repo {
storage::RepositoryHandle::FSWithMaybeRenders(repo) => unsafe {
// Convert this repo into one that will not create renders
// on demand. We only want to clean existing renders.
let repo = repo.clone().without_render_creation();

self.remove_unvisited_renders_and_proxies_on_repo(&repo)
.await
},
storage::RepositoryHandle::FSWithRenders(repo) => unsafe {
self.remove_unvisited_renders_and_proxies_on_repo(repo)
.await
},
_ => Ok(CleanResult::default()),
}
}

async fn remove_unvisited_renders_and_proxies_for_storage(
&self,
username: Option<String>,
@@ -648,7 +684,7 @@ where
drop(stream);

if let Some(proxy_path) = repo.proxy_path() {
result += self.clean_proxies(username, proxy_path.to_owned()).await?;
result += self.clean_proxies(username, &proxy_path).await?;
}
Ok(result)
}
@@ -658,7 +694,7 @@ where
async fn clean_proxies(
&self,
username: Option<String>,
proxy_path: std::path::PathBuf,
proxy_path: &Path,
) -> Result<CleanResult> {
let mut result = CleanResult::default();
let removed = result.removed_proxies.entry(username).or_default();
@@ -715,8 +751,10 @@ where
let future = async move {
if !self.dry_run {
tracing::trace!(?path, "removing proxy render");
storage::fs::OpenFsRepository::remove_dir_atomically(&path, &workdir)
.await?;
storage::fs::OpenFsRepository::<storage::fs::RenderStore>::remove_dir_atomically(
&path, &workdir,
)
.await?;
}
Ok(digest)
};
Loading
Loading