Skip to content

Commit 0789550

Browse files
committed
Install hooks
1 parent 6f25bc5 commit 0789550

File tree

6 files changed

+243
-4
lines changed

6 files changed

+243
-4
lines changed

crates/core/src/crud_vtab.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,14 +334,12 @@ extern "C" fn begin(vtab: *mut sqlite::vtab) -> c_int {
334334

335335
extern "C" fn commit(vtab: *mut sqlite::vtab) -> c_int {
336336
let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) };
337-
tab.state.track_commit();
338337
tab.end_transaction();
339338
ResultCode::OK as c_int
340339
}
341340

342341
extern "C" fn rollback(vtab: *mut sqlite::vtab) -> c_int {
343342
let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) };
344-
tab.end_transaction();
345343
tab.state.track_rollback();
346344
// ps_tx will be rolled back automatically
347345
ResultCode::OK as c_int

crates/core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ mod schema;
3434
mod state;
3535
mod sync;
3636
mod sync_local;
37+
mod update_hooks;
3738
mod util;
3839
mod uuid;
3940
mod version;
@@ -80,6 +81,7 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), PowerSyncError> {
8081
crate::kv::register(db)?;
8182
crate::state::register(db, state.clone())?;
8283
sync::register(db, state.clone())?;
84+
update_hooks::register(db, state.clone())?;
8385

8486
crate::schema::register(db)?;
8587
crate::operations_vtab::register(db, state.clone())?;

crates/core/src/state.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl DatabaseState {
4848
ClearOnDrop(self)
4949
}
5050

51-
pub fn track_update(self, tbl: &str) {
51+
pub fn track_update(&self, tbl: &str) {
5252
let mut set = self.pending_updates.borrow_mut();
5353
set.get_or_insert_with(tbl, str::to_string);
5454
}
@@ -67,6 +67,11 @@ impl DatabaseState {
6767
}
6868
}
6969

70+
pub fn take_updates(&self) -> BTreeSet<String> {
71+
let mut committed = self.commited_updates.borrow_mut();
72+
core::mem::replace(&mut *committed, Default::default())
73+
}
74+
7075
pub unsafe extern "C" fn destroy_arc(ptr: *mut c_void) {
7176
drop(unsafe { Arc::from_raw(ptr.cast::<DatabaseState>()) });
7277
}

crates/core/src/update_hooks.rs

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
use core::{
2+
ffi::{CStr, c_char, c_int, c_void},
3+
ptr::null_mut,
4+
sync::atomic::{AtomicBool, Ordering},
5+
};
6+
7+
use alloc::{boxed::Box, sync::Arc};
8+
use sqlite_nostd::{
9+
self as sqlite, Connection, Context, ResultCode, Value, bindings::SQLITE_RESULT_SUBTYPE,
10+
};
11+
12+
use crate::{constants::SUBTYPE_JSON, error::PowerSyncError, state::DatabaseState};
13+
14+
/// The `powersync_update_hooks` methods works like this:
15+
///
16+
/// 1. `powersync_update_hooks('install')` installs update hooks on the database, failing if
17+
/// another hook already exists.
18+
/// 2. `powersync_update_hooks('get')` returns a JSON array of table names that have been changed
19+
/// and comitted since the last `powersync_update_hooks` call.
20+
///
21+
/// The update hooks don't have to be uninstalled manually, that happens when the connection is
22+
/// closed and the function is unregistered.
23+
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
24+
let state = Box::new(HookState {
25+
has_registered_hooks: AtomicBool::new(false),
26+
db,
27+
state,
28+
});
29+
30+
db.create_function_v2(
31+
"powersync_update_hooks",
32+
1,
33+
sqlite::UTF8 | sqlite::DETERMINISTIC | SQLITE_RESULT_SUBTYPE,
34+
Some(Box::into_raw(state) as *mut c_void),
35+
Some(powersync_update_hooks),
36+
None,
37+
None,
38+
Some(destroy_function),
39+
)?;
40+
Ok(())
41+
}
42+
43+
struct HookState {
44+
has_registered_hooks: AtomicBool,
45+
db: *mut sqlite::sqlite3,
46+
state: Arc<DatabaseState>,
47+
}
48+
49+
extern "C" fn destroy_function(ctx: *mut c_void) {
50+
let state = unsafe { Box::from_raw(ctx as *mut HookState) };
51+
52+
if state.has_registered_hooks.load(Ordering::Relaxed) {
53+
check_previous(
54+
"update",
55+
&state.state,
56+
state.db.update_hook(None, null_mut()),
57+
);
58+
check_previous(
59+
"commit",
60+
&state.state,
61+
state.db.commit_hook(None, null_mut()),
62+
);
63+
check_previous(
64+
"rollback",
65+
&state.state,
66+
state.db.rollback_hook(None, null_mut()),
67+
);
68+
}
69+
}
70+
71+
extern "C" fn powersync_update_hooks(
72+
ctx: *mut sqlite::context,
73+
argc: c_int,
74+
argv: *mut *mut sqlite::value,
75+
) {
76+
let args = sqlite::args!(argc, argv);
77+
let op = args[0].text();
78+
let db = ctx.db_handle();
79+
let user_data = ctx.user_data() as *const HookState;
80+
81+
match op {
82+
"install" => {
83+
let state = unsafe { user_data.as_ref().unwrap_unchecked() };
84+
let db_state = &state.state;
85+
86+
check_previous(
87+
"update",
88+
db_state,
89+
db.update_hook(
90+
Some(update_hook_impl),
91+
Arc::into_raw(db_state.clone()) as *mut c_void,
92+
),
93+
);
94+
check_previous(
95+
"commit",
96+
db_state,
97+
db.commit_hook(
98+
Some(commit_hook_impl),
99+
Arc::into_raw(db_state.clone()) as *mut c_void,
100+
),
101+
);
102+
check_previous(
103+
"rollback",
104+
db_state,
105+
db.rollback_hook(
106+
Some(rollback_hook_impl),
107+
Arc::into_raw(db_state.clone()) as *mut c_void,
108+
),
109+
);
110+
state.has_registered_hooks.store(true, Ordering::Relaxed);
111+
}
112+
"get" => {
113+
let state = unsafe { user_data.as_ref().unwrap_unchecked() };
114+
let formatted = serde_json::to_string(&state.state.take_updates())
115+
.map_err(PowerSyncError::internal);
116+
match formatted {
117+
Ok(result) => {
118+
ctx.result_text_transient(&result);
119+
ctx.result_subtype(SUBTYPE_JSON);
120+
}
121+
Err(e) => e.apply_to_ctx("powersync_update_hooks", ctx),
122+
}
123+
}
124+
_ => {
125+
ctx.result_error("Unknown operation");
126+
ctx.result_error_code(ResultCode::MISUSE);
127+
}
128+
};
129+
}
130+
131+
unsafe extern "C" fn update_hook_impl(
132+
ctx: *mut c_void,
133+
_kind: c_int,
134+
_db: *const c_char,
135+
table: *const c_char,
136+
_rowid: i64,
137+
) {
138+
let state = unsafe { (ctx as *const DatabaseState).as_ref().unwrap_unchecked() };
139+
let table = unsafe { CStr::from_ptr(table) };
140+
let Ok(table) = table.to_str() else {
141+
return;
142+
};
143+
144+
state.track_update(table);
145+
}
146+
147+
unsafe extern "C" fn commit_hook_impl(ctx: *mut c_void) -> c_int {
148+
let state = unsafe { (ctx as *const DatabaseState).as_ref().unwrap_unchecked() };
149+
state.track_commit();
150+
return 0; // Allow commit to continue normally
151+
}
152+
153+
unsafe extern "C" fn rollback_hook_impl(ctx: *mut c_void) {
154+
let state = unsafe { (ctx as *const DatabaseState).as_ref().unwrap_unchecked() };
155+
state.track_rollback();
156+
}
157+
158+
fn check_previous(desc: &'static str, expected: &Arc<DatabaseState>, previous: *const c_void) {
159+
let expected = Arc::as_ptr(expected);
160+
161+
assert!(
162+
previous.is_null() || previous == expected.cast(),
163+
"Previous call to {desc} hook outside of PowerSync: Expected {expected:p}, installed was {previous:p}",
164+
);
165+
if !previous.is_null() {
166+
// The hook callback own an Arc<DatabaseState> that needs to be dropped now.
167+
unsafe {
168+
Arc::decrement_strong_count(previous);
169+
}
170+
}
171+
}

