
Commit 9d7e431

Merge remote-tracking branch 'upstream/main' into cluster

# Conflicts:
#   src/query/service/src/pipelines/pipeline_builder.rs

2 parents: b8f9544 + 10b2695

49 files changed: +1198, -554 lines


Cargo.lock

Lines changed: 0 additions & 1 deletion

src/query/catalog/src/plan/datasource/datasource_info/stage.rs

Lines changed: 9 additions & 3 deletions
@@ -30,10 +30,14 @@ use databend_common_storage::StageFilesInfo;
 
 #[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
 pub struct StageTableInfo {
+    // common
+    pub stage_root: String,
+    pub stage_info: StageInfo,
+
+    // copy into table only
     pub schema: TableSchemaRef,
     pub default_exprs: Option<Vec<RemoteDefaultExpr>>,
     pub files_info: StageFilesInfo,
-    pub stage_info: StageInfo,
     pub files_to_copy: Option<Vec<StageFileInfo>>,
     // files that
     // - are listed as candidates to be copied
@@ -42,9 +46,11 @@ pub struct StageTableInfo {
     // - may need to be purged as well (depends on the copy options)
     pub duplicated_files_detected: Vec<String>,
     pub is_select: bool,
-    pub copy_into_location_options: CopyIntoLocationOptions,
     pub copy_into_table_options: CopyIntoTableOptions,
-    pub stage_root: String,
+
+    // copy into location only
+    pub copy_into_location_ordered: bool,
+    pub copy_into_location_options: CopyIntoLocationOptions,
 }
 
 impl StageTableInfo {
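For readability, the struct as it stands after this change can be sketched as follows. Fields that fall in the unshown context lines between the two hunks are elided, so this is an illustration of the new grouping (common / copy into table / copy into location), not the full definition:

pub struct StageTableInfo {
    // common
    pub stage_root: String,
    pub stage_info: StageInfo,

    // copy into table only
    pub schema: TableSchemaRef,
    pub default_exprs: Option<Vec<RemoteDefaultExpr>>,
    pub files_info: StageFilesInfo,
    pub files_to_copy: Option<Vec<StageFileInfo>>,
    // ... fields from the unshown context lines ...
    pub duplicated_files_detected: Vec<String>,
    pub is_select: bool,
    pub copy_into_table_options: CopyIntoTableOptions,

    // copy into location only
    pub copy_into_location_ordered: bool,
    pub copy_into_location_options: CopyIntoLocationOptions,
}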

src/query/catalog/src/table_context.rs

Lines changed: 2 additions & 0 deletions
@@ -354,6 +354,8 @@ pub trait TableContext: Send + Sync {
         previous_snapshot: Option<Arc<TableSnapshot>>,
     ) -> Result<TableMetaTimestamps>;
 
+    fn clear_table_meta_timestamps_cache(&self);
+
     fn get_read_block_thresholds(&self) -> BlockThresholds;
     fn set_read_block_thresholds(&self, _thresholds: BlockThresholds);

src/query/ee/src/storages/fuse/operations/handler.rs

Lines changed: 1 addition & 4 deletions
@@ -14,8 +14,6 @@
 
 use std::sync::Arc;
 
-use chrono::DateTime;
-use chrono::Utc;
 use databend_common_base::base::GlobalInstance;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::AbortChecker;
@@ -38,10 +36,9 @@ impl VacuumHandler for RealVacuumHandler {
         &self,
         table: &dyn Table,
         ctx: Arc<dyn TableContext>,
-        retention_time: DateTime<Utc>,
         dry_run: bool,
     ) -> Result<Option<Vec<String>>> {
-        do_vacuum(table, ctx, retention_time, dry_run).await
+        do_vacuum(table, ctx, dry_run).await
     }
 
     async fn do_vacuum2(

src/query/ee/src/storages/fuse/operations/vacuum_table.rs

Lines changed: 24 additions & 8 deletions
@@ -18,7 +18,6 @@ use std::time::Instant;
 
 use chrono::DateTime;
 use chrono::Utc;
-use databend_common_catalog::table::NavigationPoint;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
@@ -28,6 +27,7 @@ use databend_common_storages_fuse::io::SnapshotLiteExtended;
 use databend_common_storages_fuse::io::SnapshotsIO;
 use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::FuseTable;
+use databend_common_storages_fuse::RetentionPolicy;
 use databend_storages_common_cache::LoadParams;
 use databend_storages_common_table_meta::meta::SegmentInfo;
 
@@ -358,23 +358,39 @@ pub async fn do_dry_run_orphan_files(
 pub async fn do_vacuum(
     table: &dyn Table,
     ctx: Arc<dyn TableContext>,
-    retention_time: DateTime<Utc>,
     dry_run: bool,
 ) -> Result<Option<Vec<String>>> {
     let fuse_table = FuseTable::try_from_table(table)?;
     let start = Instant::now();
     // First, do purge
-    let instant = Some(NavigationPoint::TimePoint(retention_time));
     let dry_run_limit = if dry_run { Some(DRY_RUN_LIMIT) } else { None };
+    // Let the table navigate to the point according to the table's retention policy.
+    let navigation_point = None;
+    let keep_last_snapshot = true;
     let purge_files_opt = fuse_table
-        .purge(ctx.clone(), instant, dry_run_limit, true, dry_run)
+        .purge(
+            ctx.clone(),
+            navigation_point,
+            dry_run_limit,
+            keep_last_snapshot,
+            dry_run,
+        )
         .await?;
     let status = format!("do_vacuum: purged table, cost:{:?}", start.elapsed());
     ctx.set_status_info(&status);
-    let retention = fuse_table.get_data_retention_period(ctx.as_ref())?;
-    // use min(now - get_retention_period(), retention_time) as gc orphan files retention time
-    // to protect files that generated by txn which has not been committed being gc.
-    let retention_time = std::cmp::min(chrono::Utc::now() - retention, retention_time);
+    let retention_policy = fuse_table.get_data_retention_policy(ctx.as_ref())?;
+
+    let retention_period = match retention_policy {
+        RetentionPolicy::ByTimePeriod(retention_period) => retention_period,
+        RetentionPolicy::ByNumOfSnapshotsToKeep(_) => {
+            // Technically, we should derive a reasonable retention period from the ByNumOfSnapshotsToKeep policy,
+            // but it's not worth the effort since VACUUM2 will replace legacy purge and vacuum soon.
+            // Use the table retention period for now.
+            fuse_table.get_data_retention_period(ctx.as_ref())?
+        }
+    };
+
+    let retention_time = chrono::Utc::now() - retention_period;
     if let Some(mut purge_files) = purge_files_opt {
         let dry_run_limit = dry_run_limit.unwrap();
         if purge_files.len() < dry_run_limit {
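Both this file and vacuum_table_v2.rs below switch from a plain retention period to the new RetentionPolicy imported from databend_common_storages_fuse. The enum's definition is not part of this diff; judging from the call sites it presumably has roughly the following shape, where the payload types are assumptions:

// Sketch only: not the actual definition, which lives in
// databend_common_storages_fuse and may differ in detail.
use chrono::TimeDelta;

pub enum RetentionPolicy {
    // Keep history newer than `now - period`.
    ByTimePeriod(TimeDelta),
    // Keep at most this many of the latest snapshots (assumed usize, since it is
    // compared directly against a Vec length in vacuum_table_v2.rs).
    ByNumOfSnapshotsToKeep(usize),
}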

src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs

Lines changed: 116 additions & 49 deletions
@@ -16,8 +16,8 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use chrono::DateTime;
-use chrono::Days;
 use chrono::Duration;
+use chrono::TimeDelta;
 use chrono::Utc;
 use databend_common_base::base::uuid::Uuid;
 use databend_common_catalog::table::Table;
@@ -32,6 +32,7 @@ use databend_common_storages_fuse::io::MetaReaders;
 use databend_common_storages_fuse::io::SegmentsIO;
 use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::FuseTable;
+use databend_common_storages_fuse::RetentionPolicy;
 use databend_storages_common_cache::CacheAccessor;
 use databend_storages_common_cache::CacheManager;
 use databend_storages_common_cache::LoadParams;
@@ -89,57 +90,93 @@ pub async fn do_vacuum2(
     let fuse_table = FuseTable::try_from_table(table)?;
     let start = std::time::Instant::now();
 
-    let retention_period_in_days = if fuse_table.is_transient() {
-        0
-    } else {
-        ctx.get_settings().get_data_retention_time_in_days()?
-    };
+    let retention_policy = fuse_table.get_data_retention_policy(ctx.as_ref())?;
 
-    let is_vacuum_all = retention_period_in_days == 0;
+    // By default, do not vacuum all the historical snapshots.
+    let mut is_vacuum_all = false;
+    let mut respect_flash_back_with_lvt = None;
 
-    let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period_in_days).await? else {
-        return Ok(vec![]);
-    };
+    let snapshots_before_lvt = match retention_policy {
+        RetentionPolicy::ByTimePeriod(delta_duration) => {
+            info!("using by ByTimePeriod policy {:?}", delta_duration);
+            let retention_period = if fuse_table.is_transient() {
+                // For transient table, keep no history data
+                TimeDelta::zero()
+            } else {
+                delta_duration
+            };
 
-    ctx.set_status_info(&format!(
-        "set lvt for table {} takes {:?}, lvt: {:?}",
-        fuse_table.get_table_info().desc,
-        start.elapsed(),
-        lvt
-    ));
+            // A zero retention period indicates that we should vacuum all the historical snapshots
+            is_vacuum_all = retention_period.is_zero();
 
-    let start = std::time::Instant::now();
-    let snapshots_before_lvt = if is_vacuum_all {
-        list_until_prefix(
-            fuse_table,
-            fuse_table
-                .meta_location_generator()
-                .snapshot_location_prefix(),
-            fuse_table.snapshot_loc().unwrap().as_str(),
-            true,
-            None,
-        )
-        .await?
-    } else {
-        list_until_timestamp(
-            fuse_table,
-            fuse_table
-                .meta_location_generator()
-                .snapshot_location_prefix(),
-            lvt,
-            true,
-            None,
-        )
-        .await?
+            let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period).await? else {
+                return Ok(vec![]);
+            };
+
+            if respect_flash_back {
+                respect_flash_back_with_lvt = Some(lvt);
+            }
+
+            ctx.set_status_info(&format!(
+                "set lvt for table {} takes {:?}, lvt: {:?}",
+                fuse_table.get_table_info().desc,
+                start.elapsed(),
+                lvt
+            ));
+
+            let snapshots_before_lvt =
+                collect_gc_candidates_by_retention_period(fuse_table, lvt, is_vacuum_all).await?;
+            snapshots_before_lvt
+        }
+        RetentionPolicy::ByNumOfSnapshotsToKeep(num_snapshots_to_keep) => {
+            info!(
+                "using by ByNumOfSnapshotsToKeep policy {:?}",
+                num_snapshots_to_keep
+            );
+            // List the snapshot order by timestamp asc, till the current snapshot(inclusively).
+            let need_one_more = true;
+            let mut snapshots = list_until_prefix(
+                fuse_table,
+                fuse_table
+                    .meta_location_generator()
+                    .snapshot_location_prefix(),
+                fuse_table.snapshot_loc().unwrap().as_str(),
+                need_one_more,
+                None,
+            )
+            .await?;
+
+            let len = snapshots.len();
+            if len <= num_snapshots_to_keep {
+                // Only the current snapshot is there, done
+                return Ok(vec![]);
+            }
+            if num_snapshots_to_keep == 1 {
+                // Expecting only one snapshot left, which means that we can use the current snapshot
+                // as gc root, this flag will be propagated to the select_gc_root func later.
+                is_vacuum_all = true;
+            }
+
+            // When selecting the GC root later, the last snapshot in `snapshots` (after truncation)
+            // is the candidate, but its commit status is uncertain, so its previous snapshot is used
+            // as the GC root instead (except in the is_vacuum_all case).
+
+            // Therefore, during snapshot truncation, we keep 2 extra snapshots;
+            // see `select_gc_root` for details.
+            let num_candidates = len - num_snapshots_to_keep + 2;
+            snapshots.truncate(num_candidates);
+            snapshots
+        }
     };
 
     let elapsed = start.elapsed();
     ctx.set_status_info(&format!(
-        "list snapshots before lvt for table {} takes {:?}, snapshots_dir: {:?}, lvt: {:?}, snapshots: {:?}",
+        "list snapshots for table {} takes {:?}, snapshots_dir: {:?}, snapshots: {:?}",
         fuse_table.get_table_info().desc,
         elapsed,
-        fuse_table.meta_location_generator().snapshot_location_prefix(),
-        lvt,
+        fuse_table
+            .meta_location_generator()
+            .snapshot_location_prefix(),
         slice_summary(&snapshots_before_lvt)
     ));
 
@@ -148,9 +185,8 @@ pub async fn do_vacuum2(
         fuse_table,
         &snapshots_before_lvt,
         is_vacuum_all,
-        respect_flash_back,
+        respect_flash_back_with_lvt,
         ctx.clone().get_abort_checker(),
-        lvt,
     )
     .await?
     else {
@@ -341,13 +377,45 @@ pub async fn do_vacuum2(
     Ok(files_to_gc)
 }
 
+async fn collect_gc_candidates_by_retention_period(
+    fuse_table: &FuseTable,
+    lvt: DateTime<Utc>,
+    is_vacuum_all: bool,
+) -> Result<Vec<Entry>> {
+    let snapshots_before_lvt = if is_vacuum_all {
+        list_until_prefix(
+            fuse_table,
+            fuse_table
+                .meta_location_generator()
+                .snapshot_location_prefix(),
+            fuse_table.snapshot_loc().unwrap().as_str(),
+            true,
+            None,
+        )
+        .await?
+    } else {
+        list_until_timestamp(
+            fuse_table,
+            fuse_table
+                .meta_location_generator()
+                .snapshot_location_prefix(),
+            lvt,
+            true,
+            None,
+        )
+        .await?
+    };
+
+    Ok(snapshots_before_lvt)
+}
+
 /// Try set lvt as min(latest_snapshot.timestamp, now - retention_time).
 ///
 /// Return `None` means we stop vacuumming, but don't want to report error to user.
 async fn set_lvt(
     fuse_table: &FuseTable,
     ctx: &dyn TableContext,
-    retention: u64,
+    retention_period: TimeDelta,
 ) -> Result<Option<DateTime<Utc>>> {
     let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? else {
         info!(
@@ -366,7 +434,7 @@ async fn set_lvt(
     let cat = ctx.get_default_catalog()?;
     // safe to unwrap, as we have checked the version is v5
     let latest_ts = latest_snapshot.timestamp.unwrap();
-    let lvt_point_candidate = std::cmp::min(Utc::now() - Days::new(retention), latest_ts);
+    let lvt_point_candidate = std::cmp::min(Utc::now() - retention_period, latest_ts);
 
     let lvt_point = cat
         .set_table_lvt(
@@ -538,14 +606,13 @@ async fn select_gc_root(
     fuse_table: &FuseTable,
     snapshots_before_lvt: &[Entry],
     is_vacuum_all: bool,
-    respect_flash_back: bool,
+    respect_flash_back: Option<DateTime<Utc>>,
     abort_checker: AbortChecker,
-    lvt: DateTime<Utc>,
 ) -> Result<Option<(Arc<TableSnapshot>, Vec<String>, DateTime<Utc>)>> {
     let gc_root_path = if is_vacuum_all {
         // safe to unwrap, or we should have stopped vacuuming in set_lvt()
         fuse_table.snapshot_loc().unwrap()
-    } else if respect_flash_back {
+    } else if let Some(lvt) = respect_flash_back {
         let latest_location = fuse_table.snapshot_loc().unwrap();
         let gc_root = fuse_table
             .find(latest_location, abort_checker, |snapshot| {
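The ByNumOfSnapshotsToKeep branch above keeps `len - num_snapshots_to_keep + 2` list entries as GC candidates. A standalone illustration of that arithmetic, where the helper function is hypothetical and exists only for this example:

// Hypothetical helper mirroring the truncation arithmetic in the
// ByNumOfSnapshotsToKeep branch of do_vacuum2; not part of the codebase.
fn num_gc_candidates(listed: usize, num_snapshots_to_keep: usize) -> usize {
    // Two extra entries are kept because the newest candidate's commit status is
    // uncertain, so its predecessor ends up serving as the GC root
    // (see select_gc_root).
    listed - num_snapshots_to_keep + 2
}

fn main() {
    // With 10 listed snapshots and a policy of keeping the latest 3,
    // 10 - 3 + 2 = 9 entries remain as candidates after truncation.
    assert_eq!(num_gc_candidates(10, 3), 9);
}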

src/query/ee_features/vacuum_handler/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -10,7 +10,6 @@ edition = { workspace = true }
 [dependencies]
 async-backtrace = { workspace = true }
 async-trait = { workspace = true }
-chrono = { workspace = true }
 databend-common-base = { workspace = true }
 databend-common-catalog = { workspace = true }
 databend-common-exception = { workspace = true }
