Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fix-on-demand-isready.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@tanstack/db': patch
'@tanstack/query-db-collection': patch
---

Fix `isReady` tracking for on-demand live queries without orderBy. Previously, non-ordered live queries using `syncMode: 'on-demand'` were incorrectly marked as ready before data finished loading. Also fix `preload()` promises hanging when cleanup occurs before the collection becomes ready. Additionally, fix concurrent live queries subscribing to the same source collection - each now independently tracks loading state.
104 changes: 104 additions & 0 deletions packages/db-collection-e2e/src/suites/pagination.suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,110 @@ export function createPaginationTestSuite(
await query2.cleanup()
})

it(`should load distinct windows across multiple live queries with multi-column orderBy`, async () => {
const config = await getConfig()
const usersCollection = config.collections.onDemand.users

const page1 = createLiveQueryCollection((q) =>
q
.from({ user: usersCollection })
.orderBy(({ user }) => user.isActive, `desc`)
.orderBy(({ user }) => user.age, `asc`)
.limit(10)
.offset(0),
)

const page2 = createLiveQueryCollection((q) =>
q
.from({ user: usersCollection })
.orderBy(({ user }) => user.isActive, `desc`)
.orderBy(({ user }) => user.age, `asc`)
.limit(10)
.offset(10),
)

await page1.preload()
await waitForQueryData(page1, { minSize: 10 })
await page2.preload()
await waitForQueryData(page2, { minSize: 10 })

const page1Results = Array.from(page1.state.values())
const page2Results = Array.from(page2.state.values())

expect(page1Results).toHaveLength(10)
expect(page2Results).toHaveLength(10)

const page1Ids = new Set(page1Results.map((user) => user.id))
for (const user of page2Results) {
expect(page1Ids.has(user.id)).toBe(false)
}

for (const page of [page1Results, page2Results]) {
for (let i = 1; i < page.length; i++) {
const prev = page[i - 1]!
const curr = page[i]!

if (prev.isActive !== curr.isActive) {
expect(prev.isActive ? 1 : 0).toBeGreaterThanOrEqual(
curr.isActive ? 1 : 0,
)
} else {
expect(prev.age).toBeLessThanOrEqual(curr.age)
}
}
}

await page1.cleanup()
await page2.cleanup()
})

it(`should allow paging a second live query without affecting the first`, async () => {
const config = await getConfig()
const usersCollection = config.collections.onDemand.users

const baseQuery = createLiveQueryCollection((q) =>
q
.from({ user: usersCollection })
.orderBy(({ user }) => user.isActive, `desc`)
.orderBy(({ user }) => user.age, `asc`)
.limit(10)
.offset(0),
)

const pagedQuery = createLiveQueryCollection((q) =>
q
.from({ user: usersCollection })
.orderBy(({ user }) => user.isActive, `desc`)
.orderBy(({ user }) => user.age, `asc`)
.limit(10)
.offset(0),
)

await baseQuery.preload()
await waitForQueryData(baseQuery, { minSize: 10 })
await pagedQuery.preload()
await waitForQueryData(pagedQuery, { minSize: 10 })

const baseIds = new Set(
Array.from(baseQuery.state.values()).map((user) => user.id),
)

const moveResult = pagedQuery.utils.setWindow({ offset: 10, limit: 10 })
if (moveResult !== true) {
await moveResult
}
await waitForQueryData(pagedQuery, { minSize: 10 })

const pagedResults = Array.from(pagedQuery.state.values())
expect(pagedResults).toHaveLength(10)
for (const user of pagedResults) {
expect(baseIds.has(user.id)).toBe(false)
}

await baseQuery.cleanup()
await pagedQuery.cleanup()
})

