Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/includes-aggregates.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/db': patch
---

fix: support aggregates (e.g. count) in child/includes subqueries with per-parent scoping
89 changes: 60 additions & 29 deletions packages/db/src/query/compiler/group-by.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export function processGroupBy(
havingClauses?: Array<Having>,
selectClause?: Select,
fnHavingClauses?: Array<(row: any) => any>,
mainSource?: string,
): NamespacedAndKeyedStream {
// Handle empty GROUP BY (single-group aggregation)
if (groupByClause.length === 0) {
Expand Down Expand Up @@ -110,8 +111,15 @@ export function processGroupBy(
}
}

// Use a constant key for single group
const keyExtractor = () => ({ __singleGroup: true })
// Use a constant key for single group.
// When mainSource is set (includes mode), include __correlationKey so that
// rows from different parents aggregate separately.
const keyExtractor = mainSource
? ([, row]: [string, NamespacedRow]) => ({
__singleGroup: true,
__correlationKey: (row as any)?.[mainSource]?.__correlationKey,
})
: () => ({ __singleGroup: true })

// Apply the groupBy operator with single group
pipeline = pipeline.pipe(
Expand Down Expand Up @@ -139,14 +147,24 @@ export function processGroupBy(
)
}

// Use a single key for the result and update $selected
return [
`single_group`,
{
...aggregatedRow,
$selected: finalResults,
},
] as [unknown, Record<string, any>]
// Use a single key for the result and update $selected.
// When in includes mode, restore the namespaced source structure with
// __correlationKey so output extraction can route results per-parent.
const correlationKey = mainSource
? (aggregatedRow as any).__correlationKey
: undefined
const resultKey =
correlationKey !== undefined
? `single_group_${serializeValue(correlationKey)}`
: `single_group`
const resultRow: Record<string, any> = {
...aggregatedRow,
$selected: finalResults,
}
if (mainSource && correlationKey !== undefined) {
resultRow[mainSource] = { __correlationKey: correlationKey }
}
return [resultKey, resultRow] as [unknown, Record<string, any>]
}),
)

Expand Down Expand Up @@ -196,7 +214,9 @@ export function processGroupBy(
compileExpression(e),
)

// Create a key extractor function using simple __key_X format
// Create a key extractor function using simple __key_X format.
// When mainSource is set (includes mode), include __correlationKey so that
// rows from different parents with the same group key aggregate separately.
const keyExtractor = ([, row]: [
string,
NamespacedRow & { $selected?: any },
Expand All @@ -214,6 +234,10 @@ export function processGroupBy(
key[`__key_${i}`] = value
}

if (mainSource) {
key.__correlationKey = (row as any)?.[mainSource]?.__correlationKey
}

return key
}

Expand Down Expand Up @@ -278,25 +302,32 @@ export function processGroupBy(
}
}

// Generate a simple key for the live collection using group values
let finalKey: unknown
if (groupByClause.length === 1) {
finalKey = aggregatedRow[`__key_0`]
} else {
const keyParts: Array<unknown> = []
for (let i = 0; i < groupByClause.length; i++) {
keyParts.push(aggregatedRow[`__key_${i}`])
}
finalKey = serializeValue(keyParts)
// Generate a simple key for the live collection using group values.
// When in includes mode, include the correlation key so that groups
// from different parents don't collide.
const correlationKey = mainSource
? (aggregatedRow as any).__correlationKey
: undefined
const keyParts: Array<unknown> = []
for (let i = 0; i < groupByClause.length; i++) {
keyParts.push(aggregatedRow[`__key_${i}`])
}

return [
finalKey,
{
...aggregatedRow,
$selected: finalResults,
},
] as [unknown, Record<string, any>]
if (correlationKey !== undefined) {
keyParts.push(correlationKey)
}
const finalKey =
keyParts.length === 1 ? keyParts[0] : serializeValue(keyParts)

// When in includes mode, restore the namespaced source structure with
// __correlationKey so output extraction can route results per-parent.
const resultRow: Record<string, any> = {
...aggregatedRow,
$selected: finalResults,
}
if (mainSource && correlationKey !== undefined) {
resultRow[mainSource] = { __correlationKey: correlationKey }
}
return [finalKey, resultRow] as [unknown, Record<string, any>]
}),
)

