Skip to content

Commit b9b37f2

Browse files
goffrieConvex, Inc.
authored and
Convex, Inc.
committed
Make usage tracking always wait for queue space (#37228)
GitOrigin-RevId: 874d29e984309399ded0287922ab63fe720541f4
1 parent b8c88ce commit b9b37f2

File tree

21 files changed

+740
-610
lines changed

21 files changed

+740
-610
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/application/src/application_function_runner/http_routing.rs

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -174,16 +174,18 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
174174
)
175175
})?),
176176
};
177-
self.function_log.log_http_action(
178-
outcome,
179-
result_for_logging,
180-
log_lines,
181-
start.elapsed(),
182-
caller,
183-
usage_tracker,
184-
context,
185-
response_sha256,
186-
);
177+
self.function_log
178+
.log_http_action(
179+
outcome,
180+
result_for_logging,
181+
log_lines,
182+
start.elapsed(),
183+
caller,
184+
usage_tracker,
185+
context,
186+
response_sha256,
187+
)
188+
.await;
187189
Ok(result)
188190
},
189191
Err(e) if e.is_deterministic_user_error() || e.is_client_disconnect() => {
@@ -221,16 +223,18 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
221223
);
222224
send_log_line(new_log_line.clone());
223225
log_lines.push(new_log_line);
224-
self.function_log.log_http_action(
225-
outcome.clone(),
226-
Ok(r),
227-
log_lines,
228-
start.elapsed(),
229-
caller,
230-
usage_tracker,
231-
context,
232-
response_sha256,
233-
);
226+
self.function_log
227+
.log_http_action(
228+
outcome.clone(),
229+
Ok(r),
230+
log_lines,
231+
start.elapsed(),
232+
caller,
233+
usage_tracker,
234+
context,
235+
response_sha256,
236+
)
237+
.await;
234238
Ok(HttpActionResult::Streamed)
235239
},
236240
None => {
@@ -244,31 +248,35 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
244248
None,
245249
None,
246250
);
247-
self.function_log.log_http_action(
248-
outcome.clone(),
249-
Err(js_err),
250-
log_lines,
251-
start.elapsed(),
252-
caller,
253-
usage_tracker,
254-
context,
255-
response_sha256,
256-
);
251+
self.function_log
252+
.log_http_action(
253+
outcome.clone(),
254+
Err(js_err),
255+
log_lines,
256+
start.elapsed(),
257+
caller,
258+
usage_tracker,
259+
context,
260+
response_sha256,
261+
)
262+
.await;
257263
Ok(result)
258264
},
259265
}
260266
},
261267
Err(e) => {
262-
self.function_log.log_http_action_system_error(
263-
&e,
264-
request_head,
265-
identity.into(),
266-
start,
267-
caller,
268-
log_lines,
269-
context,
270-
response_sha256,
271-
);
268+
self.function_log
269+
.log_http_action_system_error(
270+
&e,
271+
request_head,
272+
identity.into(),
273+
start,
274+
caller,
275+
log_lines,
276+
context,
277+
response_sha256,
278+
)
279+
.await;
272280
Err(e)
273281
},
274282
}

crates/application/src/application_function_runner/mod.rs

Lines changed: 112 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -698,15 +698,17 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
698698

699699
let result = outcome.result.clone();
700700
let log_lines = outcome.log_lines.clone();
701-
self.function_log.log_query(
702-
&outcome,
703-
stats,
704-
false,
705-
start.elapsed(),
706-
caller,
707-
tx.usage_tracker,
708-
context,
709-
);
701+
self.function_log
702+
.log_query(
703+
&outcome,
704+
stats,
705+
false,
706+
start.elapsed(),
707+
caller,
708+
tx.usage_tracker,
709+
context,
710+
)
711+
.await;
710712