dart/test/update_hooks_test.dart

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import 'dart:convert';
2+
3+
import 'package:sqlite3/common.dart';
4+
import 'package:test/test.dart';
5+
6+
import 'utils/native_test_utils.dart';
7+
8+
void main() {
9+
late CommonDatabase db;
10+
11+
setUp(() async {
12+
db = openTestDatabase()
13+
..select('select powersync_init()')
14+
..execute('CREATE TABLE foo (bar INTEGER);')
15+
..select("SELECT powersync_update_hooks('install')");
16+
});
17+
18+
tearDown(() {
19+
db.dispose();
20+
});
21+
22+
List<String> collectUpdates() {
23+
final [row] = db.select("SELECT powersync_update_hooks('get')");
24+
return (json.decode(row.values[0] as String) as List).cast();
25+
}
26+
27+
test('is empty initially', () {
28+
expect(collectUpdates(), isEmpty);
29+
});
30+
31+
test('reports changed tables', () {
32+
db.execute('INSERT INTO foo DEFAULT VALUES');
33+
expect(collectUpdates(), ['foo']);
34+
});
35+
36+
test('deduplicates tables', () {
37+
final stmt = db.prepare('INSERT INTO foo (bar) VALUES (?)');
38+
for (var i = 0; i < 1000; i++) {
39+
stmt.execute([i]);
40+
}
41+
stmt.dispose();
42+
43+
expect(collectUpdates(), ['foo']);
44+
});
45+
46+
test('does not report changes before end of transaction', () {
47+
db.execute('BEGIN');
48+
db.execute('INSERT INTO foo DEFAULT VALUES');
49+
expect(collectUpdates(), isEmpty);
50+
db.execute('COMMIT');
51+
52+
expect(collectUpdates(), ['foo']);
53+
});
54+
55+
test('does not report rollbacks', () {
56+
db.execute('BEGIN');
57+
db.execute('INSERT INTO foo DEFAULT VALUES');
58+
expect(collectUpdates(), isEmpty);
59+
db.execute('ROLLBACK');
60+
61+
expect(collectUpdates(), isEmpty);
62+
});
63+
}

0 commit comments

Comments
 (0)