Skip to content

Commit a10aa0e

Browse files
authored
Skip running a job if the crate/version deleted (#11500)
1 parent 6a264e4 commit a10aa0e

File tree

7 files changed

+123
-15
lines changed

7 files changed

+123
-15
lines changed

src/tests/worker/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
mod git;
2+
mod readmes;
23
mod rss;
4+
mod send_publish_notifications;
35
mod sync_admins;
6+
mod update_default_version;

src/tests/worker/readmes.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use crate::tests::util::TestApp;
2+
use crate::worker::jobs;
3+
use crates_io_worker::BackgroundJob;
4+
5+
#[tokio::test(flavor = "multi_thread")]
6+
async fn skips_when_crate_deleted() -> anyhow::Result<()> {
7+
let (app, _) = TestApp::full().empty().await;
8+
let mut conn = app.db_conn().await;
9+
10+
let job =
11+
jobs::RenderAndUploadReadme::new(-1, "deleted".to_string(), ".".to_string(), None, None);
12+
13+
job.enqueue(&mut conn).await?;
14+
app.run_pending_background_jobs().await;
15+
16+
Ok(())
17+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
use crate::tests::util::TestApp;
2+
use crate::worker::jobs;
3+
use crates_io_worker::BackgroundJob;
4+
5+
#[tokio::test(flavor = "multi_thread")]
6+
async fn skips_when_crate_deleted() -> anyhow::Result<()> {
7+
let (app, _) = TestApp::full().empty().await;
8+
let mut conn = app.db_conn().await;
9+
10+
let job = jobs::SendPublishNotificationsJob::new(-1);
11+
12+
job.enqueue(&mut conn).await?;
13+
app.run_pending_background_jobs().await;
14+
15+
Ok(())
16+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
use crate::tests::util::TestApp;
2+
use crate::worker::jobs;
3+
use crates_io_worker::BackgroundJob;
4+
5+
#[tokio::test(flavor = "multi_thread")]
6+
async fn skips_when_crate_deleted() -> anyhow::Result<()> {
7+
let (app, _) = TestApp::full().empty().await;
8+
let mut conn = app.db_conn().await;
9+
10+
let job = jobs::UpdateDefaultVersion::new(-1);
11+
12+
job.enqueue(&mut conn).await?;
13+
app.run_pending_background_jobs().await;
14+
15+
Ok(())
16+
}

src/worker/jobs/readmes.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ use crate::tasks::spawn_blocking;
55
use crate::worker::Environment;
66
use crates_io_markdown::text_to_html;
77
use crates_io_worker::BackgroundJob;
8+
use diesel::result::DatabaseErrorKind;
9+
use diesel::result::Error::DatabaseError;
810
use diesel_async::AsyncConnection;
911
use diesel_async::scoped_futures::ScopedFutureExt;
1012
use serde::{Deserialize, Serialize};
1113
use std::sync::Arc;
12-
use tracing::{info, instrument};
14+
use tracing::{info, instrument, warn};
1315

1416
#[derive(Clone, Serialize, Deserialize)]
1517
pub struct RenderAndUploadReadme {
@@ -70,13 +72,39 @@ impl BackgroundJob for RenderAndUploadReadme {
7072
let mut conn = env.deadpool.get().await?;
7173
conn.transaction(|conn| {
7274
async move {
73-
Version::record_readme_rendering(job.version_id, conn).await?;
74-
let (crate_name, vers): (String, String) = versions::table
75+
match Version::record_readme_rendering(job.version_id, conn).await {
76+
Ok(_) => {}
77+
Err(DatabaseError(DatabaseErrorKind::ForeignKeyViolation, ..)) => {
78+
warn!(
79+
"Skipping README rendering recording for version {}: version not found",
80+
job.version_id
81+
);
82+
return Ok(());
83+
}
84+
Err(err) => {
85+
warn!(
86+
"Failed to record README rendering for version {}: {err}",
87+
job.version_id,
88+
);
89+
return Err(err.into());
90+
}
91+
}
92+
93+
let result = versions::table
7594
.find(job.version_id)
7695
.inner_join(crates::table)
7796
.select((crates::name, versions::num))
78-
.first(conn)
79-
.await?;
97+
.first::<(String, String)>(conn)
98+
.await
99+
.optional()?;
100+
101+
let Some((crate_name, vers)) = result else {
102+
warn!(
103+
"Skipping README rendering for version {}: version not found",
104+
job.version_id
105+
);
106+
return Ok(());
107+
};
80108

81109
tracing::Span::current().record("krate.name", tracing::field::display(&crate_name));
82110

src/worker/jobs/send_publish_notifications.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ impl BackgroundJob for SendPublishNotificationsJob {
3939
let mut conn = ctx.deadpool.get().await?;
4040

4141
// Get crate name, version and other publish details
42-
let publish_details = PublishDetails::for_version(version_id, &mut conn).await?;
42+
let Some(publish_details) = PublishDetails::for_version(version_id, &mut conn).await?
43+
else {
44+
warn!("Skipping publish notifications for {version_id}: no version found");
45+
46+
return Ok(());
47+
};
4348

4449
let publish_time = publish_details
4550
.publish_time
@@ -168,13 +173,17 @@ struct PublishDetails {
168173
}
169174

170175
impl PublishDetails {
171-
async fn for_version(version_id: i32, conn: &mut AsyncPgConnection) -> QueryResult<Self> {
176+
async fn for_version(
177+
version_id: i32,
178+
conn: &mut AsyncPgConnection,
179+
) -> QueryResult<Option<Self>> {
172180
versions::table
173181
.find(version_id)
174182
.inner_join(crates::table)
175183
.left_join(users::table)
176184
.select(PublishDetails::as_select())
177185
.first(conn)
178186
.await
187+
.optional()
179188
}
180189
}

src/worker/jobs/update_default_version.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use diesel::prelude::*;
77
use diesel_async::RunQueryDsl;
88
use serde::{Deserialize, Serialize};
99
use std::sync::Arc;
10-
use tracing::info;
10+
use tracing::{info, warn};
1111

1212
#[derive(Serialize, Deserialize)]
1313
pub struct UpdateDefaultVersion {
@@ -32,18 +32,37 @@ impl BackgroundJob for UpdateDefaultVersion {
3232

3333
info!("Updating default version for crate {crate_id}");
3434
let mut conn = ctx.deadpool.get().await?;
35-
update_default_version(crate_id, &mut conn).await?;
35+
36+
match update_default_version(crate_id, &mut conn).await {
37+
Ok(_) => {
38+
info!("Successfully updated default version for crate {crate_id}");
39+
}
40+
Err(diesel::result::Error::NotFound) => {
41+
warn!("Skipping default version update for crate {crate_id}: crate not found");
42+
return Ok(());
43+
}
44+
Err(err) => {
45+
warn!("Failed to update default version for crate {crate_id}: {err}");
46+
return Err(err.into());
47+
}
48+
}
3649

3750
// Get the crate name for OG image generation
38-
let crate_name: String = crates::table
51+
let crate_name = crates::table
3952
.filter(crates::id.eq(crate_id))
4053
.select(crates::name)
41-
.first(&mut conn)
42-
.await?;
54+
.first::<String>(&mut conn)
55+
.await
56+
.optional()?;
4357

44-
// Generate OG image after updating default version
45-
info!("Enqueueing OG image generation for crate {crate_name}");
46-
GenerateOgImage::new(crate_name).enqueue(&mut conn).await?;
58+
if let Some(crate_name) = crate_name {
59+
// Generate OG image after updating default version
60+
info!("Enqueueing OG image generation for crate {crate_name}");
61+
GenerateOgImage::new(crate_name).enqueue(&mut conn).await?;
62+
} else {
63+
warn!("No crate found for ID {crate_id}, skipping OG image generation");
64+
return Ok(());
65+
}
4766

4867
Ok(())
4968
}

0 commit comments

Comments
 (0)