Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/pending-operation-virtual-prop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@tanstack/db': minor
---

Add `$pendingOperation` virtual property to track optimistic mutation type

- New virtual property `$pendingOperation` on every collection row: `'insert' | 'update' | 'delete' | null`
- Items deleted in pending transactions can stay visible in query results when `$pendingOperation` is referenced in a `.where()` clause
- Works with live queries, `createEffect`, joins/subqueries, GROUP BY, and ordered/paginated queries
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ const stripVirtualProps = <T extends Record<string, any> | undefined>(
$origin: _origin,
$key: _key,
$collectionId: _collectionId,
$pendingOperation: _pendingOperation,
...rest
} = value as Record<string, unknown>
return rest as T
Expand Down
64 changes: 61 additions & 3 deletions packages/db/src/collection/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { SortedMap } from '../SortedMap'
import { enrichRowWithVirtualProps } from '../virtual-props.js'
import { DIRECT_TRANSACTION_METADATA_KEY } from './transaction-metadata.js'
import type {
PendingOperationType,
VirtualOrigin,
VirtualRowProps,
WithVirtualProps,
Expand Down Expand Up @@ -110,6 +111,7 @@ export class CollectionStateManager<
origin: VirtualOrigin
key: TKey
collectionId: string
pendingOperation: PendingOperationType
enriched: WithVirtualProps<TOutput, TKey>
}
>()
Expand Down Expand Up @@ -161,7 +163,38 @@ export class CollectionStateManager<
if (this.isLocalOnly) {
return true
}
return !this.optimisticUpserts.has(key) && !this.optimisticDeletes.has(key)
return (
!this.optimisticUpserts.has(key) &&
!this.optimisticDeletes.has(key) &&
!this.pendingOptimisticUpserts.has(key) &&
!this.pendingOptimisticDeletes.has(key)
)
}

/**
* Gets the pending operation type for a row.
* Returns the type of optimistic mutation pending for this key, or null if none.
* Used to compute the $pendingOperation virtual property.
*/
public getPendingOperation(key: TKey): PendingOperationType {
if (this.isLocalOnly) {
return null
}
// Check active optimistic state
if (this.optimisticDeletes.has(key)) {
return 'delete'
}
if (this.optimisticUpserts.has(key)) {
return this.syncedData.has(key) ? 'update' : 'insert'
}
// Check completed-but-awaiting-sync state to avoid flicker
if (this.pendingOptimisticDeletes.has(key)) {
return 'delete'
}
if (this.pendingOptimisticUpserts.has(key)) {
return this.syncedData.has(key) ? 'update' : 'insert'
}
return null
}

/**
Expand Down Expand Up @@ -190,6 +223,10 @@ export class CollectionStateManager<
$origin: overrides?.$origin ?? this.getRowOrigin(key),
$key: overrides?.$key ?? key,
$collectionId: overrides?.$collectionId ?? this.collection.id,
$pendingOperation:
overrides?.$pendingOperation !== undefined
? overrides.$pendingOperation
: this.getPendingOperation(key),
}
}

Expand All @@ -206,6 +243,7 @@ export class CollectionStateManager<
return this.createVirtualPropsSnapshot(key, {
$synced: true,
$origin: 'local',
$pendingOperation: null,
})
}

Expand All @@ -218,11 +256,20 @@ export class CollectionStateManager<
optimisticDeletes.has(key) ||
options?.completedOptimisticKeys?.has(key) === true

// Compute $pendingOperation from the provided or current state
let pendingOperation: PendingOperationType = null
if (optimisticDeletes.has(key)) {
pendingOperation = 'delete'
} else if (optimisticUpserts.has(key)) {
pendingOperation = this.syncedData.has(key) ? 'update' : 'insert'
}

return this.createVirtualPropsSnapshot(key, {
$synced: !hasOptimisticChange,
$origin: hasOptimisticChange
? 'local'
: ((options?.rowOrigins ?? this.rowOrigins).get(key) ?? 'remote'),
$pendingOperation: pendingOperation,
})
}

Expand All @@ -235,14 +282,19 @@ export class CollectionStateManager<
const origin = existingRow.$origin ?? virtualProps.$origin
const resolvedKey = existingRow.$key ?? virtualProps.$key
const collectionId = existingRow.$collectionId ?? virtualProps.$collectionId
const pendingOperation =
existingRow.$pendingOperation !== undefined
? existingRow.$pendingOperation
: virtualProps.$pendingOperation

const cached = this.virtualPropsCache.get(row as object)
if (
cached &&
cached.synced === synced &&
cached.origin === origin &&
cached.key === resolvedKey &&
cached.collectionId === collectionId
cached.collectionId === collectionId &&
cached.pendingOperation === pendingOperation
) {
return cached.enriched
}
Expand All @@ -253,13 +305,15 @@ export class CollectionStateManager<
$origin: origin,
$key: resolvedKey,
$collectionId: collectionId,
$pendingOperation: pendingOperation,
} as WithVirtualProps<TOutput, TKey>

this.virtualPropsCache.set(row as object, {
synced,
origin,
key: resolvedKey,
collectionId,
pendingOperation,
enriched,
})

Expand Down Expand Up @@ -1173,7 +1227,9 @@ export class CollectionStateManager<
const nextVirtualProps = this.getVirtualPropsSnapshotForState(key)
const virtualChanged =
previousVirtualProps.$synced !== nextVirtualProps.$synced ||
previousVirtualProps.$origin !== nextVirtualProps.$origin
previousVirtualProps.$origin !== nextVirtualProps.$origin ||
previousVirtualProps.$pendingOperation !==
nextVirtualProps.$pendingOperation
const previousValueWithVirtual =
previousVisibleValue !== undefined
? enrichRowWithVirtualProps(
Expand All @@ -1182,6 +1238,7 @@ export class CollectionStateManager<
this.collection.id,
() => previousVirtualProps.$synced,
() => previousVirtualProps.$origin,
() => previousVirtualProps.$pendingOperation,
)
: undefined

Expand Down Expand Up @@ -1229,6 +1286,7 @@ export class CollectionStateManager<
this.collection.id,
() => previousVirtualProps.$synced,
() => previousVirtualProps.$origin,
() => previousVirtualProps.$pendingOperation,
)
events.push({
type: `update`,
Expand Down
129 changes: 128 additions & 1 deletion packages/db/src/collection/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type CollectionSubscriptionOptions = {
whereExpression?: BasicExpression<boolean>
/** Callback to call when the subscription is unsubscribed */
onUnsubscribe?: (event: SubscriptionUnsubscribedEvent) => void
/** When true, optimistic delete events are converted to updates with $pendingOperation: 'delete'
* instead of being passed through as deletes. This keeps deleted items visible in query results. */
includePendingDeletes?: boolean
}

