Skip to content

Commit 111cc4b

Browse files
authored
query optimiser that pushes where clauses down to subqueries closer to the source (#256)
1 parent 6812da3 commit 111cc4b

File tree

7 files changed

+2343
-31
lines changed

7 files changed

+2343
-31
lines changed

.changeset/add-query-optimiser.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Add query optimizer with predicate pushdown
6+
7+
Implements automatic query optimization that moves WHERE clauses closer to data sources, reducing intermediate result sizes and improving performance for queries with joins.

packages/db/src/query/compiler/index.ts

Lines changed: 85 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { distinct, filter, map } from "@electric-sql/d2mini"
2+
import { optimizeQuery } from "../optimizer.js"
23
import { compileExpression } from "./evaluators.js"
34
import { processJoins } from "./joins.js"
45
import { processGroupBy } from "./group-by.js"
@@ -10,30 +11,35 @@ import type {
1011
NamespacedAndKeyedStream,
1112
ResultStream,
1213
} from "../../types.js"
13-
14-
/**
15-
* Cache for compiled subqueries to avoid duplicate compilation
16-
*/
17-
type QueryCache = WeakMap<QueryIR, ResultStream>
14+
import type { QueryCache, QueryMapping } from "./types.js"
1815

1916
/**
2017
* Compiles a query2 IR into a D2 pipeline
21-
* @param query The query IR to compile
18+
* @param rawQuery The query IR to compile
2219
* @param inputs Mapping of collection names to input streams
2320
* @param cache Optional cache for compiled subqueries (used internally for recursion)
21+
* @param queryMapping Optional mapping from optimized queries to original queries
2422
* @returns A stream builder representing the compiled query
2523
*/
2624
export function compileQuery(
27-
query: QueryIR,
25+
rawQuery: QueryIR,
2826
inputs: Record<string, KeyedStream>,
29-
cache: QueryCache = new WeakMap()
27+
cache: QueryCache = new WeakMap(),
28+
queryMapping: QueryMapping = new WeakMap()
3029
): ResultStream {
31-
// Check if this query has already been compiled
32-
const cachedResult = cache.get(query)
30+
// Check if the original raw query has already been compiled
31+
const cachedResult = cache.get(rawQuery)
3332
if (cachedResult) {
3433
return cachedResult
3534
}
3635

36+
// Optimize the query before compilation
37+
const query = optimizeQuery(rawQuery)
38+
39+
// Create mapping from optimized query to original for caching
40+
queryMapping.set(query, rawQuery)
41+
mapNestedQueries(query, rawQuery, queryMapping)
42+
3743
// Create a copy of the inputs map to avoid modifying the original
3844
const allInputs = { ...inputs }
3945

@@ -44,7 +50,8 @@ export function compileQuery(
4450
const { alias: mainTableAlias, input: mainInput } = processFrom(
4551
query.from,
4652
allInputs,
47-
cache
53+
cache,
54+
queryMapping
4855
)
4956
tables[mainTableAlias] = mainInput
5057

@@ -68,7 +75,8 @@ export function compileQuery(
6875
tables,
6976
mainTableAlias,
7077
allInputs,
71-
cache
78+
cache,
79+
queryMapping
7280
)
7381
}
7482

@@ -218,8 +226,8 @@ export function compileQuery(
218226
)
219227

220228
const result = resultPipeline
221-
// Cache the result before returning
222-
cache.set(query, result)
229+
// Cache the result before returning (use original query as key)
230+
cache.set(rawQuery, result)
223231
return result
224232
} else if (query.limit !== undefined || query.offset !== undefined) {
225233
// If there's a limit or offset without orderBy, throw an error
@@ -241,8 +249,8 @@ export function compileQuery(
241249
)
242250

243251
const result = resultPipeline
244-
// Cache the result before returning
245-
cache.set(query, result)
252+
// Cache the result before returning (use original query as key)
253+
cache.set(rawQuery, result)
246254
return result
247255
}
248256

@@ -252,7 +260,8 @@ export function compileQuery(
252260
function processFrom(
253261
from: CollectionRef | QueryRef,
254262
allInputs: Record<string, KeyedStream>,
255-
cache: QueryCache
263+
cache: QueryCache,
264+
queryMapping: QueryMapping
256265
): { alias: string; input: KeyedStream } {
257266
switch (from.type) {
258267
case `collectionRef`: {
@@ -265,8 +274,16 @@ function processFrom(
265274
return { alias: from.alias, input }
266275
}
267276
case `queryRef`: {
277+
// Find the original query for caching purposes
278+
const originalQuery = queryMapping.get(from.query) || from.query
279+
268280
// Recursively compile the sub-query with cache
269-
const subQueryInput = compileQuery(from.query, allInputs, cache)
281+
const subQueryInput = compileQuery(
282+
originalQuery,
283+
allInputs,
284+
cache,
285+
queryMapping
286+
)
270287

271288
// Subqueries may return [key, [value, orderByIndex]] (with ORDER BY) or [key, value] (without ORDER BY)
272289
// We need to extract just the value for use in parent queries
@@ -283,3 +300,53 @@ function processFrom(
283300
throw new Error(`Unsupported FROM type: ${(from as any).type}`)
284301
}
285302
}
303+
304+
/**
305+
* Recursively maps optimized subqueries to their original queries for proper caching.
306+
* This ensures that when we encounter the same QueryRef object in different contexts,
307+
* we can find the original query to check the cache.
308+
*/
309+
function mapNestedQueries(
310+
optimizedQuery: QueryIR,
311+
originalQuery: QueryIR,
312+
queryMapping: QueryMapping
313+
): void {
314+
// Map the FROM clause if it's a QueryRef
315+
if (
316+
optimizedQuery.from.type === `queryRef` &&
317+
originalQuery.from.type === `queryRef`
318+
) {
319+
queryMapping.set(optimizedQuery.from.query, originalQuery.from.query)
320+
// Recursively map nested queries
321+
mapNestedQueries(
322+
optimizedQuery.from.query,
323+
originalQuery.from.query,
324+
queryMapping
325+
)
326+
}
327+
328+
// Map JOIN clauses if they exist
329+
if (optimizedQuery.join && originalQuery.join) {
330+
for (
331+
let i = 0;
332+
i < optimizedQuery.join.length && i < originalQuery.join.length;
333+
i++
334+
) {
335+
const optimizedJoin = optimizedQuery.join[i]!
336+
const originalJoin = originalQuery.join[i]!
337+
338+
if (
339+
optimizedJoin.from.type === `queryRef` &&
340+
originalJoin.from.type === `queryRef`
341+
) {
342+
queryMapping.set(optimizedJoin.from.query, originalJoin.from.query)
343+
// Recursively map nested queries in joins
344+
mapNestedQueries(
345+
optimizedJoin.from.query,
346+
originalJoin.from.query,
347+
queryMapping
348+
)
349+
}
350+
}
351+
}
352+
}

packages/db/src/query/compiler/joins.ts

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,13 @@ import {
77
import { compileExpression } from "./evaluators.js"
88
import { compileQuery } from "./index.js"
99
import type { IStreamBuilder, JoinType } from "@electric-sql/d2mini"
10-
import type { CollectionRef, JoinClause, QueryIR, QueryRef } from "../ir.js"
10+
import type { CollectionRef, JoinClause, QueryRef } from "../ir.js"
1111
import type {
1212
KeyedStream,
1313
NamespacedAndKeyedStream,
1414
NamespacedRow,
15-
ResultStream,
1615
} from "../../types.js"
17-
18-
/**
19-
* Cache for compiled subqueries to avoid duplicate compilation
20-
*/
21-
type QueryCache = WeakMap<QueryIR, ResultStream>
16+
import type { QueryCache, QueryMapping } from "./types.js"
2217

2318
/**
2419
* Processes all join clauses in a query
@@ -29,7 +24,8 @@ export function processJoins(
2924
tables: Record<string, KeyedStream>,
3025
mainTableAlias: string,
3126
allInputs: Record<string, KeyedStream>,
32-
cache: QueryCache
27+
cache: QueryCache,
28+
queryMapping: QueryMapping
3329
): NamespacedAndKeyedStream {
3430
let resultPipeline = pipeline
3531

@@ -40,7 +36,8 @@ export function processJoins(
4036
tables,
4137
mainTableAlias,
4238
allInputs,
43-
cache
39+
cache,
40+
queryMapping
4441
)
4542
}
4643

@@ -56,13 +53,15 @@ function processJoin(
5653
tables: Record<string, KeyedStream>,
5754
mainTableAlias: string,
5855
allInputs: Record<string, KeyedStream>,
59-
cache: QueryCache
56+
cache: QueryCache,
57+
queryMapping: QueryMapping
6058
): NamespacedAndKeyedStream {
6159
// Get the joined table alias and input stream
6260
const { alias: joinedTableAlias, input: joinedInput } = processJoinSource(
6361
joinClause.from,
6462
allInputs,
65-
cache
63+
cache,
64+
queryMapping
6665
)
6766

6867
// Add the joined table to the tables map
@@ -128,7 +127,8 @@ function processJoin(
128127
function processJoinSource(
129128
from: CollectionRef | QueryRef,
130129
allInputs: Record<string, KeyedStream>,
131-
cache: QueryCache
130+
cache: QueryCache,
131+
queryMapping: QueryMapping
132132
): { alias: string; input: KeyedStream } {
133133
switch (from.type) {
134134
case `collectionRef`: {
@@ -141,8 +141,16 @@ function processJoinSource(
141141
return { alias: from.alias, input }
142142
}
143143
case `queryRef`: {
144+
// Find the original query for caching purposes
145+
const originalQuery = queryMapping.get(from.query) || from.query
146+
144147
// Recursively compile the sub-query with cache
145-
const subQueryInput = compileQuery(from.query, allInputs, cache)
148+
const subQueryInput = compileQuery(
149+
originalQuery,
150+
allInputs,
151+
cache,
152+
queryMapping
153+
)
146154

147155
// Subqueries may return [key, [value, orderByIndex]] (with ORDER BY) or [key, value] (without ORDER BY)
148156
// We need to extract just the value for use in parent queries
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import type { QueryIR } from "../ir.js"
2+
import type { ResultStream } from "../../types.js"
3+
4+
/**
5+
* Cache for compiled subqueries to avoid duplicate compilation
6+
*/
7+
export type QueryCache = WeakMap<QueryIR, ResultStream>
8+
9+
/**
10+
* Mapping from optimized queries back to their original queries for caching
11+
*/
12+
export type QueryMapping = WeakMap<QueryIR, QueryIR>

0 commit comments

Comments
 (0)