Skip to content

Commit b9d691d

Browse files
committed
Automatically create and monitor partitions for version_downloads
This adds a new background job which is responsible for creating new partitions for `version_downloads` 1 year in advance. The intent here is to give an extremely large window to notice if for some reason this job stops running. In the extremely unlikely event that a full year passes, data will start going in the default partition and we will get paged. Recovering from this will be a massive pain, but I've tried to ensure it'll never happen.
1 parent 5583ba3 commit b9d691d

8 files changed

+202
-11
lines changed

diesel.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ patch_file = "src/schema.patch"
99
[print_schema.filter]
1010
except_tables = [
1111
"version_downloads_archive",
12-
"version_downloads_default",
1312
"version_downloads_pre_2017",
1413
"version_downloads_2017",
1514
"version_downloads_2018_q1",

src/bin/enqueue-job.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ fn main() -> Result<(), Error> {
1111
println!("Enqueueing background job: {}", job);
1212

1313
match &*job {
14+
"generate_version_downloads_partition" => {
15+
Ok(tasks::generate_version_downloads_partition().enqueue(&conn)?)
16+
}
1417
"update_downloads" => Ok(tasks::update_downloads().enqueue(&conn)?),
1518
"dump_db" => {
1619
let database_url = args.next().unwrap_or_else(|| env("READ_ONLY_REPLICA_URL"));

src/bin/monitor.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ fn main() -> Result<(), Error> {
1616

1717
check_stalled_background_jobs(&conn)?;
1818
check_spam_attack(&conn)?;
19+
check_default_version_downloads_partition(&conn)?;
1920
Ok(())
2021
}
2122

@@ -116,6 +117,34 @@ fn check_spam_attack(conn: &PgConnection) -> Result<(), Error> {
116117
Ok(())
117118
}
118119

120+
fn check_default_version_downloads_partition(conn: &PgConnection) -> Result<(), Error> {
121+
use cargo_registry::schema::version_downloads_default::dsl::*;
122+
123+
const EVENT_KEY: &str = "version_downloads_missing_partition";
124+
125+
println!("Checking for data in the default `version_downloads` partition");
126+
let version_downloads_in_default_partition =
127+
version_downloads_default.count().get_result::<i64>(conn)?;
128+
129+
let event = if version_downloads_in_default_partition > 0 {
130+
on_call::Event::Trigger {
131+
incident_key: Some(EVENT_KEY.into()),
132+
description: format!(
133+
"{} rows exist in the default `version_downloads` partition",
134+
version_downloads_in_default_partition
135+
),
136+
}
137+
} else {
138+
on_call::Event::Resolve {
139+
incident_key: EVENT_KEY.into(),
140+
description: Some("No records in default `version_downloads` partition".into()),
141+
}
142+
};
143+
144+
log_and_trigger_event(event)?;
145+
Ok(())
146+
}
147+
119148
fn log_and_trigger_event(event: on_call::Event) -> Result<(), Error> {
120149
match event {
121150
on_call::Event::Trigger {

src/schema.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -866,6 +866,41 @@ table! {
866866
}
867867
}
868868

869+
table! {
870+
use diesel::sql_types::*;
871+
use diesel_full_text_search::{TsVector as Tsvector};
872+
873+
/// Representation of the `version_downloads_default` table.
874+
///
875+
/// (Automatically generated by Diesel.)
876+
version_downloads_default (version_id, date) {
877+
/// The `version_id` column of the `version_downloads_default` table.
878+
///
879+
/// Its SQL type is `Int4`.
880+
///
881+
/// (Automatically generated by Diesel.)
882+
version_id -> Int4,
883+
/// The `downloads` column of the `version_downloads_default` table.
884+
///
885+
/// Its SQL type is `Int4`.
886+
///
887+
/// (Automatically generated by Diesel.)
888+
downloads -> Int4,
889+
/// The `counted` column of the `version_downloads_default` table.
890+
///
891+
/// Its SQL type is `Int4`.
892+
///
893+
/// (Automatically generated by Diesel.)
894+
counted -> Int4,
895+
/// The `date` column of the `version_downloads_default` table.
896+
///
897+
/// Its SQL type is `Date`.
898+
///
899+
/// (Automatically generated by Diesel.)
900+
date -> Date,
901+
}
902+
}
903+
869904
table! {
870905
use diesel::sql_types::*;
871906
use diesel_full_text_search::{TsVector as Tsvector};
@@ -1066,6 +1101,7 @@ allow_tables_to_appear_in_same_query!(
10661101
users,
10671102
version_authors,
10681103
version_downloads,
1104+
version_downloads_default,
10691105
version_owner_actions,
10701106
versions,
10711107
versions_published_by,

src/tasks.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
pub mod dump_db;
2+
mod generate_version_downloads_partition;
3+
#[cfg(test)]
4+
mod test_helpers;
25
mod update_downloads;
36

47
pub use dump_db::dump_db;
8+
pub use generate_version_downloads_partition::generate_version_downloads_partition;
59
pub use update_downloads::update_downloads;
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use crate::background_jobs::Environment;
2+
3+
use chrono::{Datelike, Duration, NaiveDate, Utc};
4+
use diesel::prelude::*;
5+
use diesel::sql_query;
6+
use swirl::PerformError;
7+
8+
table! {
9+
information_schema.tables (table_schema, table_name) {
10+
table_schema -> Text,
11+
table_name -> Text,
12+
table_type -> Text,
13+
}
14+
}
15+
16+
#[derive(PartialEq, Debug)]
17+
struct Quarter {
18+
start: NaiveDate,
19+
end: NaiveDate,
20+
num: u32,
21+
}
22+
23+
impl Quarter {
24+
fn from_date(date: NaiveDate) -> Option<Self> {
25+
let num = (date.month() + 2) / 3;
26+
let start = date.with_day(1)?.with_month(num * 3 - 2)?;
27+
let end = (start + Duration::days(93)).with_day(1)?;
28+
Some(Self { num, start, end })
29+
}
30+
}
31+
32+
#[swirl::background_job]
33+
pub fn generate_version_downloads_partition(env: &Environment) -> Result<(), PerformError> {
34+
let conn = env.connection()?;
35+
generate_partition(&conn, Utc::today().naive_utc() + Duration::days(365))?;
36+
Ok(())
37+
}
38+
39+
fn generate_partition(conn: &PgConnection, today: NaiveDate) -> QueryResult<()> {
40+
let quarter = Quarter::from_date(today).expect("could not determine start/end of quarter");
41+
let table_name = format!("version_downloads_{}_q{}", today.year(), quarter.num);
42+
43+
if table_exists(conn, &table_name)? {
44+
Ok(())
45+
} else {
46+
sql_query(format!(
47+
"CREATE TABLE {} PARTITION OF version_downloads \
48+
FOR VALUES FROM ('{}') TO ('{}')",
49+
table_name, quarter.start, quarter.end
50+
))
51+
.execute(conn)?;
52+
Ok(())
53+
}
54+
}
55+
56+
fn table_exists(conn: &PgConnection, name: &str) -> QueryResult<bool> {
57+
use self::tables::dsl::*;
58+
use diesel::dsl::{exists, select};
59+
60+
select(exists(
61+
tables
62+
.filter(table_schema.eq("public"))
63+
.filter(table_name.eq(name))
64+
.filter(table_type.like("BASE TABLE")),
65+
))
66+
.get_result(conn)
67+
}
68+
69+
#[cfg(test)]
70+
mod tests {
71+
use super::super::test_helpers::*;
72+
use super::*;
73+
74+
#[test]
75+
fn generate_partition_creates_table_if_it_doesnt_exist() {
76+
let conn = conn();
77+
78+
drop_table_if_exists(&conn, "version_downloads_2018_q1").unwrap();
79+
drop_table_if_exists(&conn, "version_downloads_2019_q2").unwrap();
80+
81+
assert!(!table_exists(&conn, "version_downloads_2018_q1").unwrap());
82+
assert!(!table_exists(&conn, "version_downloads_2019_q2").unwrap());
83+
84+
let q1_2018 = NaiveDate::from_ymd(2018, 2, 14);
85+
let q2_2019 = NaiveDate::from_ymd(2019, 4, 1);
86+
generate_partition(&conn, q1_2018).unwrap();
87+
88+
assert!(table_exists(&conn, "version_downloads_2018_q1").unwrap());
89+
assert!(!table_exists(&conn, "version_downloads_2019_q2").unwrap());
90+
91+
generate_partition(&conn, q2_2019).unwrap();
92+
93+
assert!(table_exists(&conn, "version_downloads_2018_q1").unwrap());
94+
assert!(table_exists(&conn, "version_downloads_2019_q2").unwrap());
95+
}
96+
97+
#[test]
98+
fn quarter_from_date() {
99+
let q1_2018 = NaiveDate::from_ymd(2018, 2, 14);
100+
let q4_2018 = NaiveDate::from_ymd(2018, 12, 1);
101+
let q1 = Quarter {
102+
start: NaiveDate::from_ymd(2018, 1, 1),
103+
end: NaiveDate::from_ymd(2018, 4, 1),
104+
num: 1,
105+
};
106+
let q4 = Quarter {
107+
start: NaiveDate::from_ymd(2018, 10, 1),
108+
end: NaiveDate::from_ymd(2019, 1, 1),
109+
num: 4,
110+
};
111+
112+
assert_eq!(Some(q1), Quarter::from_date(q1_2018));
113+
assert_eq!(Some(q4), Quarter::from_date(q4_2018));
114+
}
115+
116+
fn drop_table_if_exists(conn: &PgConnection, table_name: &str) -> QueryResult<()> {
117+
sql_query(&format!("DROP TABLE IF EXISTS {}", table_name)).execute(conn)?;
118+
Ok(())
119+
}
120+
}

src/tasks/test_helpers.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use crate::env;
2+
use diesel::prelude::*;
3+
4+
pub(crate) fn conn() -> PgConnection {
5+
let conn = PgConnection::establish(&env("TEST_DATABASE_URL")).unwrap();
6+
conn.begin_test_transaction().unwrap();
7+
conn
8+
}

src/tasks/update_downloads.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,19 +74,11 @@ fn collect(conn: &PgConnection, rows: &[VersionDownload]) -> QueryResult<()> {
7474

7575
#[cfg(test)]
7676
mod test {
77+
use super::super::test_helpers::*;
7778
use super::*;
78-
use crate::{
79-
env,
80-
models::{Crate, NewCrate, NewUser, NewVersion, User, Version},
81-
};
79+
use crate::models::{Crate, NewCrate, NewUser, NewVersion, User, Version};
8280
use std::collections::HashMap;
8381

84-
fn conn() -> PgConnection {
85-
let conn = PgConnection::establish(&env("TEST_DATABASE_URL")).unwrap();
86-
conn.begin_test_transaction().unwrap();
87-
conn
88-
}
89-
9082
fn user(conn: &PgConnection) -> User {
9183
NewUser::new(2, "login", None, None, "access_token")
9284
.create_or_update(None, conn)

0 commit comments

Comments
 (0)