711713
Ok((result, log_lines))
712714
}
@@ -811,17 +813,19 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
811813
let (mut tx, mut outcome) = match result {
812814
Ok(r) => r,
813815
Err(e) => {
814-
self.function_log.log_mutation_system_error(
815-
&e,
816-
path.debug_into_component_path(),
817-
arguments,
818-
identity,
819-
start,
820-
caller,
821-
context.clone(),
822-
mutation_queue_length,
823-
mutation_retry_count,
824-
)?;
816+
self.function_log
817+
.log_mutation_system_error(
818+
&e,
819+
path.debug_into_component_path(),
820+
arguments,
821+
identity,
822+
start,
823+
caller,
824+
context.clone(),
825+
mutation_queue_length,
826+
mutation_retry_count,
827+
)
828+
.await?;
825829
return Err(e);
826830
},
827831
};
@@ -840,16 +844,18 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
840844
// developer error.
841845
Err(ref error) => {
842846
drop(tx);
843-
self.function_log.log_mutation(
844-
outcome.clone(),
845-
stats,
846-
execution_time,
847-
caller,
848-
usage_tracker,
849-
context.clone(),
850-
mutation_queue_length,
851-
mutation_retry_count,
852-
);
847+
self.function_log
848+
.log_mutation(
849+
outcome.clone(),
850+
stats,
851+
execution_time,
852+
caller,
853+
usage_tracker,
854+
context.clone(),
855+
mutation_queue_length,
856+
mutation_retry_count,
857+
)
858+
.await;
853859
return Ok(Err(MutationError {
854860
error: error.to_owned(),
855861
log_lines,
@@ -890,74 +896,82 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
890896
self.runtime.wait(sleep).await;
891897
let (table_name, document_id, write_source) =
892898
e.occ_info().unwrap_or((None, None, None));
893-
self.function_log.log_mutation_occ_error(
894-
outcome,
895-
stats,
896-
execution_time,
897-
caller.clone(),
898-
usage_tracker,
899-
context.clone(),
900-
OccInfo {
901-
table_name,
902-
document_id,
903-
write_source,
904-
retry_count: mutation_retry_count as u64,
905-
},
906-
mutation_queue_length,
907-
mutation_retry_count,
908-
);
899+
self.function_log
900+
.log_mutation_occ_error(
901+
outcome,
902+
stats,
903+
execution_time,
904+
caller.clone(),
905+
usage_tracker,
906+
context.clone(),
907+
OccInfo {
908+
table_name,
909+
document_id,
910+
write_source,
911+
retry_count: mutation_retry_count as u64,
912+
},
913+
mutation_queue_length,
914+
mutation_retry_count,
915+
)
916+
.await;
909917
continue;
910918
}
911919
outcome.result = Err(JsError::from_error_ref(&e));
912920

913921
if e.is_occ() {
914922
let (table_name, document_id, write_source) =
915923
e.occ_info().unwrap_or((None, None, None));
916-
self.function_log.log_mutation_occ_error(
917-
outcome,
918-
stats,
919-
execution_time,
920-
caller,
921-
usage_tracker,
922-
context.clone(),
923-
OccInfo {
924-
table_name,
925-
document_id,
926-
write_source,
927-
retry_count: mutation_retry_count as u64,
928-
},
929-
mutation_queue_length,
930-
mutation_retry_count,
931-
);
924+
self.function_log
925+
.log_mutation_occ_error(
926+
outcome,
927+
stats,
928+
execution_time,
929+
caller,
930+
usage_tracker,
931+
context.clone(),
932+
OccInfo {
933+
table_name,
934+
document_id,
935+
write_source,
936+
retry_count: mutation_retry_count as u64,
937+
},
938+
mutation_queue_length,
939+
mutation_retry_count,
940+
)
941+
.await;
932942
} else {
933-
self.function_log.log_mutation_system_error(
934-
&e,
935-
path.debug_into_component_path(),
936-
arguments,
937-
identity,
938-
start,
939-
caller,
940-
context,
941-
mutation_queue_length,
942-
mutation_retry_count,
943-
)?;
943+
self.function_log
944+
.log_mutation_system_error(
945+
&e,
946+
path.debug_into_component_path(),
947+
arguments,
948+
identity,
949+
start,
950+
caller,
951+
context,
952+
mutation_queue_length,
953+
mutation_retry_count,
954+
)
955+
.await?;
944956
}
945957
log_occ_retries(backoff.failures() as usize);
946958
return Err(e);
947959
}
948960
},
949961
};
950962

951-
self.function_log.log_mutation(
952-
outcome.clone(),
953-
stats,
954-
execution_time,
955-
caller,
956-
usage_tracker,
957-
context.clone(),
958-
mutation_queue_length,
959-
mutation_retry_count,
960-
);
963+
self.function_log
964+
.log_mutation(
965+
outcome.clone(),
966+
stats,
967+
execution_time,
968+
caller,
969+
usage_tracker,
970+
context.clone(),
971+
mutation_queue_length,
972+
mutation_retry_count,
973+
)
974+
.await;
961975
log_occ_retries(backoff.failures() as usize);
962976
return Ok(result);
963977
}
@@ -1109,22 +1123,26 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
11091123
let completion = match completion_result {
11101124
Ok(c) => c,
11111125
Err(e) => {
1112-
self.function_log.log_action_system_error(
1113-
&e,
1114-
path.debug_into_component_path(),
1115-
arguments,
1116-
identity.into(),
1117-
start,
1118-
caller,
1119-
vec![].into(),
1120-
context,
1121-
)?;
1126+
self.function_log
1127+
.log_action_system_error(
1128+
&e,
1129+
path.debug_into_component_path(),
1130+
arguments,
1131+
identity.into(),
1132+
start,
1133+
caller,
1134+
vec![].into(),
1135+
context,
1136+
)
1137+
.await?;
11221138
anyhow::bail!(e)
11231139
},
11241140
};
11251141
let log_lines = completion.log_lines().clone();
11261142
let result = completion.outcome.result.clone();
1127-
self.function_log.log_action(completion, usage_tracking);
1143+
self.function_log
1144+
.log_action(completion, usage_tracking)
1145+
.await;
11281146

11291147
let value = match result {
11301148
Ok(value) => value,

0 commit comments

Comments
 (0)