@@ -166,6 +166,7 @@ use value::{
166
166
use crate :: {
167
167
export_worker:: FileStorageZipMetadata ,
168
168
metrics:: {
169
+ log_snapshot_import_age,
169
170
log_worker_starting,
170
171
snapshot_import_timer,
171
172
} ,
@@ -177,6 +178,10 @@ static IMPORT_SIZE_LIMIT: LazyLock<String> =
177
178
178
179
const INITIAL_BACKOFF : Duration = Duration :: from_secs ( 1 ) ;
179
180
const MAX_BACKOFF : Duration = Duration :: from_secs ( 60 ) ;
181
+ // If an import is taking longer than a day, it's a problem (and our fault).
182
+ // But the customer is probably no longer waiting so we should fail the import.
183
+ // If an import takes more than a week, the file may be deleted from S3.
184
+ const MAX_IMPORT_AGE : Duration = Duration :: from_secs ( 24 * 60 * 60 ) ;
180
185
181
186
pub struct SnapshotImportWorker < RT : Runtime > {
182
187
runtime : RT ,
@@ -591,6 +596,25 @@ impl<RT: Runtime> SnapshotImportWorker<RT> {
591
596
& mut self ,
592
597
snapshot_import : ParsedDocument < SnapshotImport > ,
593
598
) -> anyhow:: Result < ( Timestamp , usize ) > {
599
+ if let Some ( creation_time) = snapshot_import. creation_time ( ) {
600
+ let now = CreationTime :: try_from ( * self . database . now_ts_for_reads ( ) ) ?;
601
+ let age = Duration :: from_millis ( ( f64:: from ( now) - f64:: from ( creation_time) ) as u64 ) ;
602
+ log_snapshot_import_age ( age) ;
603
+ if age > MAX_IMPORT_AGE / 2 {
604
+ tracing:: warn!(
605
+ "SnapshotImport {} running too long ({:?})" ,
606
+ snapshot_import. id( ) ,
607
+ age
608
+ ) ;
609
+ }
610
+ if age > MAX_IMPORT_AGE {
611
+ anyhow:: bail!( ErrorMetadata :: bad_request(
612
+ "ImportFailed" ,
613
+ "Import took too long. Try again or contact Convex."
614
+ ) ) ;
615
+ }
616
+ }
617
+
594
618
let ( initial_schemas, objects) = self . parse_import ( snapshot_import. id ( ) ) . await ?;
595
619
596
620
let usage = FunctionUsageTracker :: new ( ) ;
@@ -1159,7 +1183,7 @@ fn wrap_import_err(e: anyhow::Error) -> anyhow::Error {
1159
1183
}
1160
1184
}
1161
1185
1162
- async fn wait_for_export_worker < RT : Runtime > (
1186
+ async fn wait_for_import_worker < RT : Runtime > (
1163
1187
application : & Application < RT > ,
1164
1188
identity : Identity ,
1165
1189
import_id : DocumentIdV6 ,
@@ -1201,7 +1225,7 @@ pub async fn do_import<RT: Runtime>(
1201
1225
let import_id =
1202
1226
upload_import_file ( application, identity. clone ( ) , format, mode, body_stream) . await ?;
1203
1227
1204
- let snapshot_import = wait_for_export_worker ( application, identity. clone ( ) , import_id) . await ?;
1228
+ let snapshot_import = wait_for_import_worker ( application, identity. clone ( ) , import_id) . await ?;
1205
1229
match & snapshot_import. state {
1206
1230
ImportState :: Uploaded | ImportState :: InProgress { .. } | ImportState :: Completed { .. } => {
1207
1231
anyhow:: bail!( "should be WaitingForConfirmation, is {snapshot_import:?}" )
@@ -1214,7 +1238,7 @@ pub async fn do_import<RT: Runtime>(
1214
1238
1215
1239
perform_import ( application, identity. clone ( ) , import_id) . await ?;
1216
1240
1217
- let snapshot_import = wait_for_export_worker ( application, identity. clone ( ) , import_id) . await ?;
1241
+ let snapshot_import = wait_for_import_worker ( application, identity. clone ( ) , import_id) . await ?;
1218
1242
match & snapshot_import. state {
1219
1243
ImportState :: Uploaded
1220
1244
| ImportState :: WaitingForConfirmation { .. }
@@ -2337,7 +2361,7 @@ mod tests {
2337
2361
use crate :: {
2338
2362
snapshot_import:: {
2339
2363
upload_import_file,
2340
- wait_for_export_worker ,
2364
+ wait_for_import_worker ,
2341
2365
} ,
2342
2366
test_helpers:: {
2343
2367
ApplicationFixtureArgs ,
2705
2729
)
2706
2730
. await ?;
2707
2731
2708
- let snapshot_import = wait_for_export_worker ( & app, new_admin_id ( ) , import_id) . await ?;
2732
+ let snapshot_import = wait_for_import_worker ( & app, new_admin_id ( ) , import_id) . await ?;
2709
2733
2710
2734
let state = snapshot_import. state . clone ( ) ;
2711
2735
must_let ! ( let ImportState :: WaitingForConfirmation {
0 commit comments