Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 1 addition & 37 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ crates-index-diff = { version = "28.0.0", features = [ "max-performance" ]}
reqwest = { version = "0.12", features = ["json", "gzip"] }
semver = { version = "1.0.4", features = ["serde"] }
slug = "0.1.1"
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "chrono" ] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "sqlite", "chrono" ] }
url = { version = "2.1.1", features = ["serde"] }
docsrs-metadata = { path = "crates/metadata" }
anyhow = { version = "1.0.42", features = ["backtrace"]}
Expand All @@ -58,7 +58,6 @@ zip = {version = "5.1.1", default-features = false, features = ["bzip2"]}
bzip2 = "0.6.0"
getrandom = "0.3.1"
itertools = { version = "0.14.0" }
rusqlite = { version = "0.32.1", features = ["bundled"] }
hex = "0.4.3"
derive_more = { version = "2.0.0", features = ["display"] }

Expand Down
221 changes: 138 additions & 83 deletions src/storage/archive_index.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::error::Result;
use crate::storage::{FileRange, compression::CompressionAlgorithm};
use anyhow::{Context as _, bail};
use rusqlite::{Connection, OpenFlags, OptionalExtension};
use itertools::Itertools as _;
use sqlx::{Acquire as _, QueryBuilder, Row as _, Sqlite};
use std::{fs, io, path::Path};
use tracing::instrument;

Expand All @@ -20,97 +21,156 @@ impl FileInfo {
}
}

/// crates a new empty SQLite database, and returns a configured connection
/// pool to connect to the DB.
/// Any existing DB at the given path will be deleted first.
async fn sqlite_create<P: AsRef<Path>>(path: P) -> Result<sqlx::SqlitePool> {
let path = path.as_ref();
if path.exists() {
fs::remove_file(path)?;
}

sqlx::SqlitePool::connect_with(
sqlx::sqlite::SqliteConnectOptions::new()
.filename(path)
.read_only(false)
.pragma("synchronous", "full")
.create_if_missing(true),
)
.await
.map_err(Into::into)
}

/// open an existing SQLite database, returning a configured connection pool
/// for read-only access.
/// Will error when the database doesn't exist at that path.
async fn sqlite_open<P: AsRef<Path>>(path: P) -> Result<sqlx::SqlitePool> {
    let options = sqlx::sqlite::SqliteConnectOptions::new()
        .filename(path)
        .read_only(true)
        // durability pragma is not needed for a readonly db
        .pragma("synchronous", "off")
        // same as OPEN_NOMUTEX
        .serialized(false)
        .create_if_missing(false);

    let pool = sqlx::SqlitePool::connect_with(options).await?;
    Ok(pool)
}

