Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions src/bin/nix-index-sort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use std::{
fs::File,
io::{BufRead, BufReader, BufWriter, Read, Write},
path::PathBuf,
};

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use clap::Parser;
use nix_index::database::{FILE_MAGIC, FORMAT_VERSION};

/// One package record from the index: its "p\0"-prefixed metadata line plus
/// all file-listing lines that preceded it in the database stream.
///
/// Field order matters: the derived `Ord`/`PartialOrd` compare fields in
/// declaration order, so packages sort by their metadata line first — this
/// is what makes the sorted database deterministic.
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)]
struct Package {
    // Raw metadata line as read (starts with b"p\0"; includes the trailing
    // newline when one was present in the input).
    meta: Vec<u8>,

    // Raw path-entry lines read before the metadata line, in input order.
    paths: Vec<Vec<u8>>,
}

/// Streaming iterator that groups the lines of a decompressed index database
/// into `Package` values (see `Iterator` impl below).
struct PackageStream<R> {
    // Underlying line source (the decompressed database).
    reader: R,

    // Scratch buffer reused across `read_until` calls to avoid per-line
    // reallocation of the read buffer itself.
    buffer: Vec<u8>,
    // Path lines accumulated until the next "p\0" metadata line is seen.
    pending_entries: Vec<Vec<u8>>,
}

impl<R> PackageStream<R> {
fn new(reader: R) -> Self {
PackageStream {
reader,
buffer: Vec::new(),
pending_entries: Vec::new(),
}
}
}

impl<R: BufRead> Iterator for PackageStream<R> {
    type Item = Result<Package, std::io::Error>;

    /// Reads lines until a metadata line (prefix `b"p\0"`) is found, then
    /// yields it together with all path lines collected since the previous
    /// package. Returns `None` at end of input; I/O failures are forwarded
    /// as `Some(Err(..))`.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            self.buffer.clear();
            // Disjoint field borrows: `reader` and `buffer` are separate
            // fields, so this single expression borrows both mutably.
            let len = match self.reader.read_until(b'\n', &mut self.buffer) {
                Ok(0) => return None, // end of input
                Ok(n) => n,
                Err(err) => return Some(Err(err)),
            };
            let line: Vec<u8> = self.buffer[..len].to_vec();
            if line.starts_with(b"p\0") {
                // Metadata line closes the current package.
                return Some(Ok(Package {
                    paths: std::mem::take(&mut self.pending_entries),
                    meta: line,
                }));
            }
            // Path entry: keep it until its package's metadata line arrives.
            self.pending_entries.push(line);
        }
    }
}

// Sorts the index database.
//
// This makes the database reproducible, meaning that building it at a given nixpkgs commit
// produces the same file byte for byte.
#[derive(Debug, Parser)]
#[clap(author, about, version)]
struct Args {
    /// Directory where the index is stored
    #[clap(short, long = "input-db", env = "NIX_INDEX_DATABASE")]
    input_database: PathBuf,

    /// Path where the sorted database is written
    #[clap(short, long = "output-db")]
    output_database: PathBuf,

    /// Zstandard compression level
    #[clap(short, long = "compression", default_value = "22")]
    compression_level: i32,
}

/// Reads an existing index database, sorts its packages by metadata line,
/// and writes a new compressed database, making the file reproducible.
///
/// Returns an error if the input is missing, is not a nix-index database,
/// uses an unsupported format version, or if any I/O step fails.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();

    let mut file = File::open(args.input_database)?;

    // Validate the header before decompressing: reject files that were not
    // produced by nix-index, or that use an incompatible format version.
    // (Previously the header was read but never checked.)
    let mut magic = [0u8; 4];
    file.read_exact(&mut magic)?;
    if &magic[..] != FILE_MAGIC {
        return Err("input is not a nix-index database (bad magic)".into());
    }
    let version = file.read_u64::<LittleEndian>()?;
    if version != FORMAT_VERSION {
        return Err(format!(
            "unsupported database format version {} (expected {})",
            version, FORMAT_VERSION
        )
        .into());
    }

    let reader = BufReader::new(zstd::Decoder::new(file)?);
    let packages = PackageStream::new(reader);

    // TODO: use extsort instead of an in-memory sort.
    let mut res = packages.collect::<Result<Vec<_>, std::io::Error>>()?;
    // `Package`'s derived `Ord` compares the metadata line first, which is
    // what makes the output deterministic.
    res.sort();

    let mut file = File::create(args.output_database)?;
    file.write_all(FILE_MAGIC)?;
    file.write_u64::<LittleEndian>(FORMAT_VERSION)?;
    let mut encoder = zstd::Encoder::new(file, args.compression_level)?;
    encoder.multithread(num_cpus::get() as u32)?;
    let mut writer = BufWriter::new(encoder);
    for package in res {
        // Path entries precede their package's "p\0" metadata line, matching
        // the layout `PackageStream` expects when reading the database back.
        for path in &package.paths {
            writer.write_all(path)?;
        }
        writer.write_all(&package.meta)?;
    }
    // Don't swallow buffered-write errors: the previous `if let Ok(..)`
    // silently dropped them, which could leave a truncated database while
    // still exiting successfully.
    let encoder = writer.into_inner().map_err(|e| e.into_error())?;
    encoder.finish()?;
    Ok(())
}
4 changes: 2 additions & 2 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use crate::package::StorePath;
/// The version of the database format supported by this nix-index version.
///
/// This should be updated whenever you make an incompatible change to the database format.
const FORMAT_VERSION: u64 = 1;
pub const FORMAT_VERSION: u64 = 1;

