Skip to content

Commit d98f807

Browse files
committed
add snapshot validation logic
1 parent f9061cc commit d98f807

File tree

6 files changed

+242
-2
lines changed

6 files changed

+242
-2
lines changed

crates/iceberg/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
#![feature(let_chains)]
12
// Licensed to the Apache Software Foundation (ASF) under one
3+
24
// or more contributor license agreements. See the NOTICE file
35
// distributed with this work for additional information
46
// regarding copyright ownership. The ASF licenses this file

crates/iceberg/src/spec/snapshot.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub type SnapshotRef = Arc<Snapshot>;
4242
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
4343
#[serde(rename_all = "lowercase")]
4444
/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
45+
#[derive(Hash)]
4546
pub enum Operation {
4647
/// Only data files were added and no files were removed.
4748
Append,

crates/iceberg/src/transaction/append.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@
1616
// under the License.
1717

1818
use std::collections::{HashMap, HashSet};
19+
use std::sync::Arc;
1920

2021
use arrow_array::StringArray;
2122
use futures::TryStreamExt;
2223
use uuid::Uuid;
2324

2425
use crate::error::Result;
25-
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
26+
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation, Snapshot, SnapshotRef};
27+
use crate::table::Table;
2628
use crate::transaction::snapshot::{
2729
DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
2830
};
31+
use crate::transaction::validate::SnapshotValidator;
2932
use crate::transaction::Transaction;
3033
use crate::writer::file_writer::ParquetWriter;
3134
use crate::{Error, ErrorKind};
@@ -209,6 +212,8 @@ impl SnapshotProduceOperation for FastAppendOperation {
209212
}
210213
}
211214

