Skip to content

Combine FsRepository and OpenFsRepository #1156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/spfs-cli/main/src/cmd_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl InitSubcommand {
pub async fn run(&self, _config: &spfs::Config) -> Result<i32> {
match self {
Self::Repo { path } => {
spfs::storage::fs::FsRepository::create(&path).await?;
spfs::storage::fs::MaybeOpenFsRepository::create(&path).await?;
Ok(0)
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/spfs-vfs/src/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use fuser::{
};
use spfs::OsError;
use spfs::prelude::*;
use spfs::storage::LocalRepository;
#[cfg(feature = "fuse-backend-abi-7-31")]
use spfs::tracking::BlobRead;
use spfs::tracking::{Entry, EntryKind, EnvSpec, Manifest};
Expand Down Expand Up @@ -381,7 +382,7 @@ impl Filesystem {
reply.error(libc::ENOENT);
return;
};
let payload_path = fs_repo.payloads.build_digest_path(digest);
let payload_path = fs_repo.payloads().build_digest_path(digest);
match std::fs::OpenOptions::new().read(true).open(payload_path) {
Ok(file) => {
handle = Some(Handle::BlobFile { entry, file });
Expand Down
3 changes: 2 additions & 1 deletion crates/spfs-vfs/src/winfsp/mount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use dashmap::DashMap;
use libc::c_void;
use spfs::OsError;
use spfs::prelude::*;
use spfs::storage::LocalRepository;
use spfs::tracking::{Entry, EntryKind};
use tokio::io::AsyncReadExt;
use windows::Win32::Foundation::{ERROR_SEEK_ON_DEVICE, STATUS_NOT_A_DIRECTORY};
Expand Down Expand Up @@ -287,7 +288,7 @@ impl winfsp::filesystem::FileSystemContext for Mount {
send.send(Err(winfsp::FspError::IO(std::io::ErrorKind::NotFound)));
return;
};
let payload_path = fs_repo.payloads.build_digest_path(&digest);
let payload_path = fs_repo.payloads().build_digest_path(&digest);
match std::fs::OpenOptions::new().read(true).open(payload_path) {
Ok(file) => {
let _ = send.send(Ok(Some(Handle::BlobFile { entry, file })));
Expand Down
2 changes: 1 addition & 1 deletion crates/spfs/benches/spfs_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn commit_benchmark(c: &mut Criterion) {
.expect("create a temp directory for spfs repo");
let repo: Arc<RepositoryHandle> = Arc::new(
tokio_runtime
.block_on(spfs::storage::fs::FsRepository::create(
.block_on(spfs::storage::fs::MaybeOpenFsRepository::create(
repo_path.path().join("repo"),
))
.expect("create spfs repo")
Expand Down
4 changes: 2 additions & 2 deletions crates/spfs/src/bootstrap_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async fn test_shell_initialization_startup_scripts(
};
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(&root)
crate::storage::fs::MaybeOpenFsRepository::create(&root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -118,7 +118,7 @@ async fn test_shell_initialization_no_startup_scripts(
};
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(&root)
crate::storage::fs::MaybeOpenFsRepository::create(&root)
.await
.unwrap(),
);
Expand Down
7 changes: 4 additions & 3 deletions crates/spfs/src/clean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use super::prune::PruneParameters;
use crate::io::Pluralize;
use crate::prelude::*;
use crate::runtime::makedirs_with_perms;
use crate::storage::fs::OpenFsRepository;
use crate::storage::fs::FsRepositoryOps;
use crate::storage::{TagNamespace, TagNamespaceBuf};
use crate::{Digest, Error, Result, encoding, graph, storage, tracking};

Expand Down Expand Up @@ -705,7 +705,7 @@ where
async fn remove_unvisited_renders_and_proxies_for_storage(
&self,
username: Option<String>,
repo: &storage::fs::OpenFsRepository,
repo: impl FsRepositoryOps,
) -> Result<CleanResult> {
let mut result = CleanResult::default();
let mut stream = repo
Expand Down Expand Up @@ -811,7 +811,8 @@ where
let future = async move {
if !self.dry_run {
tracing::trace!(?path, "removing proxy render");
OpenFsRepository::remove_dir_atomically(&path, &workdir).await?;
storage::fs::OpenFsRepository::remove_dir_atomically(&path, &workdir)
.await?;
}
Ok(digest)
};
Expand Down
4 changes: 2 additions & 2 deletions crates/spfs/src/clean_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ async fn test_clean_untagged_objects_layers_platforms(#[future] tmprepo: TempRep
async fn test_clean_manifest_renders(tmpdir: tempfile::TempDir) {
init_logging();
let tmprepo = Arc::new(
storage::fs::FsRepository::create(tmpdir.path())
storage::fs::MaybeOpenFsRepository::create(tmpdir.path())
.await
.unwrap()
.into(),
Expand Down Expand Up @@ -482,7 +482,7 @@ async fn test_clean_manifest_renders(tmpdir: tempfile::TempDir) {
};
let fs_repo = fs_repo.opened().await.unwrap();

storage::fs::Renderer::new(&*fs_repo)
storage::fs::Renderer::new(&fs_repo)
.render_manifest(&manifest.to_graph_manifest(), None)
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/spfs/src/commit_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use crate::fixtures::*;
async fn test_commit_empty(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(&root)
crate::storage::fs::MaybeOpenFsRepository::create(&root)
.await
.unwrap(),
);
let storage = crate::runtime::Storage::new(repo).unwrap();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down
13 changes: 7 additions & 6 deletions crates/spfs/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,9 @@ impl RemoteConfig {
inner,
} = self;
let mut handle: storage::RepositoryHandle = match inner.clone() {
RepositoryConfig::Fs(config) => {
storage::fs::FsRepository::from_config(config).await?.into()
}
RepositoryConfig::Fs(config) => storage::fs::MaybeOpenFsRepository::from_config(config)
.await?
.into(),
RepositoryConfig::Tar(config) => storage::tar::TarRepository::from_config(config)
.await?
.into(),
Expand Down Expand Up @@ -574,7 +574,8 @@ impl Config {
source,
})?;

local_repo.set_tag_namespace(self.storage.tag_namespace.clone());
Arc::make_mut(&mut local_repo.fs_impl)
.set_tag_namespace(self.storage.tag_namespace.clone());

Ok(local_repo)
}
Expand All @@ -583,8 +584,8 @@ impl Config {
///
/// The returned repo is guaranteed to be created, valid and open already. Ie
/// the local repository is not allowed to be lazily opened.
pub async fn get_local_repository(&self) -> Result<storage::fs::FsRepository> {
self.get_opened_local_repository().await.map(Into::into)
pub async fn get_local_repository(&self) -> Result<storage::fs::OpenFsRepository> {
self.get_opened_local_repository().await
}

/// Get the local repository handle as configured, creating it if needed.
Expand Down
2 changes: 1 addition & 1 deletion crates/spfs/src/config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn test_config_get_remote() {
.tempdir()
.unwrap();
let remote = tmpdir.path().join("remote");
let _ = crate::storage::fs::FsRepository::create(&remote)
let _ = crate::storage::fs::MaybeOpenFsRepository::create(&remote)
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion crates/spfs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl Error {

/// Create an [`Error::FailedToOpenRepository`] instance for
/// a repository using its address and root cause.
pub fn failed_to_open_repository<R: storage::Repository>(
pub fn failed_to_open_repository<R: storage::Address>(
repo: &R,
source: storage::OpenRepositoryError,
) -> Self {
Expand Down
25 changes: 16 additions & 9 deletions crates/spfs/src/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,20 @@ impl TempRepo {
{
match self {
TempRepo::FS(_, tempdir) => {
let mut repo = spfs::storage::fs::FsRepository::open(tempdir.path().join("repo"))
.await
.unwrap();
repo.set_tag_namespace(Some(
spfs::storage::TagNamespaceBuf::new(namespace.as_ref())
.expect("tag namespaces used in tests must be valid"),
));
let repo = spfs::storage::fs::MaybeOpenFsRepository {
fs_impl: {
let mut fs_impl = spfs::storage::fs::MaybeOpenFsRepositoryImpl::open(
tempdir.path().join("repo"),
)
.await
.unwrap();
fs_impl.set_tag_namespace(Some(
spfs::storage::TagNamespaceBuf::new(namespace.as_ref())
.expect("tag namespaces used in tests must be valid"),
));
fs_impl.into()
},
};
TempRepo::FS(Arc::new(repo.into()), Arc::clone(tempdir))
}
_ => panic!("only TempRepo::FS type supports setting tag namespaces"),
Expand Down Expand Up @@ -131,7 +138,7 @@ pub async fn tmprepo(kind: &str) -> TempRepo {
let tmpdir = tmpdir();
match kind {
"fs" => {
let repo = spfs::storage::fs::FsRepository::create(tmpdir.path().join("repo"))
let repo = spfs::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("repo"))
.await
.unwrap()
.into();
Expand All @@ -148,7 +155,7 @@ pub async fn tmprepo(kind: &str) -> TempRepo {
"rpc" => {
use crate::storage::prelude::*;
let repo = std::sync::Arc::new(spfs::storage::RepositoryHandle::FS(
spfs::storage::fs::FsRepository::create(tmpdir.path().join("repo"))
spfs::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("repo"))
.await
.unwrap(),
));
Expand Down
20 changes: 10 additions & 10 deletions crates/spfs/src/runtime/storage_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn test_config_serialization() {
async fn test_storage_create_runtime(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -75,7 +75,7 @@ async fn test_storage_runtime_with_annotation(

let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -141,7 +141,7 @@ async fn test_storage_runtime_add_annotations_list(

let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -214,7 +214,7 @@ async fn test_storage_runtime_with_nested_annotation(
// Setup the objects needed for the runtime used in the test
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -283,7 +283,7 @@ async fn test_storage_runtime_with_annotation_all(

let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -357,7 +357,7 @@ async fn test_storage_runtime_with_nested_annotation_all(
// setup the objects needed for the runtime used in the test
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -433,7 +433,7 @@ async fn test_storage_runtime_with_nested_annotation_all(
async fn test_storage_remove_runtime(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand All @@ -454,7 +454,7 @@ async fn test_storage_remove_runtime(tmpdir: tempfile::TempDir) {
async fn test_storage_iter_runtimes(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -506,7 +506,7 @@ async fn test_storage_iter_runtimes(tmpdir: tempfile::TempDir) {
async fn test_runtime_reset(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -553,7 +553,7 @@ async fn test_runtime_reset(tmpdir: tempfile::TempDir) {
async fn test_runtime_ensure_extra_bind_mount_locations_exist(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down
3 changes: 2 additions & 1 deletion crates/spfs/src/storage/fallback/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ impl TagStorageMut for FallbackProxy {
&mut self,
tag_namespace: Option<TagNamespaceBuf>,
) -> Result<Option<TagNamespaceBuf>> {
Ok(Arc::make_mut(&mut self.primary).set_tag_namespace(tag_namespace))
Ok(Arc::make_mut(&mut Arc::make_mut(&mut self.primary).fs_impl)
.set_tag_namespace(tag_namespace))
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/spfs/src/storage/fs/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::graph::{DatabaseView, Object, ObjectProto};
use crate::{Error, Result, encoding, graph};

#[async_trait::async_trait]
impl DatabaseView for super::FsRepository {
impl DatabaseView for super::MaybeOpenFsRepository {
async fn has_object(&self, digest: encoding::Digest) -> bool {
let Ok(opened) = self.opened().await else {
return false;
Expand Down Expand Up @@ -55,7 +55,7 @@ impl DatabaseView for super::FsRepository {
}

#[async_trait::async_trait]
impl graph::Database for super::FsRepository {
impl graph::Database for super::MaybeOpenFsRepository {
async fn remove_object(&self, digest: encoding::Digest) -> crate::Result<()> {
self.opened().await?.remove_object(digest).await
}
Expand All @@ -73,7 +73,7 @@ impl graph::Database for super::FsRepository {
}

#[async_trait::async_trait]
impl graph::DatabaseExt for super::FsRepository {
impl graph::DatabaseExt for super::MaybeOpenFsRepository {
async fn write_object<T: ObjectProto>(&self, obj: &graph::FlatObject<T>) -> Result<()> {
self.opened().await?.write_object(obj).await
}
Expand Down
5 changes: 4 additions & 1 deletion crates/spfs/src/storage/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ pub use renderer::{
RenderType,
Renderer,
};
#[cfg(test)]
pub use repository::MaybeOpenFsRepositoryImpl;
pub use repository::{
Config,
DURABLE_EDITS_DIR,
FsRepository,
FsRepositoryOps,
MaybeOpenFsRepository,
OpenFsRepository,
Params,
RenderStore,
Expand Down
4 changes: 2 additions & 2 deletions crates/spfs/src/storage/fs/payloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use std::pin::Pin;
use futures::future::ready;
use futures::{Stream, StreamExt, TryFutureExt};

use super::{FsRepository, OpenFsRepository};
use super::{MaybeOpenFsRepository, OpenFsRepository};
use crate::storage::prelude::*;
use crate::tracking::BlobRead;
use crate::{Error, Result, encoding, graph};

#[async_trait::async_trait]
impl crate::storage::PayloadStorage for FsRepository {
impl crate::storage::PayloadStorage for MaybeOpenFsRepository {
async fn has_payload(&self, digest: encoding::Digest) -> bool {
let Ok(opened) = self.opened().await else {
return false;
Expand Down
Loading
Loading