Skip to content

Commit ade9a8e

Browse files
committed
Auto merge of #1670 - sgrif:sg-read-only-db, r=jtgeibel
Allow the database to be set in read only mode This is in preparation for our next database upgrade. Last time we needed to do this, we took the whole site down. This means that for 30 minutes, `cargo build` just didn't work. This is no longer something we consider acceptable. While we could just take everything down *except* `cargo build`, we can do better. The upgrade process is basically: - Set up a new replica of the primary database - Wait until the replica is "caught up" (< 200 commits behind) - Take down the site - Upgrade the replica in place - Verify the upgrade was successful - Swap over to the replica - Bring the site back up There's no reason that we actually need to bring the site down though, we just need to stop writing to the primary during the upgrade process. This does that by setting the default transaction mode to READ ONLY, and handling the error we will get back if we try to do a write. Unfortunately, Diesel doesn't expose any API to give us the underlying database error code, so we have to match on the error message instead. This code assumes that: - We are never manually setting a transaction to `READ WRITE` - We are never manually setting a transaction to `READ ONLY`, and expecting write errors to bubble up to the user We're not currently doing either of those things, and I can't imagine that we'll start doing either one in the future. During testing I found some issues with how this interacts with token auth. The middleware was assuming that *any* error from `User::find_by_api_token` was `diesel::NotFound`, and would proceed to run the request (this finally explains a spurious failure I've been getting about a poisoned transaction, it was probably a deadlock in the API key update) Because our tests run in a single transaction, we need to wrap that update to ensure that the error doesn't poison the transaction. If we didn't do anything else, we'd just assume that any user trying to do something with token auth was not logged in, and would give a 403 instead of a 503. To fix this, I've set it up to fall back to a simple find if the update fails. This does *not* make the download endpoint work when we're in read only mode, there is a separate issue for that which will come in a separate PR. I have, however, added an explicit test to ensure it works in read only mode.
2 parents 1b42e4c + 1759db6 commit ade9a8e

File tree

9 files changed

+162
-23
lines changed

9 files changed

+162
-23
lines changed

src/app.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,19 @@ impl App {
6969
(_, Env::Test) => 1,
7070
_ => 30,
7171
};
72+
let read_only_mode = dotenv::var("READ_ONLY_MODE").is_ok();
73+
let connection_config = db::ConnectionConfig {
74+
statement_timeout: db_connection_timeout,
75+
read_only: read_only_mode,
76+
};
7277

7378
let thread_pool = Arc::new(ScheduledThreadPool::new(db_helper_threads));
7479

7580
let diesel_db_config = r2d2::Pool::builder()
7681
.max_size(db_pool_size)
7782
.min_idle(db_min_idle)
7883
.connection_timeout(Duration::from_secs(db_connection_timeout))
79-
.connection_customizer(Box::new(db::SetStatementTimeout(db_connection_timeout)))
84+
.connection_customizer(Box::new(connection_config))
8085
.thread_pool(thread_pool);
8186

8287
App {

src/db.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,26 @@ impl<T: Request + ?Sized> RequestTransaction for T {
100100
}
101101

102102
#[derive(Debug, Clone, Copy)]
103-
pub struct SetStatementTimeout(pub u64);
103+
pub struct ConnectionConfig {
104+
pub statement_timeout: u64,
105+
pub read_only: bool,
106+
}
104107

105-
impl CustomizeConnection<PgConnection, r2d2::Error> for SetStatementTimeout {
108+
impl CustomizeConnection<PgConnection, r2d2::Error> for ConnectionConfig {
106109
fn on_acquire(&self, conn: &mut PgConnection) -> Result<(), r2d2::Error> {
107110
use diesel::sql_query;
108111

109-
sql_query(format!("SET statement_timeout = {}", self.0 * 1000))
110-
.execute(conn)
111-
.map_err(r2d2::Error::QueryError)?;
112+
sql_query(format!(
113+
"SET statement_timeout = {}",
114+
self.statement_timeout * 1000
115+
))
116+
.execute(conn)
117+
.map_err(r2d2::Error::QueryError)?;
118+
if self.read_only {
119+
sql_query("SET default_transaction_read_only = 't'")
120+
.execute(conn)
121+
.map_err(r2d2::Error::QueryError)?;
122+
}
112123
Ok(())
113124
}
114125
}

src/git.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use url::Url;
1212
use crate::background_jobs::Environment;
1313
use crate::models::{DependencyKind, Version};
1414
use crate::schema::versions;
15-
use crate::util::errors::{internal, std_error_no_send, CargoResult};
15+
use crate::util::errors::{std_error_no_send, CargoError, CargoResult};
1616

1717
#[derive(Serialize, Deserialize, Debug)]
1818
pub struct Crate {
@@ -159,7 +159,9 @@ impl Job for AddCrate {
159159
}
160160

161161
pub fn add_crate(conn: &PgConnection, krate: Crate) -> CargoResult<()> {
162-
AddCrate { krate }.enqueue(conn).map_err(|e| internal(&e))
162+
AddCrate { krate }
163+
.enqueue(conn)
164+
.map_err(|e| CargoError::from_std_error(e))
163165
}
164166

165167
#[derive(Serialize, Deserialize)]
@@ -239,5 +241,5 @@ pub fn yank(conn: &PgConnection, krate: String, version: Version, yanked: bool)
239241
yanked,
240242
}
241243
.enqueue(conn)
242-
.map_err(|e| internal(&e))
244+
.map_err(|e| CargoError::from_std_error(e))
243245
}

src/middleware/current_user.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ impl Middleware for CurrentUser {
4545
// Otherwise, look for an `Authorization` header on the request
4646
// and try to find a user in the database with a matching API token
4747
let user = if let Some(headers) = req.headers().find("Authorization") {
48-
User::find_by_api_token(&conn, headers[0]).ok()
48+
User::find_by_api_token(&conn, headers[0])
49+
.optional()
50+
.map_err(|e| Box::new(e) as Box<dyn Error + Send>)?
4951
} else {
5052
None
5153
};

src/middleware/run_pending_background_jobs.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ impl Middleware for RunPendingBackgroundJobs {
1313
req: &mut dyn Request,
1414
res: Result<Response, Box<dyn Error + Send>>,
1515
) -> Result<Response, Box<dyn Error + Send>> {
16+
if response_is_error(&res) {
17+
return res;
18+
}
19+
1620
let app = req.app();
1721
let connection_pool = app.diesel_database.clone();
1822
let repo = Repository::open(&app.config.index_location).expect("Could not clone index");
@@ -28,3 +32,10 @@ impl Middleware for RunPendingBackgroundJobs {
2832
res
2933
}
3034
}
35+
36+
fn response_is_error(res: &Result<Response, Box<dyn Error + Send>>) -> bool {
37+
match res {
38+
Ok(res) => res.status.0 >= 400,
39+
Err(_) => true,
40+
}
41+
}

src/models/user.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,18 +106,26 @@ impl<'a> NewUser<'a> {
106106

107107
impl User {
108108
/// Queries the database for a user with a certain `api_token` value.
109-
pub fn find_by_api_token(conn: &PgConnection, token_: &str) -> CargoResult<User> {
109+
pub fn find_by_api_token(conn: &PgConnection, token_: &str) -> QueryResult<User> {
110110
use crate::schema::api_tokens::dsl::{api_tokens, last_used_at, revoked, token, user_id};
111-
use crate::schema::users::dsl::{id, users};
112111
use diesel::update;
112+
113113
let tokens = api_tokens
114114
.filter(token.eq(token_))
115115
.filter(revoked.eq(false));
116-
let user_id_ = update(tokens)
117-
.set(last_used_at.eq(now.nullable()))
118-
.returning(user_id)
119-
.get_result::<i32>(conn)?;
120-
Ok(users.filter(id.eq(user_id_)).get_result(conn)?)
116+
117+
// If the database is in read only mode, we can't update last_used_at.
118+
// Try updating in a new transaction, if that fails, fall back to reading
119+
let user_id_ = conn
120+
.transaction(|| {
121+
update(tokens)
122+
.set(last_used_at.eq(now.nullable()))
123+
.returning(user_id)
124+
.get_result::<i32>(conn)
125+
})
126+
.or_else(|_| tokens.select(user_id).first(conn))?;
127+
128+
users::table.find(user_id_).first(conn)
121129
}
122130

123131
pub fn owning(krate: &Crate, conn: &PgConnection) -> CargoResult<Vec<Owner>> {

src/tests/all.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ mod git;
6868
mod keyword;
6969
mod krate;
7070
mod owners;
71+
mod read_only_mode;
7172
mod record;
7273
mod schema_details;
7374
mod server;

src/tests/read_only_mode.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use crate::builders::CrateBuilder;
2+
use crate::{RequestHelper, TestApp};
3+
use diesel::prelude::*;
4+
5+
#[test]
6+
fn can_hit_read_only_endpoints_in_read_only_mode() {
7+
let (app, anon) = TestApp::init().empty();
8+
app.db(set_read_only).unwrap();
9+
anon.get::<()>("/api/v1/crates").assert_status(200);
10+
}
11+
12+
#[test]
13+
fn cannot_hit_endpoint_which_writes_db_in_read_only_mode() {
14+
let (app, _, user, token) = TestApp::init().with_token();
15+
app.db(|conn| {
16+
CrateBuilder::new("foo_yank_read_only", user.as_model().id)
17+
.version("1.0.0")
18+
.expect_build(conn);
19+
set_read_only(conn).unwrap();
20+
});
21+
token
22+
.delete::<()>("/api/v1/crates/foo_yank_read_only/1.0.0/yank")
23+
.assert_status(503);
24+
}
25+
26+
#[test]
27+
#[ignore] // Will be implicitly fixed by #1387, no need to special case here
28+
fn can_download_crate_in_read_only_mode() {
29+
let (app, anon, user) = TestApp::with_proxy().with_user();
30+
31+
app.db(|conn| {
32+
CrateBuilder::new("foo_download_read_only", user.as_model().id)
33+
.version("1.0.0")
34+
.expect_build(conn);
35+
set_read_only(conn).unwrap();
36+
});
37+
38+
anon.get::<()>("/api/v1/crates/foo_download_read_only/1.0.0/download")
39+
.assert_status(302);
40+
41+
// We're in read only mode so the download should not have been counted
42+
app.db(|conn| {
43+
use cargo_registry::schema::version_downloads::dsl::*;
44+
use diesel::dsl::sum;
45+
46+
let dl_count = version_downloads
47+
.select(sum(downloads))
48+
.get_result::<Option<i64>>(conn);
49+
assert_eq!(Ok(None), dl_count);
50+
})
51+
}
52+
53+
fn set_read_only(conn: &PgConnection) -> QueryResult<()> {
54+
diesel::sql_query("SET TRANSACTION READ ONLY").execute(conn)?;
55+
Ok(())
56+
}

src/util/errors.rs

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,22 @@ impl dyn CargoError {
4949
pub fn is<T: Any>(&self) -> bool {
5050
self.get_type_id() == TypeId::of::<T>()
5151
}
52+
53+
pub fn from_std_error(err: Box<dyn Error + Send>) -> Box<dyn CargoError> {
54+
Self::try_convert(&*err).unwrap_or_else(|| internal(&err))
55+
}
56+
57+
fn try_convert(err: &(dyn Error + Send + 'static)) -> Option<Box<Self>> {
58+
match err.downcast_ref() {
59+
Some(DieselError::NotFound) => Some(Box::new(NotFound)),
60+
Some(DieselError::DatabaseError(_, info))
61+
if info.message().ends_with("read-only transaction") =>
62+
{
63+
Some(Box::new(ReadOnlyMode))
64+
}
65+
_ => None,
66+
}
67+
}
5268
}
5369

5470
impl CargoError for Box<dyn CargoError> {
@@ -155,13 +171,9 @@ impl<E: Error + Send + 'static> CargoError for E {
155171
}
156172
}
157173

158-
impl<E: Any + Error + Send + 'static> From<E> for Box<dyn CargoError> {
174+
impl<E: Error + Send + 'static> From<E> for Box<dyn CargoError> {
159175
fn from(err: E) -> Box<dyn CargoError> {
160-
if let Some(DieselError::NotFound) = Any::downcast_ref::<DieselError>(&err) {
161-
Box::new(NotFound)
162-
} else {
163-
Box::new(err)
164-
}
176+
CargoError::try_convert(&err).unwrap_or_else(|| Box::new(err))
165177
}
166178
}
167179
// =============================================================================
@@ -340,3 +352,34 @@ pub fn std_error(e: Box<dyn CargoError>) -> Box<dyn Error + Send> {
340352
pub fn std_error_no_send(e: Box<dyn CargoError>) -> Box<dyn Error> {
341353
Box::new(CargoErrToStdErr(e))
342354
}
355+
356+
#[derive(Debug, Clone, Copy)]
357+
pub struct ReadOnlyMode;
358+
359+
impl CargoError for ReadOnlyMode {
360+
fn description(&self) -> &str {
361+
"tried to write in read only mode"
362+
}
363+
364+
fn response(&self) -> Option<Response> {
365+
let mut response = json_response(&Bad {
366+
errors: vec![StringError {
367+
detail: "Crates.io is currently in read-only mode for maintenance. \
368+
Please try again later."
369+
.to_string(),
370+
}],
371+
});
372+
response.status = (503, "Service Unavailable");
373+
Some(response)
374+
}
375+
376+
fn human(&self) -> bool {
377+
true
378+
}
379+
}
380+
381+
impl fmt::Display for ReadOnlyMode {
382+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
383+
"Tried to write in read only mode".fmt(f)
384+
}
385+
}

0 commit comments

Comments
 (0)