/// create an archive index based on a zipfile.
///
/// Will delete the destination file if it already exists.
#[instrument(skip(zipfile))]
pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path> + std::fmt::Debug>(
pub(crate) async fn create<R: io::Read + io::Seek, P: AsRef<Path> + std::fmt::Debug>(
zipfile: &mut R,
destination: P,
) -> Result<()> {
let destination = destination.as_ref();
if destination.exists() {
fs::remove_file(destination)?;
}
let pool = sqlite_create(destination).await?;
let mut conn = pool.acquire().await?;
let mut tx = conn.begin().await?;

let conn = rusqlite::Connection::open(destination)?;
conn.execute("PRAGMA synchronous = FULL", ())?;
conn.execute("BEGIN", ())?;
conn.execute(
"
sqlx::query(
r#"
CREATE TABLE files (
id INTEGER PRIMARY KEY,
path TEXT UNIQUE,
start INTEGER,
end INTEGER,
compression INTEGER
);
",
(),
)?;
"#,
)
.execute(&mut *tx)
.await?;

let mut archive = zip::ZipArchive::new(zipfile)?;
let compression_bzip = CompressionAlgorithm::Bzip2 as i32;

for i in 0..archive.len() {
let zf = archive.by_index(i)?;

conn.execute(
"INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
(
zf.name(),
zf.data_start(),
zf.data_start() + zf.compressed_size() - 1,
match zf.compression() {
zip::CompressionMethod::Bzip2 => compression_bzip,
c => bail!("unsupported compression algorithm {} in zip-file", c),
},
),
)?;
const CHUNKS: usize = 1000;
for chunk in &(0..archive.len()).chunks(CHUNKS) {
for i in chunk {
let mut insert_stmt =
QueryBuilder::<Sqlite>::new("INSERT INTO files (path, start, end, compression) ");

let entry = archive.by_index(i)?;

let start = entry.data_start() as i64;
let end = (entry.data_start() + entry.compressed_size() - 1) as i64;
let compression_raw = match entry.compression() {
zip::CompressionMethod::Bzip2 => compression_bzip,
c => bail!("unsupported compression algorithm {} in zip-file", c),
};

insert_stmt.push_values([()], |mut b, _| {
b.push_bind(entry.name())
.push_bind(start)
.push_bind(end)
.push_bind(compression_raw);
});
insert_stmt
.build()
.persistent(false)
.execute(&mut *tx)
.await?;
}
}
conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
conn.execute("END", ())?;
conn.execute("VACUUM", ())?;

sqlx::query("CREATE INDEX idx_files_path ON files (path);")
.execute(&mut *tx)
.await?;

// Commit the transaction before VACUUM (VACUUM cannot run inside a transaction)
tx.commit().await?;

// VACUUM outside the transaction
sqlx::query("VACUUM").execute(&mut *conn).await?;

Ok(())
}

fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
let mut stmt = conn.prepare(
async fn find_in_sqlite_index<'e, E>(executor: E, search_for: &str) -> Result<Option<FileInfo>>
where
E: sqlx::Executor<'e, Database = sqlx::Sqlite>,
{
let row = sqlx::query(
"
SELECT start, end, compression
FROM files
WHERE path = ?
",
)?;

stmt.query_row((search_for,), |row| {
let compression: i32 = row.get(2)?;

Ok(FileInfo {
range: row.get(0)?..=row.get(1)?,
compression: compression.try_into().map_err(|value| {
rusqlite::Error::FromSqlConversionFailure(
2,
rusqlite::types::Type::Integer,
format!("invalid compression algorithm '{value}' in database").into(),
)
)
.bind(search_for)
.fetch_optional(executor)
.await
.context("error fetching SQLite data")?;

if let Some(row) = row {
let start: u64 = row.try_get(0)?;
let end: u64 = row.try_get(1)?;
let compression_raw: i32 = row.try_get(2)?;

Ok(Some(FileInfo {
range: start..=end,
compression: compression_raw.try_into().map_err(|value| {
anyhow::anyhow!(format!(
"invalid compression algorithm '{value}' in database"
))
})?,
})
})
.optional()
.context("error fetching SQLite data")
}))
} else {
Ok(None)
}
}

#[instrument]
pub(crate) fn find_in_file<P: AsRef<Path> + std::fmt::Debug>(
pub(crate) async fn find_in_file<P: AsRef<Path> + std::fmt::Debug>(
archive_index_path: P,
search_for: &str,
) -> Result<Option<FileInfo>> {
let connection = Connection::open_with_flags(
archive_index_path,
OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
)?;
find_in_sqlite_index(&connection, search_for)
let pool = sqlite_open(archive_index_path).await?;
let mut conn = pool.acquire().await?;

find_in_sqlite_index(&mut *conn, search_for).await
}

#[cfg(test)]
Expand Down Expand Up @@ -138,43 +198,38 @@ mod tests {
tf
}

#[test]
fn index_create_save_load_sqlite() {
#[tokio::test]
async fn index_create_save_load_sqlite() -> Result<()> {
let mut tf = create_test_archive(1);

let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
create(&mut tf, &tempfile).unwrap();
create(&mut tf, &tempfile).await?;

let fi = find_in_file(&tempfile, "testfile0").unwrap().unwrap();
let fi = find_in_file(&tempfile, "testfile0").await?.unwrap();

assert_eq!(fi.range, FileRange::new(39, 459));
assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);

assert!(
find_in_file(&tempfile, "some_other_file",)
.unwrap()
.is_none()
);
assert!(find_in_file(&tempfile, "some_other_file",).await?.is_none());
Ok(())
}

#[test]
fn archive_with_more_than_65k_files() {
#[tokio::test]
async fn archive_with_more_than_65k_files() -> Result<()> {
let mut tf = create_test_archive(100_000);

let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
create(&mut tf, &tempfile).unwrap();

let connection = Connection::open_with_flags(
tempfile,
OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
)
.unwrap();
let mut stmt = connection.prepare("SELECT count(*) FROM files").unwrap();

let count = stmt
.query_row([], |row| Ok(row.get::<_, usize>(0)))
.unwrap()
.unwrap();
assert_eq!(count, 100_000);
let tempfile = tempfile::NamedTempFile::new()?.into_temp_path();
create(&mut tf, &tempfile).await?;

let pool = sqlite_open(&tempfile).await?;
let mut conn = pool.acquire().await?;

let row = sqlx::query("SELECT count(*) FROM files")
.fetch_one(&mut *conn)
.await?;

assert_eq!(row.get::<i64, _>(0), 100_000);

Ok(())
}
}
Loading
Loading