Skip to content

Commit c04d359

Browse files
committed
Add initial TrailBase integration and tests.
1 parent 836c46e commit c04d359

File tree

6 files changed

+1539
-37
lines changed

6 files changed

+1539
-37
lines changed

packages/db-collections/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
"@tanstack/db": "workspace:*",
77
"@tanstack/query-core": "^5.75.7",
88
"@standard-schema/spec": "^1.0.0",
9-
"@tanstack/store": "^0.7.0"
9+
"@tanstack/store": "^0.7.0",
10+
"trailbase": "^0.7.1"
1011
},
1112
"devDependencies": {
1213
"@electric-sql/client": "1.0.0",

packages/db-collections/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,8 @@ export {
1515
type StorageApi,
1616
type StorageEventApi,
1717
} from "./local-storage"
18+
export {
19+
trailBaseCollectionOptions,
20+
type TrailBaseCollectionConfig,
21+
type TrailBaseCollectionUtils,
22+
} from "./trailbase"
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/* eslint-disable @typescript-eslint/no-unnecessary-condition */
2+
import { Store } from "@tanstack/store"
3+
import type { Event, RecordApi } from "trailbase"
4+
5+
import type { CollectionConfig, SyncConfig, UtilsRecord } from "@tanstack/db"
6+
7+
/**
8+
* Configuration interface for TrailbaseCollection
9+
*/
10+
export interface TrailBaseCollectionConfig<
11+
TItem extends object,
12+
TKey extends string | number = string | number,
13+
> extends Omit<
14+
CollectionConfig<TItem, TKey>,
15+
`sync` | `onInsert` | `onUpdate` | `onDelete`
16+
> {
17+
/**
18+
* Record API name
19+
*/
20+
recordApi: RecordApi<TItem>
21+
}
22+
23+
export type AwaitTxIdFn = (txId: string, timeout?: number) => Promise<boolean>
24+
25+
export interface TrailBaseCollectionUtils extends UtilsRecord {
26+
cancel: () => void
27+
}
28+
29+
export function trailBaseCollectionOptions<TItem extends object>(
30+
config: TrailBaseCollectionConfig<TItem>
31+
): CollectionConfig<TItem> & { utils: TrailBaseCollectionUtils } {
32+
const getKey = config.getKey
33+
34+
const seenIds = new Store(new Map<string, number>())
35+
36+
const awaitIds = (
37+
ids: Array<string>,
38+
timeout: number = 120 * 1000
39+
): Promise<void> => {
40+
const completed = (value: Map<string, number>) =>
41+
ids.every((id) => value.has(id))
42+
if (completed(seenIds.state)) {
43+
return Promise.resolve()
44+
}
45+
46+
return new Promise<void>((resolve, reject) => {
47+
const timeoutId = setTimeout(() => {
48+
unsubscribe()
49+
reject(new Error(`Timeout waiting for ids: ${ids}`))
50+
}, timeout)
51+
52+
const unsubscribe = seenIds.subscribe((value) => {
53+
if (completed(value.currentVal)) {
54+
clearTimeout(timeoutId)
55+
unsubscribe()
56+
resolve()
57+
}
58+
})
59+
})
60+
}
61+
62+
const weakSeenIds = new WeakRef(seenIds)
63+
const cleanupTimer = setInterval(() => {
64+
const seen = weakSeenIds.deref()
65+
if (seen) {
66+
seen.setState((curr) => {
67+
const now = Date.now()
68+
let anyExpired = false
69+
70+
const notExpired = Array.from(curr.entries()).filter(([_, v]) => {
71+
const expired = now - v > 300 * 1000
72+
anyExpired = anyExpired || expired
73+
return !expired
74+
})
75+
76+
if (anyExpired) {
77+
return new Map(notExpired)
78+
}
79+
return curr
80+
})
81+
} else {
82+
clearInterval(cleanupTimer)
83+
}
84+
}, 120 * 1000)
85+
86+
type SyncParams = Parameters<SyncConfig<TItem>[`sync`]>[0]
87+
88+
let eventReader: ReadableStreamDefaultReader<Event> | undefined
89+
const cancel = () => {
90+
if (eventReader) {
91+
eventReader.cancel()
92+
eventReader.releaseLock()
93+
eventReader = undefined
94+
}
95+
}
96+
97+
const sync = {
98+
sync: (params: SyncParams) => {
99+
const { begin, write, commit } = params
100+
101+
// Initial fetch.
102+
async function initialFetch() {
103+
const limit = 256
104+
let response = await config.recordApi.list({
105+
pagination: {
106+
limit,
107+
},
108+
})
109+
let cursor = response.cursor
110+
let got = 0
111+
112+
begin()
113+
114+
while (true) {
115+
const length = response.records.length
116+
if (length === 0) break
117+
118+
got = got + length
119+
for (const item of response.records) {
120+
write({ type: `insert`, value: item })
121+
}
122+
123+
if (length < limit) break
124+
125+
response = await config.recordApi.list({
126+
pagination: {
127+
limit,
128+
cursor,
129+
offset: cursor === undefined ? got : undefined,
130+
},
131+
})
132+
cursor = response.cursor
133+
}
134+
135+
commit()
136+
}
137+
138+
// Afterwards subscribe.
139+
async function listen(reader: ReadableStreamDefaultReader<Event>) {
140+
while (true) {
141+
const { done, value: event } = await reader.read()
142+
143+
if (done || !event) {
144+
reader.releaseLock()
145+
eventReader = undefined
146+
return
147+
}
148+
149+
begin()
150+
let value: TItem | undefined
151+
if (`Insert` in event) {
152+
value = event.Insert as TItem
153+
write({ type: `insert`, value })
154+
} else if (`Delete` in event) {
155+
value = event.Delete as TItem
156+
write({ type: `delete`, value })
157+
} else if (`Update` in event) {
158+
value = event.Update as TItem
159+
write({ type: `update`, value })
160+
} else {
161+
console.error(`Error: ${event.Error}`)
162+
}
163+
commit()
164+
165+
if (value) {
166+
seenIds.setState((curr) => {
167+
const newIds = new Map(curr)
168+
newIds.set(String(getKey(value)), Date.now())
169+
return newIds
170+
})
171+
}
172+
}
173+
}
174+
175+
async function start() {
176+
const eventStream = await config.recordApi.subscribe(`*`)
177+
const reader = (eventReader = eventStream.getReader())
178+
179+
// Start listening for subscriptions first. Otherwise, we'd risk a gap
180+
// between the initial fetch and starting to listen.
181+
listen(reader)
182+
183+
try {
184+
await initialFetch()
185+
} catch (e) {
186+
cancel()
187+
throw e
188+
}
189+
}
190+
191+
start()
192+
},
193+
// Expose the getSyncMetadata function
194+
getSyncMetadata: undefined,
195+
}
196+
197+
return {
198+
...config,
199+
sync,
200+
getKey,
201+
onInsert: async (params): Promise<Array<number | string>> => {
202+
const ids = await config.recordApi.createBulk(
203+
params.transaction.mutations.map((tx) => {
204+
const { type, changes } = tx
205+
if (type !== `insert`) {
206+
throw new Error(`Expected 'insert', got: ${type}`)
207+
}
208+
return changes
209+
})
210+
)
211+
212+
// The optimistic mutation overlay is removed on return, so at this point
213+
// we have to ensure that the new record was properly added to the local
214+
// DB by the subscription.
215+
await awaitIds(ids.map((id) => String(id)))
216+
217+
return ids
218+
},
219+
onUpdate: async (params) => {
220+
const ids: Array<string> = await Promise.all(
221+
params.transaction.mutations.map(async (tx) => {
222+
const { type, changes, key } = tx
223+
if (type !== `update`) {
224+
throw new Error(`Expected 'update', got: ${type}`)
225+
}
226+
227+
await config.recordApi.update(key, changes)
228+
return String(key)
229+
})
230+
)
231+
232+
// The optimistic mutation overlay is removed on return, so at this point
233+
// we have to ensure that the new record was properly updated in the local
234+
// DB by the subscription.
235+
await awaitIds(ids)
236+
},
237+
onDelete: async (params) => {
238+
const ids: Array<string> = await Promise.all(
239+
params.transaction.mutations.map(async (tx) => {
240+
const { type, key } = tx
241+
if (type !== `delete`) {
242+
throw new Error(`Expected 'delete', got: ${type}`)
243+
}
244+
245+
await config.recordApi.delete(key)
246+
return String(key)
247+
})
248+
)
249+
250+
// The optimistic mutation overlay is removed on return, so at this point
251+
// we have to ensure that the new record was properly updated in the local
252+
// DB by the subscription.
253+
await awaitIds(ids)
254+
},
255+
utils: {
256+
cancel,
257+
},
258+
}
259+
}

0 commit comments

Comments
 (0)