Skip to content

Commit c7bd2f8

Browse files
committed
Add initial TrailBase integration and tests.
1 parent 836c46e commit c7bd2f8

File tree

6 files changed

+1522
-37
lines changed

6 files changed

+1522
-37
lines changed

packages/db-collections/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
"@tanstack/db": "workspace:*",
77
"@tanstack/query-core": "^5.75.7",
88
"@standard-schema/spec": "^1.0.0",
9-
"@tanstack/store": "^0.7.0"
9+
"@tanstack/store": "^0.7.0",
10+
"trailbase": "^0.7.1"
1011
},
1112
"devDependencies": {
1213
"@electric-sql/client": "1.0.0",

packages/db-collections/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,8 @@ export {
1515
type StorageApi,
1616
type StorageEventApi,
1717
} from "./local-storage"
18+
export {
19+
trailBaseCollectionOptions,
20+
type TrailBaseCollectionConfig,
21+
type TrailBaseCollectionUtils,
22+
} from "./trailbase"
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/* eslint-disable @typescript-eslint/no-unnecessary-condition */
2+
import { Store } from "@tanstack/store"
3+
import type { Event, RecordApi } from "trailbase"
4+
5+
import type { CollectionConfig, SyncConfig, UtilsRecord } from "@tanstack/db"
6+
7+
/**
8+
* Configuration interface for TrailbaseCollection
9+
*/
10+
export interface TrailBaseCollectionConfig<
11+
TItem extends object,
12+
TKey extends string | number = string | number,
13+
> extends Omit<
14+
CollectionConfig<TItem, TKey>,
15+
`sync` | `onInsert` | `onUpdate` | `onDelete`
16+
> {
17+
/**
18+
* Record API name
19+
*/
20+
recordApi: RecordApi<TItem>
21+
}
22+
23+
export type AwaitTxIdFn = (txId: string, timeout?: number) => Promise<boolean>
24+
25+
export interface TrailBaseCollectionUtils extends UtilsRecord {
26+
cancel: () => void
27+
}
28+
29+
export function trailBaseCollectionOptions<TItem extends object>(
30+
config: TrailBaseCollectionConfig<TItem>
31+
): CollectionConfig<TItem> & { utils: TrailBaseCollectionUtils } {
32+
const getKey = config.getKey
33+
34+
const seenIds = new Store(new Map<string, number>())
35+
36+
const awaitIds = (
37+
ids: Array<string>,
38+
timeout: number = 120 * 1000
39+
): Promise<void> => {
40+
const completed = (value: Map<string, number>) =>
41+
ids.every((id) => value.has(id))
42+
if (completed(seenIds.state)) {
43+
return Promise.resolve()
44+
}
45+
46+
return new Promise<void>((resolve, reject) => {
47+
const timeoutId = setTimeout(() => {
48+
unsubscribe()
49+
reject(new Error(`Timeout waiting for ids: ${ids}`))
50+
}, timeout)
51+
52+
const unsubscribe = seenIds.subscribe((value) => {
53+
if (completed(value.currentVal)) {
54+
clearTimeout(timeoutId)
55+
unsubscribe()
56+
resolve()
57+
}
58+
})
59+
})
60+
}
61+
62+
const weakSeenIds = new WeakRef(seenIds)
63+
const cleanupTimer = setInterval(() => {
64+
const seen = weakSeenIds.deref()
65+
if (seen) {
66+
seen.setState((curr) => {
67+
const now = Date.now()
68+
let anyExpired = false
69+
70+
const notExpired = Array.from(curr.entries()).filter(([_, v]) => {
71+
const expired = now - v > 300 * 1000
72+
anyExpired = anyExpired || expired
73+
return !expired
74+
})
75+
76+
if (anyExpired) {
77+
return new Map(notExpired)
78+
}
79+
return curr
80+
})
81+
} else {
82+
clearInterval(cleanupTimer)
83+
}
84+
}, 120 * 1000)
85+
86+
type SyncParams = Parameters<SyncConfig<TItem>[`sync`]>[0]
87+
88+
let eventReader: ReadableStreamDefaultReader<Event> | undefined
89+
const sync = {
90+
sync: (params: SyncParams) => {
91+
const { begin, write, commit } = params
92+
93+
// Initial fetch.
94+
async function initialFetch() {
95+
const limit = 256
96+
let response = await config.recordApi.list({
97+
pagination: {
98+
limit,
99+
},
100+
})
101+
let cursor = response.cursor
102+
let got = 0
103+
104+
begin()
105+
106+
while (true) {
107+
const length = response.records.length
108+
if (length === 0) break
109+
110+
got = got + length
111+
for (const item of response.records) {
112+
write({ type: `insert`, value: item })
113+
}
114+
115+
if (length < limit) break
116+
117+
response = await config.recordApi.list({
118+
pagination: {
119+
limit,
120+
cursor,
121+
offset: cursor === undefined ? got : undefined,
122+
},
123+
})
124+
cursor = response.cursor
125+
}
126+
127+
commit()
128+
}
129+
130+
// Afterwards subscribe.
131+
async function subscribe() {
132+
const eventStream = await config.recordApi.subscribe(`*`)
133+
const reader = (eventReader = eventStream.getReader())
134+
135+
while (true) {
136+
const { done, value: event } = await reader.read()
137+
138+
if (done || !event) {
139+
reader.releaseLock()
140+
eventReader = undefined
141+
return
142+
}
143+
144+
begin()
145+
let value: TItem | undefined
146+
if (`Insert` in event) {
147+
value = event.Insert as TItem
148+
write({ type: `insert`, value })
149+
} else if (`Delete` in event) {
150+
value = event.Delete as TItem
151+
write({ type: `delete`, value })
152+
} else if (`Update` in event) {
153+
value = event.Update as TItem
154+
write({ type: `update`, value })
155+
} else {
156+
console.error(`Error: ${event.Error}`)
157+
}
158+
commit()
159+
160+
if (value) {
161+
seenIds.setState((curr) => {
162+
const newIds = new Map(curr)
163+
newIds.set(String(getKey(value)), Date.now())
164+
return newIds
165+
})
166+
}
167+
}
168+
}
169+
170+
initialFetch().then(() => subscribe())
171+
},
172+
// Expose the getSyncMetadata function
173+
getSyncMetadata: undefined,
174+
}
175+
176+
return {
177+
sync,
178+
getKey,
179+
onInsert: async (params): Promise<Array<number | string>> => {
180+
const ids = await config.recordApi.createBulk(
181+
params.transaction.mutations.map((tx) => {
182+
const { type, changes } = tx
183+
if (type !== `insert`) {
184+
throw new Error(`Expected 'insert', got: ${type}`)
185+
}
186+
return changes as TItem
187+
})
188+
)
189+
190+
// The optimistic mutation overlay is removed on return, so at this point
191+
// we have to ensure that the new record was properly added to the local
192+
// DB by the subscription.
193+
await awaitIds(ids.map((id) => String(id)))
194+
195+
return ids
196+
},
197+
onUpdate: async (params) => {
198+
const ids: Array<string> = await Promise.all(
199+
params.transaction.mutations.map(async (tx) => {
200+
const { type, changes, key } = tx
201+
if (type !== `update`) {
202+
throw new Error(`Expected 'update', got: ${type}`)
203+
}
204+
205+
await config.recordApi.update(key, changes)
206+
return String(key)
207+
})
208+
)
209+
210+
// The optimistic mutation overlay is removed on return, so at this point
211+
// we have to ensure that the new record was properly updated in the local
212+
// DB by the subscription.
213+
await awaitIds(ids)
214+
},
215+
onDelete: async (params) => {
216+
const ids: Array<string> = await Promise.all(
217+
params.transaction.mutations.map(async (tx) => {
218+
const { type, key } = tx
219+
if (type !== `delete`) {
220+
throw new Error(`Expected 'delete', got: ${type}`)
221+
}
222+
223+
await config.recordApi.delete(key)
224+
return String(key)
225+
})
226+
)
227+
228+
// The optimistic mutation overlay is removed on return, so at this point
229+
// we have to ensure that the new record was properly updated in the local
230+
// DB by the subscription.
231+
await awaitIds(ids)
232+
},
233+
utils: {
234+
cancel: () => {
235+
if (eventReader) {
236+
eventReader.cancel()
237+
eventReader.releaseLock()
238+
eventReader = undefined
239+
}
240+
},
241+
},
242+
}
243+
}

0 commit comments

Comments
 (0)