/// The magic for nix-index database files, used to ensure that the file we're passed is
/// actually a file generated by nix-index.
const FILE_MAGIC: &[u8] = b"NIXI";
pub const FILE_MAGIC: &[u8] = b"NIXI";

/// A writer for creating a new file database.
pub struct Writer {
Expand Down
7 changes: 4 additions & 3 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::{hydra, nixpkgs, package::StorePath};

#[derive(Error, Debug)]
pub enum Error {
#[error("querying available packages failed: {source}")]
#[error("querying available packages in set {} failed: {source}", package_set.as_ref().unwrap_or(&".".to_string()))]
QueryPackages {
package_set: Option<String>,
#[source]
source: nixpkgs::Error,
},
Expand Down Expand Up @@ -36,13 +37,13 @@ pub enum Error {
#[error("writing the paths.cache file failed: {source}")]
WritePathsCache {
#[source]
source: Box<dyn std::error::Error>,
source: Box<dyn std::error::Error + Send>,
},
#[error("creating the database at '{path:?}' failed: {source}")]
CreateDatabase {
path: PathBuf,
#[source]
source: Box<dyn std::error::Error>,
source: Box<dyn std::error::Error + Send>,
},
#[error("creating the directory for the database at '{path:?}' failed: {source}")]
CreateDatabaseDir {
Expand Down
31 changes: 23 additions & 8 deletions src/listings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn fetch_listings_impl(

// Processes a single store path, fetching the file listing for it and
// adding its references to the queue
let process = move |mut handle: WorkSetHandle<_, _>, path: StorePath| async move {
let process = move |_handle: WorkSetHandle<_, _>, path: StorePath| async move {
let Some(parsed) = fetcher
.fetch_references(path.clone())
.map_err(|e| Error::FetchReferences { path, source: e })
Expand All @@ -87,10 +87,12 @@ fn fetch_listings_impl(
return Ok(None);
};

for reference in parsed.references {
let hash = reference.hash().into_owned();
handle.add_work(hash, reference);
}
// Exclude non-toplevel packages as they are not reachable as nixpkgs attrs
// and result in non-determinism in the produced index.
// for reference in parsed.references {
// let hash = reference.hash().into_owned();
// handle.add_work(hash, reference);
// }

let path = parsed.store_path.clone();
let nar_path = parsed.nar_path;
Expand Down Expand Up @@ -164,10 +166,23 @@ pub fn fetch<'a>(
let all_paths = all_queries
.par_iter()
.flat_map_iter(|&(system, scope)| {
nixpkgs::query_packages(nixpkgs, system, scope.as_deref(), show_trace)
nixpkgs::query_packages(nixpkgs, system, scope.as_deref(), show_trace).map(|x| {
x.map_err(|e| Error::QueryPackages {
package_set: scope.as_deref().map(|s| s.to_string()),
source: e,
})
})
})
.filter_map(|res| match res {
Ok(path) => Some(path),
Err(e) => {
// Older versions of nixpkgs may not have all scopes, so we skip them instead
// of completely bailing out.
eprintln!("Error getting package set: {e}");
None
}
})
.collect::<std::result::Result<_, nixpkgs::Error>>()
.map_err(|e| Error::QueryPackages { source: e })?;
.collect::<Vec<_>>();

Ok(fetch_listings_impl(fetcher, jobs, all_paths))
}
12 changes: 11 additions & 1 deletion src/nixpkgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub fn query_packages(
parser: None,
child: None,
cmd: Some(cmd),
had_error: false,
}
}

Expand All @@ -74,6 +75,7 @@ pub struct PackagesQuery<R: Read> {
parser: Option<PackagesParser<R>>,
child: Option<Child>,
cmd: Option<Command>,
had_error: bool,
}

impl PackagesQuery<ChildStdout> {
Expand Down Expand Up @@ -125,6 +127,11 @@ impl Iterator for PackagesQuery<ChildStdout> {
type Item = Result<StorePath, Error>;

fn next(&mut self) -> Option<Self::Item> {
// if we emitted an error in the previous call to next,
// there is nothing meaningful we can emit, so signal that we have no more elements to emit.
if self.had_error {
return None;
}
if let Err(e) = self.ensure_initialized() {
return Some(Err(e));
}
Expand All @@ -137,7 +144,10 @@ impl Iterator for PackagesQuery<ChildStdout> {
//
// If the subprocess returned an error, then the parser probably tried to parse garbage output
// so we will ignore the parser error and instead return the error printed by the subprocess.
v.map_err(|e| self.check_error().unwrap_or_else(|| Error::from(e)))
v.map_err(|e| {
self.had_error = true;
self.check_error().unwrap_or_else(|| Error::from(e))
})
})
.or_else(|| {
self.parser = None;
Expand Down