Skip to content

Commit 478d4da

Browse files
sjudd authored and Convex, Inc. committed
Implement upload_previous_segments for text indexes (#26033)
GitOrigin-RevId: 1311c504f4ca466145e5f02b5199c81766ca9321
1 parent 9d67856 commit 478d4da

File tree

5 files changed: +85 additions, -24 deletions

crates/database/src/text_index_worker/text_meta.rs

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -134,10 +134,16 @@ impl SearchIndex for TextSearchIndex {
134134
}
135135

136136
async fn upload_previous_segments(
137-
_storage: Arc<dyn Storage>,
138-
_segments: Self::PreviousSegments,
137+
storage: Arc<dyn Storage>,
138+
segments: Self::PreviousSegments,
139139
) -> anyhow::Result<Vec<Self::Segment>> {
140-
anyhow::bail!("Not implemented");
140+
segments
141+
.0
142+
.into_iter()
143+
.map(|segment| segment.upload_metadata(storage.clone()))
144+
.collect::<FuturesUnordered<_>>()
145+
.try_collect::<Vec<_>>()
146+
.await
141147
}
142148

143149
fn estimate_document_size(_schema: &Self::Schema, _doc: &ResolvedDocument) -> u64 {

crates/search/src/fragmented_segment.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -320,10 +320,9 @@ impl MutableFragmentedSegmentMetadata {
320320
let mut buf = vec![];
321321
self.mutated_deleted_bitset.write(&mut buf)?;
322322

323-
let filename = format!("deleted_bitset");
324323
let object_key = upload_single_file(
325324
&mut buf.as_slice(),
326-
filename,
325+
"deleted_bitset".to_string(),
327326
storage.clone(),
328327
SearchFileType::VectorDeletedBitset,
329328
)

crates/search/src/incremental_index.rs

Lines changed: 54 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,11 @@ use crate::{
4343
archive::extract_zip,
4444
constants::CONVEX_EN_TOKENIZER,
4545
convex_en,
46-
disk_index::download_single_file_zip,
46+
disk_index::{
47+
download_single_file_zip,
48+
upload_single_file,
49+
},
50+
SearchFileType,
4751
TantivySearchIndexSchema,
4852
SEARCH_FIELD_ID,
4953
};
@@ -68,6 +72,7 @@ pub struct UpdatableTextSegment {
6872
inverted_index: Arc<InvertedIndexReader>,
6973
id_tracker: StaticIdTracker,
7074
deletion_tracker: MemoryDeletionTracker,
75+
original: FragmentedTextSegment,
7176
}
7277

7378
fn inverted_index_from_index(index: &Index) -> anyhow::Result<Arc<InvertedIndexReader>> {
@@ -94,6 +99,51 @@ impl UpdatableTextSegment {
9499
inverted_index,
95100
id_tracker,
96101
deletion_tracker,
102+
// TODO(sam): We should probably create this outside of this method, then pass it
103+
// through here. For now this is unused in these tests.
104+
original: FragmentedTextSegment {
105+
segment_key: "segment".try_into()?,
106+
id_tracker_key: "id_tracker".try_into()?,
107+
deleted_terms_table_key: "deleted_terms".try_into()?,
108+
alive_bitset_key: "bitset".try_into()?,
109+
num_indexed_documents: 0,
110+
id: "test_id".to_string(),
111+
},
112+
})
113+
}
114+
115+
pub async fn upload_metadata(
116+
self,
117+
storage: Arc<dyn Storage>,
118+
) -> anyhow::Result<FragmentedTextSegment> {
119+
// TODO(CX-6511): Skip the upload and return the original file if this segment
120+
// wasn't modified.
121+
122+
let mut bitset_buf = vec![];
123+
let mut deleted_terms_buf = vec![];
124+
self.deletion_tracker
125+
.write(&mut bitset_buf, &mut deleted_terms_buf)?;
126+
127+
let mut bitset_slice = bitset_buf.as_slice();
128+
let upload_bitset = upload_single_file(
129+
&mut bitset_slice,
130+
"alive_bitset".to_string(),
131+
storage.clone(),
132+
SearchFileType::TextAliveBitset,
133+
);
134+
let mut deleted_terms_slice = deleted_terms_buf.as_slice();
135+
let upload_deleted_terms = upload_single_file(
136+
&mut deleted_terms_slice,
137+
"deleted_terms".to_string(),
138+
storage.clone(),
139+
SearchFileType::TextDeletedTerms,
140+
);
141+
let (alive_bitset_key, deleted_terms_table_key) =
142+
futures::try_join!(upload_bitset, upload_deleted_terms)?;
143+
Ok(FragmentedTextSegment {
144+
deleted_terms_table_key,
145+
alive_bitset_key,
146+
..self.original
97147
})
98148
}
99149

@@ -150,6 +200,7 @@ impl UpdatableTextSegment {
150200
inverted_index,
151201
id_tracker,
152202
deletion_tracker,
203+
original,
153204
})
154205
}
155206
}
@@ -292,7 +343,7 @@ pub async fn build_new_segment(
292343
let new_deletion_tracker = MemoryDeletionTracker::new(new_id_tracker.num_ids() as u32);
293344
let alive_bit_set_path = dir.join(ALIVE_BITSET_PATH);
294345
let deleted_terms_path = dir.join(DELETED_TERMS_PATH);
295-
new_deletion_tracker.write(&alive_bit_set_path, &deleted_terms_path)?;
346+
new_deletion_tracker.write_to_path(&alive_bit_set_path, &deleted_terms_path)?;
296347
let id_tracker_path = dir.join(ID_TRACKER_PATH);
297348
new_id_tracker.write(&id_tracker_path)?;
298349

@@ -371,7 +422,7 @@ pub async fn merge_segments(
371422
let tracker = MemoryDeletionTracker::new(num_docs as u32);
372423
let alive_bit_set_path = dir.to_path_buf().join(ALIVE_BITSET_PATH);
373424
let deleted_terms_path = dir.to_path_buf().join(DELETED_TERMS_PATH);
374-
tracker.write(&alive_bit_set_path, &deleted_terms_path)?;
425+
tracker.write_to_path(&alive_bit_set_path, &deleted_terms_path)?;
375426
Ok(TextSegmentPaths {
376427
index_path: index_dir,
377428
id_tracker_path,

crates/search/src/searcher/searcher.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1554,7 +1554,7 @@ mod tests {
15541554
let dir = previous_segment_dir.join(format!("segment_{i}"));
15551555
std::fs::create_dir(&dir)?;
15561556
updated_deletion_tracker
1557-
.write(dir.join(ALIVE_BITSET_PATH), dir.join(DELETED_TERMS_PATH))?;
1557+
.write_to_path(dir.join(ALIVE_BITSET_PATH), dir.join(DELETED_TERMS_PATH))?;
15581558
previous_segment_dirs.push(dir);
15591559
}
15601560
Ok(TestIndex {

crates/text_search/src/tracker.rs

Lines changed: 20 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -301,25 +301,30 @@ impl MemoryDeletionTracker {
301301
self.num_deleted_terms = num_deleted_terms;
302302
}
303303

304-
pub fn write<P: AsRef<Path>>(
304+
pub fn write_to_path<P: AsRef<Path>>(
305305
self,
306306
alive_bitset_path: P,
307307
deleted_terms_path: P,
308308
) -> anyhow::Result<()> {
309-
{
310-
let mut out = BufWriter::new(File::create(alive_bitset_path)?);
311-
self.alive_bitset.serialize(&mut out)?;
312-
out.into_inner()?.sync_all()?;
313-
}
314-
{
315-
let mut out = BufWriter::new(File::create(deleted_terms_path)?);
316-
Self::write_deleted_terms(
317-
self.term_to_deleted_documents,
318-
self.num_deleted_terms,
319-
&mut out,
320-
)?;
321-
out.into_inner()?.sync_all()?;
322-
}
309+
let mut alive_bitset = BufWriter::new(File::create(alive_bitset_path)?);
310+
let mut deleted_terms = BufWriter::new(File::create(deleted_terms_path)?);
311+
self.write(&mut alive_bitset, &mut deleted_terms)?;
312+
alive_bitset.into_inner()?.sync_all()?;
313+
deleted_terms.into_inner()?.sync_all()?;
314+
Ok(())
315+
}
316+
317+
pub fn write(
318+
self,
319+
mut alive_bitset: impl Write,
320+
mut deleted_terms: impl Write,
321+
) -> anyhow::Result<()> {
322+
self.alive_bitset.serialize(&mut alive_bitset)?;
323+
Self::write_deleted_terms(
324+
self.term_to_deleted_documents,
325+
self.num_deleted_terms,
326+
&mut deleted_terms,
327+
)?;
323328
Ok(())
324329
}
325330

0 commit comments

Comments
 (0)