Migrate s3 data (#517)
* chore(backend): remove media cleanup job

* build(backend): bump version

* refactor(backend): move startup jobs to a function

* feat(backend): migrate s3 images for exercises

* feat(backend): delete copied objects

* fix(backend): copy from correct source

* chore(backend): add logs for migration

* fix(backend): do not copy into itself
IgnisDa authored Dec 15, 2023
1 parent 3a358ac commit 0b811e2
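
At its core the change is a rename-style S3 migration: each stored object is copied to a new key under a per-entity prefix (uploads/exercises/, uploads/workouts/, uploads/metadata/) and the original object is then deleted, with a guard so a key is never copied onto itself. A minimal sketch of that copy-then-delete pattern with the aws-sdk-s3 client follows; the helper name and error handling are illustrative and not part of the commit.

use anyhow::Result;
use aws_sdk_s3::Client;

// Illustrative helper: move a single object to a new key by copying it and
// then deleting the source. Skips the copy when source and destination are
// the same key, mirroring the "do not copy into itself" fix in this commit.
async fn move_object(client: &Client, bucket: &str, src: &str, dest: &str) -> Result<()> {
    if src == dest {
        return Ok(()); // already under the new prefix, nothing to migrate
    }
    client
        .copy_object()
        // CopySource is "<source-bucket>/<source-key>"
        .copy_source(format!("{}/{}", bucket, src))
        .bucket(bucket)
        .key(dest)
        .send()
        .await?;
    client
        .delete_object()
        .bucket(bucket)
        .key(src)
        .send()
        .await?;
    Ok(())
}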
Showing 5 changed files with 233 additions and 99 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion apps/backend/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ryot"
-version = "3.5.3"
+version = "3.5.4"
edition = "2021"
repository = "https://github.com/IgnisDa/ryot"
license = "GPL-V3"
4 changes: 0 additions & 4 deletions apps/backend/src/background.rs
@@ -40,10 +40,6 @@ pub async fn media_jobs(_information: ScheduledJob, ctx: JobContext) -> Result<(
.await
.unwrap();
let service = ctx.data::<Arc<MiscellaneousService>>().unwrap();
-service
-.cleanup_data_without_associated_user_activities()
-.await
-.unwrap();
tracing::trace!("Checking for updates for media in Watchlist");
service
.update_watchlist_media_and_send_notifications()
245 changes: 231 additions & 14 deletions apps/backend/src/main.rs
@@ -26,10 +26,13 @@ use axum::{
routing::{get, post, Router},
Extension, Server,
};
-use database::Migrator;
+use database::{ExerciseSource, MetadataSource, Migrator};
use itertools::Itertools;
use rs_utils::PROJECT_NAME;
-use sea_orm::{ConnectOptions, Database, EntityTrait, PaginatorTrait};
+use sea_orm::{
+ActiveModelTrait, ActiveValue, ColumnTrait, ConnectOptions, Database, EntityTrait,
+PaginatorTrait, QueryFilter,
+};
use sea_orm_migration::MigratorTrait;
use sqlx::{pool::PoolOptions, SqlitePool};
use tokio::try_join;
@@ -39,11 +42,15 @@ use tower_http::{
};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, layer::SubscriberExt};
-use utils::TEMP_DIR;
+use utils::{AppServices, TEMP_DIR};

use crate::{
background::{media_jobs, perform_application_job, user_jobs, yank_integrations_data},
-entities::prelude::Exercise,
+entities::{
+exercise, metadata,
+prelude::{Exercise, Metadata, Workout},
+workout,
+},
graphql::get_schema,
models::ExportAllResponse,
routes::{
@@ -156,21 +163,19 @@ async fn main() -> Result<()> {

let app_services = create_app_services(
db.clone(),
-s3_client,
-config,
+s3_client.clone(),
+config.clone(),
&perform_application_job_storage,
tz,
)
.await;

-if !cfg!(debug_assertions) && Exercise::find().count(&db).await? == 0 {
-tracing::info!("Instance does not have exercises data. Deploying job to download them...");
-app_services
-.exercise_service
-.deploy_update_exercise_library_job()
-.await
-.unwrap();
-}
+before_startup_jobs(
+&app_services,
+&s3_client,
+&config.file_storage.s3_bucket_name,
+)
+.await?;

if cfg!(debug_assertions) {
use schematic::schema::{typescript::TypeScriptRenderer, SchemaGenerator};
@@ -347,3 +352,215 @@ fn init_tracing() -> Result<WorkerGuard> {
.expect("Unable to set global tracing subscriber");
Ok(guard)
}

async fn before_startup_jobs(
app_services: &AppServices,
s3_client: &aws_sdk_s3::Client,
bkt: &String,
) -> Result<()> {
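// Runs before the server starts: deploys the exercise-library download job if no exercises exist, then rewrites legacy S3 object keys into per-entity prefixes.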
if !cfg!(debug_assertions)
&& Exercise::find()
.count(&app_services.media_service.db)
.await?
== 0
{
tracing::info!("Instance does not have exercises data. Deploying job to download them...");
app_services
.exercise_service
.deploy_update_exercise_library_job()
.await
.unwrap();
}

tracing::info!("Migrating custom data to S3...");

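// Custom exercises: copy each S3-stored image to a key under uploads/exercises/, delete the old object, and persist the rewritten keys.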
let all_ex = Exercise::find()
.filter(exercise::Column::Source.eq(ExerciseSource::Custom))
.all(&app_services.media_service.db)
.await?;
for ex in all_ex {
let mut attributes = ex.attributes.clone();
let mut images = vec![];
for image in &ex.attributes.internal_images {
let url = match image {
models::StoredUrl::S3(u) => u,
_ => continue,
};
let dest = if url.contains("uploads/exercises/") {
url.clone()
} else {
url.replace("uploads/", "uploads/exercises/")
};
if url == &dest {
break;
}
images.push(dest.clone());
s3_client
.copy_object()
.copy_source(format!("{}/{}", bkt, url))
.bucket(bkt)
.key(dest)
.send()
.await?;
app_services
.file_storage_service
.delete_object(url.clone())
.await;
}
attributes.internal_images = images.into_iter().map(models::StoredUrl::S3).collect();
let mut to_update: exercise::ActiveModel = ex.into();
to_update.attributes = ActiveValue::Set(attributes);
to_update.update(&app_services.media_service.db).await?;
}

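// Workouts: move workout-level assets under uploads/workouts/ and per-exercise assets under uploads/exercises/ in the same copy-then-delete fashion.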
let all_workouts = Workout::find().all(&app_services.media_service.db).await?;
for wkt in all_workouts {
let mut information = wkt.information.clone();
let mut images = vec![];
let mut videos = vec![];
for image in &wkt.information.assets.images {
let dest = if image.contains("uploads/workouts/") {
image.clone()
} else {
image.replace("uploads/", "uploads/workouts/")
};
if image == &dest {
continue;
}
images.push(dest.clone());
s3_client
.copy_object()
.copy_source(format!("{}/{}", bkt, image))
.bucket(bkt)
.key(dest)
.send()
.await?;
app_services
.file_storage_service
.delete_object(image.clone())
.await;
}
for video in &wkt.information.assets.videos {
let dest = if video.contains("uploads/workouts/") {
video.clone()
} else {
video.replace("uploads/", "uploads/workouts/")
};
if video == &dest {
break;
}
videos.push(dest.clone());
s3_client
.copy_object()
.copy_source(format!("{}/{}", bkt, video))
.bucket(bkt)
.key(dest)
.send()
.await?;
app_services
.file_storage_service
.delete_object(video.clone())
.await;
}
for (idx, exercise) in wkt.information.exercises.iter().enumerate() {
let mut images = vec![];
let mut videos = vec![];
for image in &exercise.assets.images {
let dest = if image.contains("uploads/exercises/") {
image.clone()
} else {
image.replace("uploads/", "uploads/exercises/")
};
if image == &dest {
break;
}
images.push(dest.clone());
s3_client
.copy_object()
.copy_source(format!("{}/{}", bkt, image))
.bucket(bkt)
.key(dest)
.send()
.await?;
app_services
.file_storage_service
.delete_object(image.clone())
.await;
}
for video in &exercise.assets.videos {
let dest = if video.contains("uploads/exercises/") {
video.clone()
} else {
video.replace("uploads/", "uploads/exercises/")
};
if video == &dest {
break;
}
videos.push(dest.clone());
s3_client
.copy_object()
.copy_source(format!("{}/{}", bkt, video))
.bucket(bkt)
.key(dest)
.send()
.await?;
app_services
.file_storage_service
.delete_object(video.clone())
.await;
}
information.exercises[idx].assets.images = images;
information.exercises[idx].assets.videos = videos;
}
information.assets.images = images;
information.assets.videos = videos;
let mut to_update: workout::ActiveModel = wkt.into();
to_update.information = ActiveValue::Set(information);
to_update.update(&app_services.media_service.db).await?;
}

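// Custom metadata: move S3-stored images under uploads/metadata/; entries with nothing to migrate are left untouched.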
let all_meta = Metadata::find()
.filter(metadata::Column::Source.eq(MetadataSource::Custom))
.all(&app_services.media_service.db)
.await?;
for meta in all_meta {
let mut images = vec![];
for image in meta.images.clone().unwrap_or_default().iter_mut() {
let url = match &image.url {
models::StoredUrl::S3(u) => u.clone(),
_ => continue,
};
let dest = if url.contains("uploads/metadata/") {
url.clone()
} else {
url.replace("uploads/", "uploads/metadata/")
};
if url == dest {
break;
}
image.url = models::StoredUrl::S3(dest.clone());
images.push(image.clone());
s3_client
.copy_object()
.copy_source(format!("{}/{}", bkt, url))
.bucket(bkt)
.key(dest)
.send()
.await?;
app_services
.file_storage_service
.delete_object(url.clone())
.await;
}
if images.is_empty() {
continue;
}
let mut to_update: metadata::ActiveModel = meta.into();
to_update.images = ActiveValue::Set(Some(images));
to_update.update(&app_services.media_service.db).await?;
}

tracing::info!("Migrating custom data to S3... Done");

Ok(())
}
79 changes: 0 additions & 79 deletions apps/backend/src/miscellaneous/resolver.rs
@@ -3226,85 +3226,6 @@ impl MiscellaneousService {
Ok(())
}

pub async fn cleanup_data_without_associated_user_activities(&self) -> Result<()> {
tracing::trace!("Cleaning up media items without associated user activities");
let mut all_metadata = Metadata::find().stream(&self.db).await?;
while let Some(metadata) = all_metadata.try_next().await? {
let num_user_associations = UserToEntity::find()
.filter(user_to_entity::Column::MetadataId.eq(metadata.id))
.count(&self.db)
.await
.unwrap();
if num_user_associations == 0 {
metadata.delete(&self.db).await.ok();
}
}
tracing::trace!("Cleaning up genres without associated metadata");
let mut all_genre = Genre::find().stream(&self.db).await?;
while let Some(genre) = all_genre.try_next().await? {
let num_associations = MetadataToGenre::find()
.filter(metadata_to_genre::Column::GenreId.eq(genre.id))
.count(&self.db)
.await
.unwrap();
if num_associations == 0 {
genre.delete(&self.db).await.ok();
}
}
tracing::trace!("Cleaning up people without associated metadata");
let mut all_creators = Person::find().stream(&self.db).await?;
while let Some(creator) = all_creators.try_next().await? {
let num_associations = MetadataToPerson::find()
.filter(metadata_to_person::Column::PersonId.eq(creator.id))
.count(&self.db)
.await
.unwrap();
let num_col_associations = CollectionToEntity::find()
.filter(collection_to_entity::Column::PersonId.eq(creator.id))
.count(&self.db)
.await
.unwrap();
let num_reviews = Review::find()
.filter(review::Column::PersonId.eq(creator.id))
.count(&self.db)
.await
.unwrap();
if num_associations + num_col_associations + num_reviews == 0 {
creator.delete(&self.db).await.ok();
}
}
tracing::trace!("Cleaning up partial metadata without associated metadata");
let mut all_partial_metadata = PartialMetadataModel::find().stream(&self.db).await?;
while let Some(partial_metadata) = all_partial_metadata.try_next().await? {
let num_associations = MetadataToPartialMetadata::find()
.filter(
metadata_to_partial_metadata::Column::PartialMetadataId.eq(partial_metadata.id),
)
.count(&self.db)
.await
.unwrap();
let num_person_associations = PersonToPartialMetadata::find()
.filter(
person_to_partial_metadata::Column::PartialMetadataId.eq(partial_metadata.id),
)
.count(&self.db)
.await
.unwrap();
let num_group_associations = PartialMetadataToMetadataGroup::find()
.filter(
partial_metadata_to_metadata_group::Column::PartialMetadataId
.eq(partial_metadata.id),
)
.count(&self.db)
.await
.unwrap();
if num_associations + num_person_associations + num_group_associations == 0 {
partial_metadata.delete(&self.db).await.ok();
}
}
Ok(())
}

pub async fn deploy_update_metadata_job(&self, metadata_id: i32) -> Result<String> {
let metadata = Metadata::find_by_id(metadata_id)
.one(&self.db)