Skip to content

Commit 64b5093

Browse files
Sujay JayakarConvex, Inc.
Sujay Jayakar
authored and
Convex, Inc.
committed
Improve schema checking path for deploy2 (#29394)
GitOrigin-RevId: d244fa200dc2687b066abc5439e222679192589a
1 parent 036e1b3 commit 64b5093

File tree

10 files changed

+309
-59
lines changed

10 files changed

+309
-59
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/application/Cargo.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,17 @@ sync_types = { package = "convex_sync_types", path = "../../crates/convex/sync_t
6363
tempfile = { workspace = true }
6464
thiserror = { workspace = true }
6565
thousands = { workspace = true }
66+
tokio = { workspace = true }
6667
tracing = { workspace = true }
6768
url = { workspace = true }
6869
usage_tracking = { path = "../../crates/usage_tracking" }
6970
value = { path = "../value" }
7071
vector = { path = "../vector" }
7172

7273
[dev-dependencies]
73-
authentication = { path = "../../crates/authentication", features = ["testing"] }
74+
authentication = { path = "../../crates/authentication", features = [
75+
"testing",
76+
] }
7477
common = { path = "../common", features = ["testing"] }
7578
database = { path = "../database", features = ["testing"] }
7679
errors = { path = "../errors", features = ["testing"] }
@@ -89,7 +92,9 @@ runtime = { path = "../runtime", features = ["testing"] }
8992
search = { path = "../search", features = ["testing"] }
9093
shape_inference = { path = "../shape_inference", features = ["testing"] }
9194
storage = { path = "../storage", features = ["testing"] }
92-
usage_tracking = { path = "../../crates/usage_tracking", features = ["testing"] }
95+
usage_tracking = { path = "../../crates/usage_tracking", features = [
96+
"testing",
97+
] }
9398
value = { path = "../value", features = ["testing"] }
9499
vector = { path = "../vector", features = ["testing"] }
95100

crates/application/src/deploy_config.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,3 +575,90 @@ pub struct FinishPushDiff {
575575
pub definition_diffs: BTreeMap<ComponentDefinitionPath, ComponentDefinitionDiff>,
576576
pub component_diffs: BTreeMap<ComponentPath, ComponentDiff>,
577577
}
578+
579+
#[derive(Debug)]
580+
pub enum SchemaStatus {
581+
InProgress {
582+
components: BTreeMap<ComponentPath, ComponentSchemaStatus>,
583+
},
584+
Failed {
585+
error: String,
586+
component_path: ComponentPath,
587+
table_name: Option<String>,
588+
},
589+
RaceDetected,
590+
Complete,
591+
}
592+
593+
#[derive(Serialize, Deserialize, Debug)]
594+
#[serde(tag = "type")]
595+
#[serde(rename_all = "camelCase")]
596+
pub enum SchemaStatusJson {
597+
#[serde(rename_all = "camelCase")]
598+
InProgress {
599+
components: BTreeMap<String, ComponentSchemaStatusJson>,
600+
},
601+
#[serde(rename_all = "camelCase")]
602+
Failed {
603+
error: String,
604+
component_path: String,
605+
table_name: Option<String>,
606+
},
607+
RaceDetected,
608+
Complete,
609+
}
610+
611+
impl From<SchemaStatus> for SchemaStatusJson {
612+
fn from(value: SchemaStatus) -> Self {
613+
match value {
614+
SchemaStatus::InProgress { components } => SchemaStatusJson::InProgress {
615+
components: components
616+
.into_iter()
617+
.map(|(k, v)| (String::from(k), v.into()))
618+
.collect(),
619+
},
620+
SchemaStatus::Failed {
621+
error,
622+
component_path,
623+
table_name,
624+
} => SchemaStatusJson::Failed {
625+
error,
626+
component_path: String::from(component_path),
627+
table_name,
628+
},
629+
SchemaStatus::RaceDetected => SchemaStatusJson::RaceDetected,
630+
SchemaStatus::Complete => SchemaStatusJson::Complete,
631+
}
632+
}
633+
}
634+
635+
#[derive(Debug)]
636+
pub struct ComponentSchemaStatus {
637+
pub schema_validation_complete: bool,
638+
pub indexes_complete: usize,
639+
pub indexes_total: usize,
640+
}
641+
642+
impl ComponentSchemaStatus {
643+
pub fn is_complete(&self) -> bool {
644+
self.schema_validation_complete && self.indexes_complete == self.indexes_total
645+
}
646+
}
647+
648+
#[derive(Serialize, Deserialize, Debug)]
649+
#[serde(rename_all = "camelCase")]
650+
pub struct ComponentSchemaStatusJson {
651+
pub schema_validation_complete: bool,
652+
pub indexes_complete: usize,
653+
pub indexes_total: usize,
654+
}
655+
656+
impl From<ComponentSchemaStatus> for ComponentSchemaStatusJson {
657+
fn from(value: ComponentSchemaStatus) -> Self {
658+
Self {
659+
schema_validation_complete: value.schema_validation_complete,
660+
indexes_complete: value.indexes_complete,
661+
indexes_total: value.indexes_total,
662+
}
663+
}
664+
}

crates/application/src/lib.rs

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
use std::{
99
collections::{
1010
BTreeMap,
11-
BTreeSet,
1211
HashSet,
1312
},
1413
ops::Bound,
1514
sync::Arc,
16-
time::SystemTime,
15+
time::{
16+
Duration,
17+
SystemTime,
18+
},
1719
};
1820

1921
use anyhow::Context;
@@ -117,7 +119,9 @@ use database::{
117119
WriteSource,
118120
};
119121
use deploy_config::{
122+
ComponentSchemaStatus,
120123
FinishPushDiff,
124+
SchemaStatus,
121125
StartPushResponse,
122126
};
123127
use errors::{
@@ -1903,10 +1907,12 @@ impl<RT: Runtime> Application<RT> {
19031907
&self,
19041908
identity: Identity,
19051909
schema_change: SchemaChange,
1906-
) -> anyhow::Result<()> {
1910+
timeout: Duration,
1911+
) -> anyhow::Result<SchemaStatus> {
1912+
let deadline = self.runtime().monotonic_now() + timeout;
19071913
loop {
19081914
let mut tx = self.begin(identity.clone()).await?;
1909-
let mut waiting = BTreeSet::new();
1915+
let mut components_status = BTreeMap::new();
19101916

19111917
for (component_path, schema_id) in &schema_change.schema_ids {
19121918
let Some(schema_id) = schema_id else {
@@ -1922,24 +1928,18 @@ impl<RT: Runtime> Application<RT> {
19221928
.await?
19231929
.context("Missing schema document")?;
19241930
let SchemaMetadata { state, .. } = document.into_value().0.try_into()?;
1925-
let is_pending = match state {
1926-
SchemaState::Pending => true,
1927-
SchemaState::Active | SchemaState::Validated => false,
1931+
let schema_validation_complete = match state {
1932+
SchemaState::Pending => false,
1933+
SchemaState::Active | SchemaState::Validated => true,
19281934
SchemaState::Failed { error, table_name } => {
1929-
let msg = match table_name {
1930-
Some(t) => format!("Schema for table `{t}` failed: {error}"),
1931-
None => format!("Schema failed: {error}"),
1932-
};
1933-
anyhow::bail!(ErrorMetadata::bad_request("SchemaFailed", msg))
1935+
return Ok(SchemaStatus::Failed {
1936+
error,
1937+
component_path: component_path.clone(),
1938+
table_name,
1939+
});
19341940
},
1935-
SchemaState::Overwritten => anyhow::bail!(ErrorMetadata::bad_request(
1936-
"RaceDetected",
1937-
"Push aborted since another push has been started."
1938-
)),
1941+
SchemaState::Overwritten => return Ok(SchemaStatus::RaceDetected),
19391942
};
1940-
if is_pending {
1941-
waiting.insert(component_path.clone());
1942-
}
19431943

19441944
let component_id = if component_path.is_root() {
19451945
ComponentId::Root
@@ -1956,27 +1956,45 @@ impl<RT: Runtime> Application<RT> {
19561956
ComponentId::Child(internal_id)
19571957
};
19581958
let namespace = TableNamespace::from(component_id);
1959+
let mut indexes_complete = 0;
1960+
let mut indexes_total = 0;
19591961
for index in IndexModel::new(&mut tx)
19601962
.get_application_indexes(namespace)
19611963
.await?
19621964
{
1963-
if index.config.is_backfilling() {
1964-
waiting.insert(component_path.clone());
1965+
if !index.config.is_backfilling() {
1966+
indexes_complete += 1;
19651967
}
1968+
indexes_total += 1;
19661969
}
1970+
components_status.insert(
1971+
component_path.clone(),
1972+
ComponentSchemaStatus {
1973+
schema_validation_complete,
1974+
indexes_complete,
1975+
indexes_total,
1976+
},
1977+
);
19671978
}
19681979

1969-
if waiting.is_empty() {
1970-
break;
1980+
if components_status.values().all(|c| c.is_complete()) {
1981+
return Ok(SchemaStatus::Complete);
19711982
}
19721983

1973-
tracing::info!("Waiting for schema changes to complete for {waiting:?}...");
1974-
1984+
let now = self.runtime().monotonic_now();
1985+
if now > deadline {
1986+
return Ok(SchemaStatus::InProgress {
1987+
components: components_status,
1988+
});
1989+
}
19751990
let token = tx.into_token()?;
19761991
let subscription = self.subscribe(token).await?;
1977-
subscription.wait_for_invalidation().await;
1992+
1993+
tokio::select! {
1994+
_ = subscription.wait_for_invalidation() => {},
1995+
_ = self.runtime.wait(deadline.clone() - now) => {},
1996+
}
19781997
}
1979-
Ok(())
19801998
}
19811999

19822000
pub async fn finish_push(

crates/application/src/test_helpers.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ use value::{
8787

8888
use crate::{
8989
cron_jobs::CronJobExecutor,
90-
deploy_config::StartPushRequest,
90+
deploy_config::{
91+
SchemaStatus,
92+
StartPushRequest,
93+
},
9194
log_visibility::AllowLogging,
9295
scheduled_jobs::{
9396
ScheduledJobExecutor,
@@ -321,8 +324,20 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {
321324
async fn load_component_tests_modules(&self, layout: &str) -> anyhow::Result<()> {
322325
let request = Self::load_start_push_request(Path::new(layout))?;
323326
let start_push = self.start_push(request).await?;
324-
self.wait_for_schema(Identity::system(), start_push.schema_change.clone())
325-
.await?;
327+
loop {
328+
let schema_status = self
329+
.wait_for_schema(
330+
Identity::system(),
331+
start_push.schema_change.clone(),
332+
Duration::from_secs(10),
333+
)
334+
.await?;
335+
match schema_status {
336+
SchemaStatus::InProgress { .. } => continue,
337+
SchemaStatus::Complete => break,
338+
_ => anyhow::bail!("Unexpected schema status: {schema_status:?}"),
339+
}
340+
}
326341
self.finish_push(start_push, false).await?;
327342
Ok(())
328343
}

crates/local_backend/src/deploy_config2.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
use std::collections::BTreeMap;
1+
use std::{
2+
collections::BTreeMap,
3+
time::Duration,
4+
};
25

36
use application::deploy_config::{
47
FinishPushDiff,
8+
SchemaStatusJson,
59
StartPushRequest,
610
StartPushResponse,
711
};
@@ -163,11 +167,28 @@ pub async fn start_push(
163167
Ok(Json(SerializedStartPushResponse::try_from(resp)?))
164168
}
165169

170+
const DEFAULT_SCHEMA_TIMEOUT_MS: u32 = 10_000;
171+
166172
#[derive(Deserialize)]
167173
#[serde(rename_all = "camelCase")]
168174
pub struct WaitForSchemaRequest {
169175
admin_key: String,
170176
schema_change: SerializedSchemaChange,
177+
timeout_ms: Option<u32>,
178+
}
179+
180+
#[derive(Serialize)]
181+
#[serde(tag = "type")]
182+
pub enum WaitForSchemaResponse {
183+
InProgress {
184+
status: SchemaStatusJson,
185+
},
186+
Failed {
187+
error: String,
188+
table_name: Option<String>,
189+
},
190+
RaceDetected,
191+
Complete,
171192
}
172193

173194
#[debug_handler]
@@ -181,12 +202,13 @@ pub async fn wait_for_schema(
181202
req.admin_key,
182203
)
183204
.await?;
184-
205+
let timeout = Duration::from_millis(req.timeout_ms.unwrap_or(DEFAULT_SCHEMA_TIMEOUT_MS) as u64);
185206
let schema_change = req.schema_change.try_into()?;
186-
st.application
187-
.wait_for_schema(identity, schema_change)
207+
let resp = st
208+
.application
209+
.wait_for_schema(identity, schema_change, timeout)
188210
.await?;
189-
Ok(Json(()))
211+
Ok(Json(SchemaStatusJson::from(resp)))
190212
}
191213

192214
#[derive(Deserialize)]

npm-packages/convex/src/cli/lib/codegen.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,12 @@ export async function doInitialComponentCodegen(
142142
ctx: Context,
143143
tmpDir: TempDir,
144144
componentDirectory: ComponentDirectory,
145-
opts?: { dryRun?: boolean; generateCommonJSApi?: boolean; debug?: boolean },
145+
opts?: {
146+
dryRun?: boolean;
147+
generateCommonJSApi?: boolean;
148+
debug?: boolean;
149+
verbose?: boolean;
150+
},
146151
) {
147152
const { projectConfig } = await readProjectConfig(ctx);
148153

@@ -156,7 +161,7 @@ export async function doInitialComponentCodegen(
156161
const isPublishedPackage =
157162
componentDirectory.definitionPath.endsWith(".js") &&
158163
!componentDirectory.isRoot;
159-
if (isPublishedPackage) {
164+
if (isPublishedPackage && opts?.verbose) {
160165
logMessage(
161166
ctx,
162167
`skipping initial codegen for installed package ${componentDirectory.path}`,

0 commit comments

Comments
 (0)