export class CollectionSubscription
Expand All @@ -77,6 +80,12 @@ export class CollectionSubscription
// Keep track of the keys we've sent (needed for join and orderBy optimizations)
private sentKeys = new Set<string | number>()

// Track keys where a delete was converted to an update by convertPendingDeletes,
// along with the enriched value that was sent to D2. When a rollback causes an insert
// for one of these keys, we convert it to an update with the correct previousValue
// so D2's multiplicity bookkeeping stays balanced.
private convertedDeleteValues = new Map<string | number, any>()

// Track the count of rows sent via requestLimitedSnapshot for offset-based pagination
private limitedSnapshotRowCount = 0

Expand Down Expand Up @@ -165,6 +174,7 @@ export class CollectionSubscription
this.limitedSnapshotRowCount = 0
this.lastSentKey = undefined
this.loadedSubsets = []
this.convertedDeleteValues.clear()
return
}

Expand All @@ -181,6 +191,7 @@ export class CollectionSubscription
this.loadedInitialState = false
this.limitedSnapshotRowCount = 0
this.lastSentKey = undefined
this.convertedDeleteValues.clear()

// Clear the loadedSubsets array since we're re-requesting fresh
this.loadedSubsets = []
Expand Down Expand Up @@ -319,7 +330,15 @@ export class CollectionSubscription
}

emitEvents(changes: Array<ChangeMessage<any, any>>) {
const newChanges = this.filterAndFlipChanges(changes)
// When includePendingDeletes is enabled, convert optimistic delete events
// to updates BEFORE filterAndFlipChanges so that sentKeys correctly tracks
// the key as still present. This ensures the later sync-confirmed delete
// can properly remove the item.
const processedChanges = this.options.includePendingDeletes
? this.convertPendingDeletes(changes)
: changes

const newChanges = this.filterAndFlipChanges(processedChanges)

if (this.isBufferingForTruncate) {
// Buffer the changes instead of emitting immediately
Expand All @@ -332,6 +351,100 @@ export class CollectionSubscription
}
}