Expand Down
7 changes: 6 additions & 1 deletion packages/db/src/query/compiler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,18 @@ export function compileQuery(
)
}

// Process the GROUP BY clause if it exists
// Process the GROUP BY clause if it exists.
// When in includes mode (parentKeyStream), pass mainSource so that groupBy
// preserves __correlationKey for per-parent aggregation.
const groupByMainSource = parentKeyStream ? mainSource : undefined
if (query.groupBy && query.groupBy.length > 0) {
pipeline = processGroupBy(
pipeline,
query.groupBy,
query.having,
query.select,
query.fnHaving,
groupByMainSource,
)
} else if (query.select) {
// Check if SELECT contains aggregates but no GROUP BY (implicit single-group aggregation)
Expand All @@ -394,6 +398,7 @@ export function compileQuery(
query.having,
query.select,
query.fnHaving,
groupByMainSource,
)
}
}
Expand Down
215 changes: 215 additions & 0 deletions packages/db/tests/query/includes.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { beforeEach, describe, expect, it } from 'vitest'
import {
count,
createLiveQueryCollection,
eq,
toArray,
Expand Down Expand Up @@ -1512,4 +1513,218 @@ describe(`includes subqueries`, () => {
expect(issue11.comments).toEqual([{ id: 110, body: `Great feature` }])
})
})

