Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 3bc54e4

Browse files
committed
Batch index and blob writing
About 35% faster now; faster than DbLedger in benchmarks.
1 parent 9b99faf commit 3bc54e4

File tree

3 files changed

+29
-49
lines changed

3 files changed

+29
-49
lines changed

src/blob_store.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,9 @@ impl Store {
227227
pub fn slot_data_from(
228228
&self,
229229
slot: u64,
230-
range: std::ops::RangeFrom<u64>,
230+
start_index: u64,
231231
) -> Result<impl Iterator<Item = Result<Vec<u8>>>> {
232-
self.slot_data(slot, range.start..std::u64::MAX)
232+
self.slot_data(slot, start_index..std::u64::MAX)
233233
}
234234

235235
pub fn slot_data_range(
@@ -261,9 +261,9 @@ impl Store {
261261
pub fn slot_entries_from(
262262
&self,
263263
slot: u64,
264-
range: std::ops::RangeFrom<u64>,
264+
start_index: u64,
265265
) -> Result<impl Iterator<Item = Result<Entry>>> {
266-
self.slot_entries_range(slot, range.start..std::u64::MAX)
266+
self.slot_entries_range(slot, start_index..std::u64::MAX)
267267
}
268268

269269
pub fn slot_entries_range(

src/blob_store/store_impl.rs

Lines changed: 24 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::packet::{Blob, BlobError, BLOB_HEADER_SIZE};
22
use crate::result::Error as SErr;
33

4-
use byteorder::{BigEndian, ByteOrder};
4+
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
55

66
use std::borrow::Borrow;
77
use std::collections::HashMap;
@@ -17,7 +17,8 @@ const INDEX_FILE_NAME: &str = "index";
1717
pub(super) const ERASURE_FILE_NAME: &str = "erasure";
1818
pub(super) const ERASURE_INDEX_FILE_NAME: &str = "erasure_index";
1919

20-
const DATA_FILE_BUF_SIZE: usize = 64 * 1024 * 32;
20+
//const DATA_FILE_BUF_SIZE: usize = 2 * 1024 * 1024;
21+
const DATA_FILE_BUF_SIZE: usize = 64 * 1024;
2122

2223
pub(super) const INDEX_RECORD_SIZE: u64 = 3 * 8;
2324

@@ -133,7 +134,6 @@ impl Store {
133134
{
134135
let mut blobs: Vec<_> = iter.into_iter().collect();
135136
assert!(!blobs.is_empty());
136-
println!("insert_blobs: inserting {} blobs", blobs.len());
137137

138138
// sort on lexi order (slot_idx, blob_idx)
139139
// TODO: this sort may cause panics while malformed blobs result in `Result`s elsewhere
@@ -160,14 +160,7 @@ impl Store {
160160
.or_insert(index..(index + 1));
161161
}
162162

163-
println!("insert_blobs: arranged slots and stuff");
164-
165163
for (slot, range) in slot_ranges {
166-
println!("insert_blobs: slot = {}", slot);
167-
println!(
168-
"insert_blobs: range.start = {}, range.end = {}",
169-
range.start, range.end
170-
);
171164
let slot_path = self.mk_slot_path(slot);
172165
ensure_slot(&slot_path)?;
173166

@@ -177,18 +170,6 @@ impl Store {
177170
slot_path.join(store_impl::INDEX_FILE_NAME),
178171
);
179172

180-
println!(
181-
"insert_blobs: paths: data = {}",
182-
data_path.to_string_lossy()
183-
);
184-
println!(
185-
"insert_blobs: paths: meta = {}",
186-
meta_path.to_string_lossy()
187-
);
188-
println!(
189-
"insert_blobs: paths: index = {}",
190-
index_path.to_string_lossy()
191-
);
192173
// load meta_data
193174
let (mut meta_file, mut meta) = if meta_path.exists() {
194175
let mut f = OpenOptions::new().read(true).write(true).open(&meta_path)?;
@@ -207,46 +188,40 @@ impl Store {
207188
)
208189
};
209190

210-
println!("insert_blobs: loaded meta data: {:?}", meta);
191+
let slot_blobs = &blobs[range];
192+
let mut idx_buf = Vec::with_capacity(slot_blobs.len() * INDEX_RECORD_SIZE as usize);
211193

212194
let mut data_wtr =
213195
BufWriter::with_capacity(DATA_FILE_BUF_SIZE, open_append(&data_path)?);
214-
let mut index_wtr = BufWriter::new(open_append(&index_path)?);
215-
let slot_blobs = &blobs[range];
216-
let mut blob_indices = Vec::with_capacity(slot_blobs.len());
217-
218-
let mut idx_buf = [0u8; INDEX_RECORD_SIZE as usize];
196+
let mut offset = data_wtr.seek(SeekFrom::Current(0))?;
197+
let mut blob_slices_to_write = Vec::with_capacity(slot_blobs.len());
219198

220199
for blob in slot_blobs {
221200
let blob = blob.borrow();
222201
let blob_index = blob.index().map_err(bad_blob)?;
223202
let blob_size = blob.size().map_err(bad_blob)?;
224203

225-
let offset = data_wtr.seek(SeekFrom::Current(0))?;
226-
// TODO: blob.size() is wrong here, but should be right
227-
let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob_size];
204+
let serialized_blob_data = &blob.data[..BLOB_HEADER_SIZE + blob_size];
228205
let serialized_entry_data = &blob.data[BLOB_HEADER_SIZE..];
229206
let entry: Entry = bincode::deserialize(serialized_entry_data)
230207
.expect("Blobs must be well formed by the time they reach the ledger");
231208

232-
data_wtr.write_all(&serialized_blob_datas)?;
233-
let data_len = serialized_blob_datas.len() as u64;
209+
blob_slices_to_write.push(serialized_blob_data);
210+
let data_len = serialized_blob_data.len() as u64;
234211

235212
let blob_idx = BlobIndex {
236213
index: blob_index,
237214
size: data_len,
238215
offset,
239216
};
240217

241-
BigEndian::write_u64(&mut idx_buf[0..8], blob_idx.index);
242-
BigEndian::write_u64(&mut idx_buf[8..16], blob_idx.offset);
243-
BigEndian::write_u64(&mut idx_buf[16..24], blob_idx.size);
218+
offset += data_len;
244219

245-
// update index file
246-
// TODO: batch writes for outer loop
247-
blob_indices.push(blob_idx);
248-
println!("insert_blobs: blob_idx = {:?}", blob_idx);
249-
index_wtr.write_all(&idx_buf)?;
220+
// Write indices to buffer, which will be written to index file
221+
// in the outer (per-slot) loop
222+
idx_buf.write_u64::<BigEndian>(blob_idx.index)?;
223+
idx_buf.write_u64::<BigEndian>(blob_idx.offset)?;
224+
idx_buf.write_u64::<BigEndian>(blob_idx.size)?;
250225

251226
// update meta. write to file once in outer loop
252227
if blob_index > meta.received {
@@ -263,9 +238,15 @@ impl Store {
263238
}
264239
}
265240

266-
println!("insert_blobs: saving meta data: {:?}", meta);
241+
let mut index_wtr = BufWriter::new(open_append(&index_path)?);
242+
243+
// write blob slices
244+
for slice in blob_slices_to_write {
245+
data_wtr.write_all(slice)?;
246+
}
267247

268248
bincode::serialize_into(&mut meta_file, &meta)?;
249+
index_wtr.write_all(&idx_buf)?;
269250

270251
data_wtr.flush()?;
271252
let data_f = data_wtr.into_inner()?;
@@ -298,7 +279,7 @@ impl Store {
298279
let index_rdr = File::open(&index_path)?;
299280

300281
let index_size = index_rdr.metadata()?.len();
301-
println!("slot_data: index_size = {}", index_size);
282+
302283
let mut index_rdr = BufReader::new(index_rdr);
303284
let mut buf = [0u8; INDEX_RECORD_SIZE as usize];
304285
let mut blob_indices: Vec<BlobIndex> =
@@ -321,7 +302,6 @@ impl Store {
321302
}
322303

323304
blob_indices.sort_unstable_by_key(|bix| bix.index);
324-
println!("slot_data: blob_indices: {:#?}", blob_indices);
325305

326306
let data_rdr = BufReader::new(File::open(&data_path)?);
327307

tests/blob_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ fn test_insert_noncontiguous_blobs() {
8484
.collect::<Vec<Vec<u8>>>();
8585

8686
let retrieved: Result<Vec<_>> = store
87-
.slot_data_from(0, 0..)
87+
.slot_data_from(0, 0)
8888
.expect("couldn't create slot daaa iterator")
8989
.collect();
9090
let retrieved = retrieved.expect("Bad iterator somehow or something");

0 commit comments

Comments (0)