215+
// Fast append opts into `SnapshotValidator` with an empty impl, so it uses the
// trait's default methods (the default `validate` body is empty).
impl SnapshotValidator for FastAppendOperation {}
216+
212217
#[cfg(test)]
213218
mod tests {
214219
use crate::scan::tests::TableTestFixture;

crates/iceberg/src/transaction/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
mod append;
2121
mod snapshot;
2222
mod sort_order;
23+
mod validate;
2324

2425
use std::cmp::Ordering;
2526
use std::collections::HashMap;

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@ use crate::spec::{
3030
MAIN_BRANCH, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
3131
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
3232
};
33+
use crate::transaction::validate::SnapshotValidator;
3334
use crate::transaction::Transaction;
3435
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
3536

3637
const META_ROOT_PATH: &str = "metadata";
3738

38-
pub(crate) trait SnapshotProduceOperation: Send + Sync {
39+
pub(crate) trait SnapshotProduceOperation: Send + SnapshotValidator + Sync {
3940
fn operation(&self) -> Operation;
4041
#[allow(unused)]
4142
fn delete_entries(
@@ -308,6 +309,11 @@ impl<'a> SnapshotProduceAction<'a> {
308309
.await?;
309310
let next_seq_num = self.tx.current_table.metadata().next_sequence_number();
310311

312+
snapshot_produce_operation.validate(
313+
&self.tx.current_table,
314+
self.tx.current_table.metadata().current_snapshot(),
315+
);
316+
311317
let summary = self
312318
.summary(&snapshot_produce_operation)
313319
.map_err(|err| {
crates/iceberg/src/transaction/validate.rs

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashSet;
19+
use std::sync::Arc;
20+
21+
use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef, TableMetadata};
22+
use crate::table::Table;
23+
24+
pub(crate) trait SnapshotValidator {
25+
fn validate(&self, table: &Table, snapshot: Option<&SnapshotRef>) -> () {}
26+
27+
#[allow(dead_code)]
28+
async fn validation_history(
29+
&self,
30+
base: &Table,
31+
to_snapshot: Option<&SnapshotRef>,
32+
from_snapshot: Option<&SnapshotRef>,
33+
matching_operations: HashSet<Operation>,
34+
manifest_content_type: ManifestContentType,
35+
) -> (Vec<ManifestFile>, HashSet<i64>) {
36+
let mut manifests = vec![];
37+
let mut new_snapshots = HashSet::new();
38+
let mut last_snapshot: Option<&SnapshotRef> = None;
39+
40+
let snapshots = Self::ancestors_between(to_snapshot, from_snapshot, base.metadata());
41+
for current_snapshot in &snapshots {
42+
last_snapshot = Some(current_snapshot);
43+
44+
if matching_operations.contains(&current_snapshot.summary().operation) {
45+
new_snapshots.insert(current_snapshot.snapshot_id().clone());
46+
current_snapshot
47+
.load_manifest_list(base.file_io(), base.metadata())
48+
.await
49+
.expect("Failed to load manifest list!")
50+
.entries()
51+
.into_iter()
52+
.for_each(|manifest| {
53+
if manifest.content == manifest_content_type
54+
&& manifest.added_snapshot_id == current_snapshot.snapshot_id()
55+
{
56+
manifests.push(manifest.clone());
57+
}
58+
});
59+
}
60+
}
61+
62+
if last_snapshot.is_some()
63+
&& last_snapshot.unwrap().parent_snapshot_id()
64+
!= from_snapshot.map(|snapshot| snapshot.snapshot_id())
65+
{
66+
panic!("Cannot determine history between starting snapshot {} and the last known ancestor {}",
67+
from_snapshot.map_or_else(
68+
|| "None".to_string(),
69+
|snapshot| snapshot.snapshot_id().to_string()),
70+
last_snapshot.map_or_else(
71+
|| "None".to_string(),
72+
|snapshot| snapshot.parent_snapshot_id().unwrap().to_string()));
73+
}
74+
75+
(manifests, new_snapshots)
76+
}
77+
78+
fn ancestors_between(
79+
to_snapshot: Option<&SnapshotRef>,
80+
from_snapshot: Option<&SnapshotRef>,
81+
table_metadata: &TableMetadata,
82+
) -> Vec<SnapshotRef> {
83+
let mut snapshots = Vec::new();
84+
let mut current_snapshot = to_snapshot;
85+
while let Some(snapshot) = current_snapshot {
86+
snapshots.push(Arc::clone(&snapshot));
87+
match snapshot.parent_snapshot_id() {
88+
Some(parent_snapshot_id)
89+
if from_snapshot.is_some()
90+
&& parent_snapshot_id == from_snapshot.unwrap().snapshot_id() =>
91+
{
92+
break
93+
}
94+
Some(parent_snapshot_id) => {
95+
current_snapshot = table_metadata.snapshot_by_id(parent_snapshot_id)
96+
}
97+
None => break,
98+
}
99+
}
100+
101+
snapshots
102+
}
103+
}
104+
105+
#[cfg(test)]
106+
mod tests {
107+
use std::collections::HashSet;
108+
109+
use crate::spec::{
110+
DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestContentType, Operation,
111+
SnapshotRef, Struct,
112+
};
113+
use crate::transaction::tests::{make_v2_minimal_table, make_v2_table};
114+
use crate::transaction::validate::SnapshotValidator;
115+
use crate::transaction::{Table, Transaction};
116+
use crate::TableUpdate;
117+
118+
struct TestValidator {}
119+
120+
impl SnapshotValidator for TestValidator {}
121+
122+
async fn make_v2_table_with_updates() -> (Table, Vec<TableUpdate>) {
123+
let table = make_v2_minimal_table();
124+
let tx = Transaction::new(&table);
125+
let mut action = tx.fast_append(None, vec![]).unwrap();
126+
127+
let data_file_1 = DataFileBuilder::default()
128+
.content(DataContentType::Data)
129+
.file_path("test/1.parquet".to_string())
130+
.file_format(DataFileFormat::Parquet)
131+
.file_size_in_bytes(100)
132+
.record_count(1)
133+
.partition_spec_id(table.metadata().default_partition_spec_id())
134+
.partition(Struct::from_iter([Some(Literal::long(300))]))
135+
.build()
136+
.unwrap();
137+
138+
let data_file_2 = DataFileBuilder::default()
139+
.content(DataContentType::Data)
140+
.file_path("test/2.parquet".to_string())
141+
.file_format(DataFileFormat::Parquet)
142+
.file_size_in_bytes(100)
143+
.record_count(1)
144+
.partition_spec_id(table.metadata().default_partition_spec_id())
145+
.partition(Struct::from_iter([Some(Literal::long(300))]))
146+
.build()
147+
.unwrap();
148+
149+
action.add_data_files(vec![data_file_1.clone()]).unwrap();
150+
let tx = action.apply().await.unwrap();
151+
let mut action = tx.fast_append(None, vec![]).unwrap();
152+
action.add_data_files(vec![data_file_2.clone()]).unwrap();
153+
let tx = action.apply().await.unwrap();
154+
155+
(table.clone(), tx.updates)
156+
}
157+
158+
#[tokio::test]
159+
async fn test_validation_history() {
160+
let (table, updates) = make_v2_table_with_updates().await;
161+
let parent_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
162+
SnapshotRef::new(snapshot.clone())
163+
} else {
164+
unreachable!()
165+
};
166+
let current_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[2] {
167+
SnapshotRef::new(snapshot.clone())
168+
} else {
169+
unreachable!()
170+
};
171+
172+
let test_validator = TestValidator {};
173+
174+
// specifying from_snapshot, validating up to the from_snapshot
175+
let (manifests, snapshots) = test_validator
176+
.validation_history(
177+
&table,
178+
Some(&current_snapshot),
179+
Some(&parent_snapshot),
180+
HashSet::from([Operation::Append]),
181+
ManifestContentType::Data,
182+
)
183+
.await;
184+
185+
manifests
186+
.iter()
187+
.for_each(|manifest| assert_eq!(manifest.content, ManifestContentType::Data));
188+
assert_eq!(snapshots.into_iter().collect::<Vec<_>>(), vec![
189+
current_snapshot.snapshot_id()
190+
]);
191+
}
192+
193+
#[test]
194+
fn test_ancestor_between() {
195+
let table = make_v2_table();
196+
let current_snapshot = table.metadata().current_snapshot();
197+
let parent_snapshot_id = current_snapshot.unwrap().parent_snapshot_id().unwrap();
198+
let parent_snapshot = table.metadata().snapshot_by_id(parent_snapshot_id);
199+
200+
// not specifying from_snapshot, listing all ancestors
201+
let all_ancestors =
202+
TestValidator::ancestors_between(current_snapshot, None, table.metadata());
203+
assert_eq!(
204+
vec![
205+
current_snapshot.unwrap().snapshot_id(),
206+
current_snapshot.unwrap().parent_snapshot_id().unwrap()
207+
],
208+
all_ancestors
209+
.iter()
210+
.map(|snapshot| snapshot.snapshot_id())
211+
.collect::<Vec<_>>()
212+
);
213+
214+
// specifying from_snapshot, listing only 1 snapshot
215+
let ancestors =
216+
TestValidator::ancestors_between(current_snapshot, parent_snapshot, table.metadata());
217+
assert_eq!(
218+
vec![current_snapshot.unwrap().snapshot_id()],
219+
ancestors
220+
.iter()
221+
.map(|snapshot| snapshot.snapshot_id())
222+
.collect::<Vec<_>>()
223+
);
224+
}
225+
}

0 commit comments

Comments
 (0)