// Aggregates in child queries: the aggregate (e.g. count) must be computed
// per-parent, not globally across all parents. These tests guard against the
// correlation key being dropped by GROUP BY, which would collapse every
// parent's child rows into a single global result instead of per-parent results.
describe(`aggregates in child queries`, () => {
describe(`single-group aggregate: count issues per project (as Collection)`, () => {
  // Live query over projects where each row carries a correlated child query
  // that reduces the project's issues to a single `{ total }` row via count().
  const buildAggregateQuery = () =>
    createLiveQueryCollection((q) =>
      q.from({ p: projects }).select(({ p }) => ({
        id: p.id,
        name: p.name,
        issueCount: q
          .from({ i: issues })
          .where(({ i }) => eq(i.projectId, p.id))
          .select(({ i }) => ({ total: count(i.id) })),
      })),
    )

  type AggregateCollection = ReturnType<typeof buildAggregateQuery>

  // Reads the aggregate rows of the child Collection attached to a project.
  const totalsFor = (collection: AggregateCollection, projectId: number) =>
    childItems((collection.get(projectId) as any).issueCount, `total`)

  it(`each project gets its own aggregate result`, async () => {
    const collection = buildAggregateQuery()
    await collection.preload()

    // Alpha has 2 issues
    expect(totalsFor(collection, 1)).toEqual([{ total: 2 }])

    // Beta has 1 issue
    expect(totalsFor(collection, 2)).toEqual([{ total: 1 }])

    // Gamma has 0 issues — no matching rows means empty Collection
    expect(totalsFor(collection, 3)).toEqual([])
  })

  it(`adding an issue updates the count for that parent`, async () => {
    const collection = buildAggregateQuery()
    await collection.preload()

    // Before the insert Gamma has nothing to aggregate
    expect(totalsFor(collection, 3)).toEqual([])

    const gammaIssue = { id: 30, projectId: 3, title: `Gamma issue` }
    issues.utils.begin()
    issues.utils.write({ type: `insert`, value: gammaIssue })
    issues.utils.commit()

    // The new issue is counted for Gamma only
    expect(totalsFor(collection, 3)).toEqual([{ total: 1 }])

    // Alpha's count is untouched
    expect(totalsFor(collection, 1)).toEqual([{ total: 2 }])
  })

  it(`removing an issue updates the count for that parent`, async () => {
    const collection = buildAggregateQuery()
    await collection.preload()

    // Before the delete Alpha counts both of its issues
    expect(totalsFor(collection, 1)).toEqual([{ total: 2 }])

    const removedIssue = sampleIssues.find((i) => i.id === 10)!
    issues.utils.begin()
    issues.utils.write({ type: `delete`, value: removedIssue })
    issues.utils.commit()

    // Alpha drops to a single issue
    expect(totalsFor(collection, 1)).toEqual([{ total: 1 }])

    // Beta's count is untouched
    expect(totalsFor(collection, 2)).toEqual([{ total: 1 }])
  })
})

describe(`single-group aggregate: count issues per project (as toArray)`, () => {
  // Same per-project count as the Collection variant, but the child query is
  // wrapped in toArray() so each project row exposes a plain array.
  const buildAggregateToArrayQuery = () =>
    createLiveQueryCollection((q) =>
      q.from({ p: projects }).select(({ p }) => ({
        id: p.id,
        name: p.name,
        issueCount: toArray(
          q
            .from({ i: issues })
            .where(({ i }) => eq(i.projectId, p.id))
            .select(({ i }) => ({ total: count(i.id) })),
        ),
      })),
    )

  it(`each project gets its own aggregate result as an array`, async () => {
    const collection = buildAggregateToArrayQuery()
    await collection.preload()

    // [project id, expected issueCount array], checked in order:
    // Alpha has 2 issues, Beta has 1 issue, Gamma has 0 issues — empty array.
    const expectedByProject: Array<[number, Array<{ total: number }>]> = [
      [1, [{ total: 2 }]],
      [2, [{ total: 1 }]],
      [3, []],
    ]

    for (const [projectId, expected] of expectedByProject) {
      const project = collection.get(projectId) as any
      expect(project.issueCount).toEqual(expected)
    }
  })
})

describe(`nested aggregate: count comments per issue (as Collection)`, () => {
  // Projects → issues → count(comments): the aggregate sits two levels deep,
  // correlated to each issue rather than to the top-level project.
  const buildNestedAggregateQuery = () =>
    createLiveQueryCollection((q) =>
      q.from({ p: projects }).select(({ p }) => ({
        id: p.id,
        name: p.name,
        issues: q
          .from({ i: issues })
          .where(({ i }) => eq(i.projectId, p.id))
          .select(({ i }) => ({
            id: i.id,
            title: i.title,
            commentCount: q
              .from({ c: comments })
              .where(({ c }) => eq(c.issueId, i.id))
              .select(({ c }) => ({ total: count(c.id) })),
          })),
      })),
    )

  // Reads the comment aggregate rows for one issue of an already-fetched project row.
  const commentTotals = (project: any, issueId: number) =>
    childItems(project.issues.get(issueId).commentCount, `total`)

  it(`each issue gets its own comment count`, async () => {
    const collection = buildNestedAggregateQuery()
    await collection.preload()

    // Alpha's issues
    const alpha = collection.get(1) as any
    expect(commentTotals(alpha, 10)).toEqual([{ total: 2 }])

    // Issue 11 has 0 comments — empty Collection
    expect(commentTotals(alpha, 11)).toEqual([])

    // Beta's issue
    const beta = collection.get(2) as any
    expect(commentTotals(beta, 20)).toEqual([{ total: 1 }])
  })
})

describe(`nested aggregate: count comments per issue (as toArray)`, () => {
  // Projects → issues → toArray(count(comments)): the nested aggregate is
  // materialised as a plain array on each issue row.
  const buildNestedAggregateToArrayQuery = () =>
    createLiveQueryCollection((q) =>
      q.from({ p: projects }).select(({ p }) => ({
        id: p.id,
        name: p.name,
        issues: q
          .from({ i: issues })
          .where(({ i }) => eq(i.projectId, p.id))
          .select(({ i }) => ({
            id: i.id,
            title: i.title,
            commentCount: toArray(
              q
                .from({ c: comments })
                .where(({ c }) => eq(c.issueId, i.id))
                .select(({ c }) => ({ total: count(c.id) })),
            ),
          })),
      })),
    )

  // Returns the commentCount array for one issue of an already-fetched project row.
  const commentCountOf = (project: any, issueId: number) =>
    project.issues.get(issueId).commentCount

  it(`each issue gets its own comment count as an array`, async () => {
    const collection = buildNestedAggregateToArrayQuery()
    await collection.preload()

    // Alpha's issues
    const alpha = collection.get(1) as any
    expect(commentCountOf(alpha, 10)).toEqual([{ total: 2 }])

    // Issue 11 has 0 comments — empty array
    expect(commentCountOf(alpha, 11)).toEqual([])

    // Beta's issue
    const beta = collection.get(2) as any
    expect(commentCountOf(beta, 20)).toEqual([{ total: 1 }])
  })
})
})
})
Loading