Skip to content

Commit f2ec83d

Browse files
committed
address more comments
1 parent d739e91 commit f2ec83d

8 files changed

Lines changed: 156 additions & 80 deletions

File tree

apps/sim/app/api/workspaces/[id]/fork/diff/route.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { getForkDiffContract } from '@/lib/api/contracts/workspace-fork'
44
import { parseRequest } from '@/lib/api/server'
55
import { getSession } from '@/lib/auth'
66
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
7+
import { loadSourceDeployedStates } from '@/lib/workspaces/fork/copy/deploy-bridge'
78
import { assertCanPromote } from '@/lib/workspaces/fork/lineage/authz'
89
import { computeForkPromotePlan } from '@/lib/workspaces/fork/promote/promote-plan'
910

@@ -21,12 +22,17 @@ export const GET = withRouteHandler(
2122

2223
const auth = await assertCanPromote(id, otherWorkspaceId, direction, session.user.id)
2324

25+
const { deployedWorkflows, sourceStates } = await loadSourceDeployedStates(
26+
auth.sourceWorkspaceId
27+
)
2428
const plan = await computeForkPromotePlan({
2529
executor: db,
2630
edge: auth.edge,
2731
sourceWorkspaceId: auth.sourceWorkspaceId,
2832
targetWorkspaceId: auth.targetWorkspaceId,
2933
direction,
34+
deployedSourceWorkflows: deployedWorkflows,
35+
sourceStates,
3036
})
3137

3238
const toRef = (reference: (typeof plan.unmappedRequired)[number]) => ({

apps/sim/lib/workspaces/fork/copy/deploy-bridge.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,31 @@ export async function getActiveDeploymentVersionNumber(
5757
return row?.version ?? null
5858
}
5959

60+
/**
61+
* Read a source workspace's deployed workflows and each one's active deployed state
62+
* on the global pool. Fork/promote callers MUST run this BEFORE opening their
63+
* transaction: doing these heavy per-workflow reads inside the tx checks out a
64+
* SECOND pooled connection while the tx holds the first, which can deadlock the
65+
* pool at saturation (primary pool max is 15). The source is read-only for the
66+
* operation, so a pre-transaction snapshot is the value that gets force-pushed.
67+
*
68+
* Holds every source state in memory at once (bounded by the workspace's deployed
69+
* workflow count) - the apply step needs each state to write its target inside the
70+
* single atomic transaction, so it cannot stream them one at a time.
71+
*/
72+
export async function loadSourceDeployedStates(sourceWorkspaceId: string): Promise<{
73+
deployedWorkflows: DeployedWorkflowSummary[]
74+
sourceStates: Map<string, WorkflowState>
75+
}> {
76+
const deployedWorkflows = await listDeployedWorkflows(db, sourceWorkspaceId)
77+
const sourceStates = new Map<string, WorkflowState>()
78+
for (const wf of deployedWorkflows) {
79+
const state = await readDeployedState(wf.id, sourceWorkspaceId)
80+
if (state) sourceStates.set(wf.id, state)
81+
}
82+
return { deployedWorkflows, sourceStates }
83+
}
84+
6085
/**
6186
* Read a workflow's active deployed state as a `WorkflowState`. Returns null ONLY
6287
* when the workflow genuinely has no active deployment (a legitimate skip); real

apps/sim/lib/workspaces/fork/create-fork.ts

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import {
1717
copyWorkflowStateIntoTarget,
1818
resolveForkFolderMapping,
1919
} from '@/lib/workspaces/fork/copy/copy-workflows'
20-
import { listDeployedWorkflows, readDeployedState } from '@/lib/workspaces/fork/copy/deploy-bridge'
20+
import { loadSourceDeployedStates } from '@/lib/workspaces/fork/copy/deploy-bridge'
2121
import {
2222
type ForkMappingUpsert,
2323
type ForkResourceType,
@@ -28,7 +28,6 @@ import type { ForkRemapKind } from '@/lib/workspaces/fork/remap/remap-references
2828
import type { WorkspaceWithOwner } from '@/lib/workspaces/permissions/utils'
2929
import type { WorkspaceCreationPolicy } from '@/lib/workspaces/policy'
3030
import { WORKSPACE_MODE } from '@/lib/workspaces/policy'
31-
import type { WorkflowState } from '@/stores/workflows/workflow/types'
3231

3332
const logger = createLogger('WorkspaceForkCreate')
3433

@@ -88,16 +87,10 @@ export async function createFork(params: CreateForkParams): Promise<CreateForkRe
8887
const selection = params.selection ?? EMPTY_SELECTION
8988
const childName = params.name?.trim() || `${source.name} (fork)`
9089

91-
// Read the source's deployed workflows + states BEFORE the transaction. These are
92-
// global-pool reads of the (unchanged) source; issuing them inside the fork tx
93-
// would trip the cross-connection tripwire (and check out a second pooled
94-
// connection that can deadlock at saturation).
95-
const deployedWorkflows = await listDeployedWorkflows(db, source.id)
96-
const sourceStates = new Map<string, WorkflowState>()
97-
for (const wf of deployedWorkflows) {
98-
const state = await readDeployedState(wf.id, source.id)
99-
if (state) sourceStates.set(wf.id, state)
100-
}
90+
// Read the source's deployed workflows + states BEFORE the transaction so these
91+
// global-pool reads don't check out a second pooled connection from inside the
92+
// fork tx (which can deadlock the pool at saturation).
93+
const { deployedWorkflows, sourceStates } = await loadSourceDeployedStates(source.id)
10194

10295
const { result, blobTasks, contentPlan } = await db.transaction(async (tx) => {
10396
const now = new Date()

apps/sim/lib/workspaces/fork/lineage/lineage.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,26 @@ export async function resolveForkEdge(
8787

8888
/**
8989
* Serialize concurrent promote/rollback on a fork edge with a transaction-scoped
90-
* advisory lock keyed by the edge (the child workspace id). `hashtext` is 32-bit,
91-
* so distinct edges can share a lock slot - at worst unnecessary serialization,
92-
* never a correctness issue.
90+
* advisory lock keyed by the edge (the child workspace id). `hashtextextended`
91+
* (64-bit, matching every other advisory lock in the repo) makes a collision
92+
* between distinct keys astronomically unlikely; a collision would only cause
93+
* unnecessary serialization, never a correctness issue.
9394
*/
9495
export async function acquireForkEdgeLock(tx: DbOrTx, childWorkspaceId: string): Promise<void> {
95-
await tx.execute(sql`select pg_advisory_xact_lock(hashtext(${`fork-edge:${childWorkspaceId}`}))`)
96+
await tx.execute(
97+
sql`select pg_advisory_xact_lock(hashtextextended(${`fork-edge:${childWorkspaceId}`}, 0))`
98+
)
99+
}
100+
101+
/**
102+
* Serialize every promote/rollback whose TARGET is this workspace. Sibling forks
103+
* promote into the same parent on different edge locks, so the edge lock alone does
104+
* not serialize them; this lock does, keeping concurrent syncs into one target from
105+
* interleaving and keeping rollback's "newest sync" check race-free. Always acquire
106+
* this BEFORE {@link acquireForkEdgeLock} so the two are taken in a consistent order.
107+
*/
108+
export async function acquireForkTargetLock(tx: DbOrTx, targetWorkspaceId: string): Promise<void> {
109+
await tx.execute(
110+
sql`select pg_advisory_xact_lock(hashtextextended(${`fork-target:${targetWorkspaceId}`}, 0))`
111+
)
96112
}

apps/sim/lib/workspaces/fork/promote/promote-plan.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { workflow } from '@sim/db/schema'
22
import { generateId } from '@sim/utils/id'
33
import { and, eq, isNull } from 'drizzle-orm'
44
import type { DbOrTx } from '@/lib/db/types'
5-
import { listDeployedWorkflows, readDeployedState } from '@/lib/workspaces/fork/copy/deploy-bridge'
5+
import type { DeployedWorkflowSummary } from '@/lib/workspaces/fork/copy/deploy-bridge'
66
import type { ForkEdge } from '@/lib/workspaces/fork/lineage/lineage'
77
import { detectForkCascadeReferences } from '@/lib/workspaces/fork/mapping/cascade'
88
import { buildForkResolver, getEdgeMappingRows } from '@/lib/workspaces/fork/mapping/mapping-store'
@@ -13,6 +13,7 @@ import {
1313
type ForkReferenceResolver,
1414
scanWorkflowReferences,
1515
} from '@/lib/workspaces/fork/remap/remap-references'
16+
import type { WorkflowState } from '@/stores/workflows/workflow/types'
1617

1718
export interface ForkPromotePlanItem {
1819
sourceWorkflowId: string
@@ -100,8 +101,23 @@ export async function computeForkPromotePlan(params: {
100101
sourceWorkspaceId: string
101102
targetWorkspaceId: string
102103
direction: 'push' | 'pull'
104+
/**
105+
* Source deployed workflows + their states, read by the caller BEFORE its
106+
* transaction (see `loadSourceDeployedStates`) so the plan never checks out a
107+
* second pooled connection from inside a tx.
108+
*/
109+
deployedSourceWorkflows: DeployedWorkflowSummary[]
110+
sourceStates: Map<string, WorkflowState>
103111
}): Promise<ForkPromotePlan> {
104-
const { executor, edge, sourceWorkspaceId, targetWorkspaceId, direction } = params
112+
const {
113+
executor,
114+
edge,
115+
sourceWorkspaceId,
116+
targetWorkspaceId,
117+
direction,
118+
deployedSourceWorkflows,
119+
sourceStates,
120+
} = params
105121

106122
const mappingRows = await getEdgeMappingRows(executor, edge.childWorkspaceId)
107123
const [targetEnvKeys, sourceEnvKeys] = await Promise.all([
@@ -118,8 +134,7 @@ export async function computeForkPromotePlan(params: {
118134
else identityMap.set(row.childResourceId, row.parentResourceId)
119135
}
120136

121-
const [deployedSourceWorkflows, targetWorkflows, sourceWorkflowRows] = await Promise.all([
122-
listDeployedWorkflows(executor, sourceWorkspaceId),
137+
const [targetWorkflows, sourceWorkflowRows] = await Promise.all([
123138
executor
124139
.select({ id: workflow.id, name: workflow.name, updatedAt: workflow.updatedAt })
125140
.from(workflow)
@@ -138,13 +153,12 @@ export async function computeForkPromotePlan(params: {
138153
// parent's originals; undeployed sources are simply skipped (target left as-is).
139154
const existingSourceIds = new Set(sourceWorkflowRows.map((w) => w.id))
140155

141-
// Build the items and scan references in one pass, loading each source's deployed
142-
// state, scanning it, then discarding it - peak memory stays at one workflow state
143-
// (the deployed-state cache, bounded globally, retains originals for the copy loop).
156+
// Build the items and scan references in one pass from the pre-read source states
157+
// (loaded before the caller's transaction; see loadSourceDeployedStates).
144158
const items: ForkPromotePlanItem[] = []
145159
const referenceByKey = new Map<string, ForkReference>()
146160
for (const source of deployedSourceWorkflows) {
147-
const sourceState = await readDeployedState(source.id, sourceWorkspaceId)
161+
const sourceState = sourceStates.get(source.id)
148162
if (!sourceState) continue
149163

150164
const mappedTargetId = identityMap.get(source.id)
@@ -180,12 +194,14 @@ export async function computeForkPromotePlan(params: {
180194
items,
181195
})
182196

197+
const writtenTargetIds = new Set(items.map((item) => item.targetWorkflowId))
183198
const archivedTargetIds: string[] = []
184199
for (const row of mappingRows) {
185200
if (row.resourceType !== 'workflow' || row.childResourceId == null) continue
186201
const mappedSourceId = sourceIsParent ? row.parentResourceId : row.childResourceId
187202
const mappedTargetId = sourceIsParent ? row.childResourceId : row.parentResourceId
188203
if (existingSourceIds.has(mappedSourceId)) continue
204+
if (writtenTargetIds.has(mappedTargetId)) continue
189205
if (targetActiveIds.has(mappedTargetId)) archivedTargetIds.push(mappedTargetId)
190206
}
191207
const archivedTargets = archivedTargetIds.map((id) => ({

apps/sim/lib/workspaces/fork/promote/promote-run-store.ts

Lines changed: 27 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -100,49 +100,29 @@ export async function upsertPromoteRun(
100100
return id
101101
}
102102

103-
/** Remove one direction's undo point (keyed by target) after a successful rollback. */
104-
export async function deletePromoteRun(
103+
/**
104+
* Remove EVERY undo point targeting this workspace. Called after a rollback so the
105+
* undo is single-level: only the latest sync into a target is ever undoable, and
106+
* once it is undone there is no stack of older syncs to walk back into.
107+
*/
108+
export async function deleteAllPromoteRunsForTarget(
105109
tx: DbOrTx,
106-
childWorkspaceId: string,
107110
targetWorkspaceId: string
108111
): Promise<void> {
109112
await tx
110113
.delete(workspaceForkPromoteRun)
111-
.where(
112-
and(
113-
eq(workspaceForkPromoteRun.childWorkspaceId, childWorkspaceId),
114-
eq(workspaceForkPromoteRun.targetWorkspaceId, targetWorkspaceId)
115-
)
116-
)
114+
.where(eq(workspaceForkPromoteRun.targetWorkspaceId, targetWorkspaceId))
117115
}
118116

119117
/**
120-
* The undo point targeting this workspace, with the edge counterpart needed to
121-
* call rollback. `sourceWorkspaceId` is the "other" workspace the promote came
122-
* from (rollback resolves the edge from target + other).
118+
* The newest undo point targeting this workspace. A workspace can be the target of
119+
* several edges (pushes from its children, a pull from its parent), so order by
120+
* recency: this is the ONLY undoable sync - older ones are stale the moment a newer
121+
* sync lands, and rollback refuses them.
123122
*/
124-
export async function getUndoableRunForTarget(
123+
export async function getLatestPromoteRunForTarget(
125124
executor: DbOrTx,
126125
targetWorkspaceId: string
127-
): Promise<{ sourceWorkspaceId: string; direction: 'push' | 'pull' } | null> {
128-
const [row] = await executor
129-
.select({
130-
sourceWorkspaceId: workspaceForkPromoteRun.sourceWorkspaceId,
131-
direction: workspaceForkPromoteRun.direction,
132-
})
133-
.from(workspaceForkPromoteRun)
134-
.where(eq(workspaceForkPromoteRun.targetWorkspaceId, targetWorkspaceId))
135-
// A workspace can be the target of several edges; surface the most recent.
136-
.orderBy(desc(workspaceForkPromoteRun.createdAt))
137-
.limit(1)
138-
return row ?? null
139-
}
140-
141-
/** The undo point whose target is this workspace and whose edge counterpart is `otherWorkspaceId`. */
142-
export async function getPromoteRunForRollback(
143-
executor: DbOrTx,
144-
targetWorkspaceId: string,
145-
childWorkspaceId: string
146126
): Promise<PromoteRunRow | null> {
147127
const [row] = await executor
148128
.select({
@@ -155,13 +135,22 @@ export async function getPromoteRunForRollback(
155135
createdAt: workspaceForkPromoteRun.createdAt,
156136
})
157137
.from(workspaceForkPromoteRun)
158-
.where(
159-
and(
160-
eq(workspaceForkPromoteRun.childWorkspaceId, childWorkspaceId),
161-
eq(workspaceForkPromoteRun.targetWorkspaceId, targetWorkspaceId)
162-
)
163-
)
138+
.where(eq(workspaceForkPromoteRun.targetWorkspaceId, targetWorkspaceId))
139+
.orderBy(desc(workspaceForkPromoteRun.createdAt))
164140
.limit(1)
165141
if (!row) return null
166142
return { ...row, snapshot: row.snapshot as PromoteRunSnapshot }
167143
}
144+
145+
/**
146+
* The "other" workspace and direction of the latest sync into this target, for the
147+
* UI's undo affordance. `sourceWorkspaceId` is the workspace the sync came from
148+
* (rollback resolves the edge from target + other).
149+
*/
150+
export async function getUndoableRunForTarget(
151+
executor: DbOrTx,
152+
targetWorkspaceId: string
153+
): Promise<{ sourceWorkspaceId: string; direction: 'push' | 'pull' } | null> {
154+
const run = await getLatestPromoteRunForTarget(executor, targetWorkspaceId)
155+
return run ? { sourceWorkspaceId: run.sourceWorkspaceId, direction: run.direction } : null
156+
}

apps/sim/lib/workspaces/fork/promote/promote.ts

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,13 @@ import {
1212
} from '@/lib/workspaces/fork/copy/copy-workflows'
1313
import {
1414
getActiveDeploymentVersionNumber,
15-
readDeployedState,
15+
loadSourceDeployedStates,
1616
} from '@/lib/workspaces/fork/copy/deploy-bridge'
17-
import { acquireForkEdgeLock, type ForkEdge } from '@/lib/workspaces/fork/lineage/lineage'
17+
import {
18+
acquireForkEdgeLock,
19+
acquireForkTargetLock,
20+
type ForkEdge,
21+
} from '@/lib/workspaces/fork/lineage/lineage'
1822
import {
1923
type ForkMappingUpsert,
2024
upsertEdgeMappings,
@@ -101,7 +105,17 @@ export async function promoteFork(params: PromoteForkParams): Promise<PromoteFor
101105

102106
const targetMembers = (await getUsersWithPermissions(targetWorkspaceId)).map((m) => m.userId)
103107

108+
// Read the source's deployed workflows + states BEFORE the transaction so these
109+
// heavy per-workflow reads never check out a second pooled connection from inside
110+
// the promote tx (which can deadlock the pool at saturation). The source is
111+
// read-only here, so this pre-tx snapshot is exactly what gets force-pushed.
112+
const { deployedWorkflows, sourceStates } = await loadSourceDeployedStates(sourceWorkspaceId)
113+
104114
const txResult: PromoteTxBlocked | PromoteTxApplied = await db.transaction(async (tx) => {
115+
// Target lock before edge lock (consistent ordering): the target lock serializes
116+
// every sync into this target so sibling forks can't interleave writes, and so
117+
// rollback's "newest sync" check stays race-free against a concurrent promote.
118+
await acquireForkTargetLock(tx, targetWorkspaceId)
105119
await acquireForkEdgeLock(tx, edge.childWorkspaceId)
106120

107121
const plan = await computeForkPromotePlan({
@@ -110,6 +124,8 @@ export async function promoteFork(params: PromoteForkParams): Promise<PromoteFor
110124
sourceWorkspaceId,
111125
targetWorkspaceId,
112126
direction,
127+
deployedSourceWorkflows: deployedWorkflows,
128+
sourceStates,
113129
})
114130

115131
if (plan.unmappedRequired.length > 0) {
@@ -143,12 +159,10 @@ export async function promoteFork(params: PromoteForkParams): Promise<PromoteFor
143159
const createdTargetIds: string[] = []
144160
const writtenItems: typeof plan.items = []
145161
for (const item of plan.items) {
146-
// Re-read the source's deployed state one workflow at a time so peak memory
147-
// stays at a single workflow state. Only items actually written below feed
148-
// the snapshot, identity rows, and deploy list - a source that lost its
149-
// active deployment between plan and copy is skipped cleanly (no phantom
150-
// mapping/deploy of a never-created target).
151-
const sourceState = await readDeployedState(item.sourceWorkflowId, sourceWorkspaceId)
162+
// Use the pre-read source state (loaded above, before the tx). An item only
163+
// exists when its state was present at read time, so this lookup hits; the
164+
// guard stays as defense so the written counts below never over-report.
165+
const sourceState = sourceStates.get(item.sourceWorkflowId)
152166
if (!sourceState) continue
153167
if (item.mode === 'replace') {
154168
const priorVersion = await getActiveDeploymentVersionNumber(tx, item.targetWorkflowId)

0 commit comments

Comments
 (0)