it(`should handle multi-column orderBy with duplicate values in first column`, async () => {
const config = await getConfig()
const usersCollection = config.collections.onDemand.users
Expand Down
1 change: 0 additions & 1 deletion packages/db-collection-e2e/src/suites/predicates.suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ export function createPredicatesTestSuite(
)

await query.preload()

await waitForQueryData(query, { minSize: 1 })

const results = Array.from(query.state.values())
Expand Down
7 changes: 6 additions & 1 deletion packages/db/src/collection/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ export class CollectionChangesManager<
}

if (options.includeInitialState) {
subscription.requestSnapshot({ trackLoadSubsetPromise: false })
subscription.requestSnapshot({
trackLoadSubsetPromise: false,
orderBy: options.orderBy,
limit: options.limit,
onLoadSubsetResult: options.onLoadSubsetResult,
})
} else if (options.includeInitialState === false) {
// When explicitly set to false (not just undefined), mark all state as "seen"
// so that all future changes (including deletes) pass through unfiltered.
Expand Down
14 changes: 14 additions & 0 deletions packages/db/src/collection/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,21 @@ export class CollectionLifecycleManager<
}

this.hasBeenReady = false

// Call any pending onFirstReady callbacks before clearing them.
// This ensures preload() promises resolve during cleanup instead of hanging.
const callbacks = [...this.onFirstReadyCallbacks]
this.onFirstReadyCallbacks = []
callbacks.forEach((callback) => {
try {
callback()
} catch (error) {
console.error(
`${this.config.id ? `[${this.config.id}] ` : ``}Error in onFirstReady callback during cleanup:`,
error,
)
}
})

// Set status to cleaned-up after everything is cleaned up
// This fires the status:change event to notify listeners
Expand Down
30 changes: 27 additions & 3 deletions packages/db/src/collection/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type RequestSnapshotOptions = {
orderBy?: OrderBy
/** Optional limit to pass to loadSubset for backend optimization */
limit?: number
/** Callback that receives the raw loadSubset result for external tracking */
onLoadSubsetResult?: (result: Promise<void> | true) => void
}

type RequestLimitedSnapshotOptions = {
Expand All @@ -37,6 +39,10 @@ type RequestLimitedSnapshotOptions = {
minValues?: Array<unknown>
/** Row offset for offset-based pagination (passed to sync layer) */
offset?: number
/** Whether to track the loadSubset promise on this subscription (default: true) */
trackLoadSubsetPromise?: boolean
/** Callback that receives the raw loadSubset result for external tracking */
onLoadSubsetResult?: (result: Promise<void> | true) => void
}

type CollectionSubscriptionOptions = {
Expand Down Expand Up @@ -365,6 +371,9 @@ export class CollectionSubscription
}
const syncResult = this.collection._sync.loadSubset(loadOptions)

// Pass the raw loadSubset result to the caller for external tracking
opts?.onLoadSubsetResult?.(syncResult)

// Track this loadSubset call so we can unload it later
this.loadedSubsets.push(loadOptions)

Expand Down Expand Up @@ -416,6 +425,8 @@ export class CollectionSubscription
limit,
minValues,
offset,
trackLoadSubsetPromise: shouldTrackLoadSubsetPromise = true,
onLoadSubsetResult,
}: RequestLimitedSnapshotOptions) {
if (!limit) throw new Error(`limit is required`)

Expand Down Expand Up @@ -594,9 +605,14 @@ export class CollectionSubscription
}
const syncResult = this.collection._sync.loadSubset(loadOptions)

// Pass the raw loadSubset result to the caller for external tracking
onLoadSubsetResult?.(syncResult)

// Track this loadSubset call
this.loadedSubsets.push(loadOptions)
this.trackLoadSubsetPromise(syncResult)
if (shouldTrackLoadSubsetPromise) {
this.trackLoadSubsetPromise(syncResult)
}
}

// TODO: also add similar test but that checks that it can also load it from the collection's loadSubset function
Expand Down Expand Up @@ -667,13 +683,21 @@ export class CollectionSubscription

for (const change of changes) {
if (change.type === `delete`) {
// Remove deleted keys from sentKeys so future re-inserts are allowed
this.sentKeys.delete(change.key)
} else {
// For inserts and updates, track the key as sent
this.sentKeys.add(change.key)
}
}

// Keep the limited snapshot offset in sync with keys we've actually sent.
// This matters when loadSubset resolves asynchronously and requestLimitedSnapshot
// didn't have local rows to count yet.
if (this.orderByIndex) {
this.limitedSnapshotRowCount = Math.max(
this.limitedSnapshotRowCount,
this.sentKeys.size,
)
}
}

/**
Expand Down
9 changes: 4 additions & 5 deletions packages/db/src/query/live/collection-config-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -830,17 +830,16 @@ export class CollectionConfigBuilder<
return
}

const subscribedToAll = this.currentSyncState?.subscribedToAllCollections
const allReady = this.allCollectionsReady()
const isLoading = this.liveQueryCollection?.isLoadingSubset
// Mark ready when:
// 1. All subscriptions are set up (subscribedToAllCollections)
// 2. All source collections are ready
// 3. The live query collection is not loading subset data
// This prevents marking the live query ready before its data is processed
// (fixes issue where useLiveQuery returns isReady=true with empty data)
if (
this.currentSyncState?.subscribedToAllCollections &&
this.allCollectionsReady() &&
!this.liveQueryCollection?.isLoadingSubset
) {
if (subscribedToAll && allReady && !isLoading) {
markReady()
}
}
Expand Down
Loading
Loading