query optimiser that pushes where clauses down to subqueries closer to the source #256


Merged: 3 commits, Jul 16, 2025
7 changes: 7 additions & 0 deletions .changeset/add-query-optimiser.md
@@ -0,0 +1,7 @@
---
"@tanstack/db": patch
---

Add query optimizer with predicate pushdown

Implements automatic query optimization that moves WHERE clauses closer to data sources, reducing intermediate result sizes and improving performance for queries with joins.
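To illustrate the idea the changeset describes, here is a minimal, self-contained sketch of predicate pushdown. The `SimpleQuery` and `Predicate` shapes are stand-ins invented for this example; the real `QueryIR` in `@tanstack/db` carries full expression trees, aliases, and join clauses.

```typescript
// Hypothetical, simplified query shape for illustration only — the real
// QueryIR in @tanstack/db is considerably richer.
type Predicate = { tables: Array<string>; sql: string }

interface SimpleQuery {
  from: string
  joins: Array<string>
  where: Array<Predicate>
  // Filters pushed down to be applied at each table's scan.
  sourceFilters: Record<string, Array<Predicate>>
}

// Predicate pushdown: a WHERE clause that references exactly one table
// can run at that table's source, before the join sees any rows.
function pushDownPredicates(q: SimpleQuery): SimpleQuery {
  const kept: Array<Predicate> = []
  const sourceFilters: Record<string, Array<Predicate>> = {}
  for (const p of q.where) {
    if (p.tables.length === 1) {
      const table = p.tables[0]!
      ;(sourceFilters[table] ??= []).push(p)
    } else {
      // Cross-table predicates (e.g. join conditions) stay at the join.
      kept.push(p)
    }
  }
  return { ...q, where: kept, sourceFilters }
}

const query: SimpleQuery = {
  from: `orders`,
  joins: [`users`],
  where: [
    { tables: [`orders`], sql: `orders.total > 100` },
    { tables: [`orders`, `users`], sql: `orders.userId = users.id` },
  ],
  sourceFilters: {},
}
const optimized = pushDownPredicates(query)
// `orders.total > 100` now filters the orders scan, shrinking the
// intermediate result; only the cross-table predicate stays at the join.
```

The payoff is exactly what the changeset claims: the join processes only pre-filtered rows instead of the full `orders` collection.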
103 changes: 85 additions & 18 deletions packages/db/src/query/compiler/index.ts
@@ -1,4 +1,5 @@
import { distinct, filter, map } from "@electric-sql/d2mini"
import { optimizeQuery } from "../optimizer.js"
import { compileExpression } from "./evaluators.js"
import { processJoins } from "./joins.js"
import { processGroupBy } from "./group-by.js"
@@ -10,30 +11,35 @@ import type {
NamespacedAndKeyedStream,
ResultStream,
} from "../../types.js"

/**
* Cache for compiled subqueries to avoid duplicate compilation
*/
type QueryCache = WeakMap<QueryIR, ResultStream>
import type { QueryCache, QueryMapping } from "./types.js"

/**
* Compiles a query2 IR into a D2 pipeline
* @param query The query IR to compile
* @param rawQuery The query IR to compile
* @param inputs Mapping of collection names to input streams
* @param cache Optional cache for compiled subqueries (used internally for recursion)
* @param queryMapping Optional mapping from optimized queries to original queries
* @returns A stream builder representing the compiled query
*/
export function compileQuery(
query: QueryIR,
rawQuery: QueryIR,
inputs: Record<string, KeyedStream>,
cache: QueryCache = new WeakMap()
cache: QueryCache = new WeakMap(),
queryMapping: QueryMapping = new WeakMap()
): ResultStream {
// Check if this query has already been compiled
const cachedResult = cache.get(query)
// Check if the original raw query has already been compiled
const cachedResult = cache.get(rawQuery)
if (cachedResult) {
return cachedResult
}

// Optimize the query before compilation
const query = optimizeQuery(rawQuery)

// Create mapping from optimized query to original for caching
queryMapping.set(query, rawQuery)
mapNestedQueries(query, rawQuery, queryMapping)
Contributor: I don't really like that we're passing around mutable state and relying on mapNestedQueries to mutate that state. It feels like this should be a class since it keeps some state and provides methods over that state.

Collaborator (author): Agreed, it's getting quite complex. I would like to refactor the compiler into a class, but let's do it in a follow-up PR or this will blow up.

// Create a copy of the inputs map to avoid modifying the original
const allInputs = { ...inputs }

@@ -44,7 +50,8 @@ export function compileQuery(
const { alias: mainTableAlias, input: mainInput } = processFrom(
query.from,
allInputs,
cache
cache,
queryMapping
)
tables[mainTableAlias] = mainInput

@@ -68,7 +75,8 @@ export function compileQuery(
tables,
mainTableAlias,
allInputs,
cache
cache,
queryMapping
)
}

@@ -218,8 +226,8 @@
)

const result = resultPipeline
// Cache the result before returning
cache.set(query, result)
// Cache the result before returning (use original query as key)
cache.set(rawQuery, result)
return result
} else if (query.limit !== undefined || query.offset !== undefined) {
// If there's a limit or offset without orderBy, throw an error
@@ -241,8 +249,8 @@
)

const result = resultPipeline
// Cache the result before returning
cache.set(query, result)
// Cache the result before returning (use original query as key)
cache.set(rawQuery, result)
return result
}

@@ -252,7 +260,8 @@
function processFrom(
from: CollectionRef | QueryRef,
allInputs: Record<string, KeyedStream>,
cache: QueryCache
cache: QueryCache,
queryMapping: QueryMapping
): { alias: string; input: KeyedStream } {
switch (from.type) {
case `collectionRef`: {
@@ -265,8 +274,16 @@
return { alias: from.alias, input }
}
case `queryRef`: {
// Find the original query for caching purposes
const originalQuery = queryMapping.get(from.query) || from.query

// Recursively compile the sub-query with cache
const subQueryInput = compileQuery(from.query, allInputs, cache)
const subQueryInput = compileQuery(
originalQuery,
allInputs,
cache,
queryMapping
)

// Subqueries may return [key, [value, orderByIndex]] (with ORDER BY) or [key, value] (without ORDER BY)
// We need to extract just the value for use in parent queries
@@ -283,3 +300,53 @@
throw new Error(`Unsupported FROM type: ${(from as any).type}`)
}
}

/**
* Recursively maps optimized subqueries to their original queries for proper caching.
* This ensures that when we encounter the same QueryRef object in different contexts,
* we can find the original query to check the cache.
*/
function mapNestedQueries(
optimizedQuery: QueryIR,
originalQuery: QueryIR,
queryMapping: QueryMapping
): void {
// Map the FROM clause if it's a QueryRef
if (
optimizedQuery.from.type === `queryRef` &&
originalQuery.from.type === `queryRef`
) {
queryMapping.set(optimizedQuery.from.query, originalQuery.from.query)
// Recursively map nested queries
mapNestedQueries(
optimizedQuery.from.query,
originalQuery.from.query,
queryMapping
)
}

// Map JOIN clauses if they exist
if (optimizedQuery.join && originalQuery.join) {
for (
let i = 0;
i < optimizedQuery.join.length && i < originalQuery.join.length;
i++
) {
const optimizedJoin = optimizedQuery.join[i]!
const originalJoin = originalQuery.join[i]!

if (
optimizedJoin.from.type === `queryRef` &&
originalJoin.from.type === `queryRef`
) {
queryMapping.set(optimizedJoin.from.query, originalJoin.from.query)
// Recursively map nested queries in joins
mapNestedQueries(
optimizedJoin.from.query,
originalJoin.from.query,
queryMapping
)
}
}
}
}
34 changes: 21 additions & 13 deletions packages/db/src/query/compiler/joins.ts
@@ -7,18 +7,13 @@ import {
import { compileExpression } from "./evaluators.js"
import { compileQuery } from "./index.js"
import type { IStreamBuilder, JoinType } from "@electric-sql/d2mini"
import type { CollectionRef, JoinClause, QueryIR, QueryRef } from "../ir.js"
import type { CollectionRef, JoinClause, QueryRef } from "../ir.js"
import type {
KeyedStream,
NamespacedAndKeyedStream,
NamespacedRow,
ResultStream,
} from "../../types.js"

/**
* Cache for compiled subqueries to avoid duplicate compilation
*/
type QueryCache = WeakMap<QueryIR, ResultStream>
import type { QueryCache, QueryMapping } from "./types.js"

/**
* Processes all join clauses in a query
@@ -29,7 +24,8 @@ export function processJoins(
tables: Record<string, KeyedStream>,
mainTableAlias: string,
allInputs: Record<string, KeyedStream>,
cache: QueryCache
cache: QueryCache,
queryMapping: QueryMapping
): NamespacedAndKeyedStream {
let resultPipeline = pipeline

@@ -40,7 +36,8 @@
tables,
mainTableAlias,
allInputs,
cache
cache,
queryMapping
)
}

@@ -56,13 +53,15 @@ function processJoin(
tables: Record<string, KeyedStream>,
mainTableAlias: string,
allInputs: Record<string, KeyedStream>,
cache: QueryCache
cache: QueryCache,
queryMapping: QueryMapping
): NamespacedAndKeyedStream {
// Get the joined table alias and input stream
const { alias: joinedTableAlias, input: joinedInput } = processJoinSource(
joinClause.from,
allInputs,
cache
cache,
queryMapping
)

// Add the joined table to the tables map
@@ -128,7 +127,8 @@ function processJoin(
function processJoinSource(
from: CollectionRef | QueryRef,
allInputs: Record<string, KeyedStream>,
cache: QueryCache
cache: QueryCache,
queryMapping: QueryMapping
): { alias: string; input: KeyedStream } {
switch (from.type) {
case `collectionRef`: {
@@ -141,8 +141,16 @@
return { alias: from.alias, input }
}
case `queryRef`: {
// Find the original query for caching purposes
const originalQuery = queryMapping.get(from.query) || from.query

// Recursively compile the sub-query with cache
const subQueryInput = compileQuery(from.query, allInputs, cache)
const subQueryInput = compileQuery(
originalQuery,
allInputs,
cache,
queryMapping
)

// Subqueries may return [key, [value, orderByIndex]] (with ORDER BY) or [key, value] (without ORDER BY)
// We need to extract just the value for use in parent queries
12 changes: 12 additions & 0 deletions packages/db/src/query/compiler/types.ts
@@ -0,0 +1,12 @@
import type { QueryIR } from "../ir.js"
import type { ResultStream } from "../../types.js"

/**
* Cache for compiled subqueries to avoid duplicate compilation
*/
export type QueryCache = WeakMap<QueryIR, ResultStream>

/**
* Mapping from optimized queries back to their original queries for caching
*/
export type QueryMapping = WeakMap<QueryIR, QueryIR>
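These two types support the caching strategy in `compileQuery`: the optimizer produces a brand-new IR object on every run, so a `WeakMap` keyed by the optimized query would never produce a cache hit. A minimal sketch (with stand-in types invented for this example, not the real `QueryIR`/`ResultStream`) of why the cache must key on the original raw query:

```typescript
// Stand-in types for illustration; the real ones live in ir.ts/types.ts.
type QueryIR = { id: string }
type ResultStream = { compiledFrom: string }

const cache: WeakMap<QueryIR, ResultStream> = new WeakMap()

function optimize(q: QueryIR): QueryIR {
  return { ...q } // fresh object identity on every call
}

function compile(rawQuery: QueryIR): ResultStream {
  // Look up by the raw query: its identity is stable across calls.
  const hit = cache.get(rawQuery)
  if (hit) return hit
  const optimized = optimize(rawQuery)
  const result: ResultStream = { compiledFrom: optimized.id }
  cache.set(rawQuery, result) // key on the ORIGINAL, not the optimized copy
  return result
}

const q: QueryIR = { id: `users-query` }
const first = compile(q)
const second = compile(q)
// first === second: the query compiles once and the cache hits after that
```

This is also why `QueryMapping` exists: when the compiler later meets an optimized `QueryRef`, the mapping recovers the original query so the lookup above can succeed.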