/**
* Converts optimistic delete events to update events with $pendingOperation: 'delete'.
* Only converts deletes for keys that are still in optimisticDeletes or pendingOptimisticDeletes
* (i.e., pending deletes, not sync-confirmed ones).
*/
private convertPendingDeletes(
changes: Array<ChangeMessage<any, any>>,
): Array<ChangeMessage<any, any>> {
const state = this.collection._state
const result: Array<ChangeMessage<any, any>> = []

for (const change of changes) {
if (change.type === `delete`) {
// Check if this is an optimistic delete (key still pending) vs a sync-confirmed delete
const isPendingDelete =
state.optimisticDeletes.has(change.key) ||
state.pendingOptimisticDeletes.has(change.key)

if (!isPendingDelete) {
// Sync-confirmed delete β€” clean up any stale converted value
this.convertedDeleteValues.delete(change.key)
} else {
// Convert to update β€” the delete event's value contains the pre-delete row data
// already enriched with virtual props. We need to add $pendingOperation: 'delete'.
const enrichedValue = {
...change.value,
$pendingOperation: `delete` as const,
}
// Use the pre-delete value as previousValue. change.value on delete events
// carries the pre-delete virtual props (including the previous $pendingOperation,
// e.g., 'update' if the item was updated before being deleted).
const previousValue = change.previousValue ?? change.value

result.push({
type: `update`,
key: change.key,
value: enrichedValue,
previousValue,
})
this.convertedDeleteValues.set(change.key, enrichedValue)
continue
}
} else if (
change.type === `insert` &&
this.convertedDeleteValues.has(change.key)
) {
// This insert is from a rollback of a previously-converted delete.
// Convert to update with the correct previousValue (the pending-delete value
// we sent to D2) so the multiplicity bookkeeping stays balanced.
const pendingDeleteValue = this.convertedDeleteValues.get(change.key)
this.convertedDeleteValues.delete(change.key)
result.push({
...change,
type: `update`,
previousValue: pendingDeleteValue,
})
continue
}
result.push(change)
}

return result
}

/**
* Appends pending-delete items to a changes array.
* These items are in optimisticDeletes or pendingOptimisticDeletes but still
* have data in syncedData. They are added as inserts with $pendingOperation: 'delete'.
* Skips keys already in sentKeys to avoid duplicates.
*/
private appendPendingDeleteItems(
changes: Array<ChangeMessage<any, any>>,
): void {
const state = this.collection._state
const pendingDeleteKeys = new Set([
...state.optimisticDeletes,
...state.pendingOptimisticDeletes,
])
for (const key of pendingDeleteKeys) {
if (this.sentKeys.has(key)) continue
const syncedValue = state.syncedData.get(key)
if (syncedValue !== undefined) {
// enrichWithVirtualProps computes $pendingOperation via getPendingOperation(key),
// which returns 'delete' for keys in optimisticDeletes/pendingOptimisticDeletes.
const enrichedValue = state.enrichWithVirtualProps(syncedValue, key)
changes.push({
type: `insert` as const,
key,
value: enrichedValue,
})
}
}
}

/**
* Sends the snapshot to the callback.
* Returns a boolean indicating if it succeeded.
Expand Down Expand Up @@ -397,6 +510,13 @@ export class CollectionSubscription
return false
}

// When includePendingDeletes is enabled, also include items that are
// optimistically deleted β€” they won't appear in entries() but their
// data is still in syncedData.
if (this.options.includePendingDeletes) {
this.appendPendingDeleteItems(snapshot)
}

// Only send changes that have not been sent yet
const filteredSnapshot = snapshot.filter(
(change) => !this.sentKeys.has(change.key),
Expand Down Expand Up @@ -543,6 +663,12 @@ export class CollectionSubscription
keys = index.take(valuesNeeded(), biggestObservedValue!, filterFn)
}

// When includePendingDeletes is enabled, also include pending-delete items
// that the index traversal missed (because collection.get() returns undefined for them).
if (this.options.includePendingDeletes) {
this.appendPendingDeleteItems(changes)
}

// Track row count for offset-based pagination (before sending to callback)
// Use the current count as the offset for this load
const currentOffset = this.limitedSnapshotRowCount
Expand Down Expand Up @@ -736,6 +862,7 @@ export class CollectionSubscription
this.collection._sync.unloadSubset(options)
}
this.loadedSubsets = []
this.convertedDeleteValues.clear()

this.emitInner(`unsubscribed`, {
type: `unsubscribed`,
Expand Down
1 change: 1 addition & 0 deletions packages/db/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export * from './strategies/index.js'
export {
type VirtualRowProps,
type VirtualOrigin,
type PendingOperationType,
type WithVirtualProps,
type WithoutVirtualProps,
hasVirtualProps,
Expand Down
2 changes: 1 addition & 1 deletion packages/db/src/indexes/auto-index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DEFAULT_COMPARE_OPTIONS } from '../utils'
import { checkCollectionSizeForIndex, isDevModeEnabled } from './index-registry'
import { hasVirtualPropPath } from '../virtual-props'
import { checkCollectionSizeForIndex, isDevModeEnabled } from './index-registry'
import type { CompareOptions } from '../query/builder/types'
import type { BasicExpression } from '../query/ir'
import type { CollectionImpl } from '../collection/index.js'
Expand Down
Loading
Loading