@@ -12,6 +12,7 @@ use crate::identity::Identity;
12
12
use crate :: messages:: control_db:: Database ;
13
13
use crate :: replica_context:: ReplicaContext ;
14
14
use crate :: sql:: ast:: SchemaViewer ;
15
+ use crate :: sql:: parser:: RowLevelExpr ;
15
16
use crate :: subscription:: module_subscription_actor:: ModuleSubscriptions ;
16
17
use crate :: subscription:: tx:: DeltaTx ;
17
18
use crate :: subscription:: { execute_plan, record_exec_metrics} ;
@@ -39,6 +40,7 @@ use spacetimedb_sats::ProductValue;
39
40
use spacetimedb_schema:: auto_migrate:: AutoMigrateError ;
40
41
use spacetimedb_schema:: def:: deserialize:: ReducerArgsDeserializeSeed ;
41
42
use spacetimedb_schema:: def:: { ModuleDef , ReducerDef } ;
43
+ use spacetimedb_schema:: schema:: { Schema , TableSchema } ;
42
44
use spacetimedb_vm:: relation:: RelValue ;
43
45
use std:: fmt;
44
46
use std:: sync:: { Arc , Weak } ;
@@ -263,9 +265,6 @@ pub trait Module: Send + Sync + 'static {
263
265
pub trait ModuleInstance : Send + ' static {
264
266
fn trapped ( & self ) -> bool ;
265
267
266
- /// If the module instance's replica_ctx is uninitialized, initialize it.
267
- fn init_database ( & mut self , program : Program ) -> anyhow:: Result < Option < ReducerCallResult > > ;
268
-
269
268
/// Update the module instance's database to match the schema of the module instance.
270
269
fn update_database (
271
270
& mut self ,
@@ -276,6 +275,79 @@ pub trait ModuleInstance: Send + 'static {
276
275
fn call_reducer ( & mut self , tx : Option < MutTxId > , params : CallReducerParams ) -> ReducerCallResult ;
277
276
}
278
277
278
+ /// If the module instance's replica_ctx is uninitialized, initialize it.
279
+ fn init_database (
280
+ replica_ctx : & ReplicaContext ,
281
+ module_def : & ModuleDef ,
282
+ inst : & mut dyn ModuleInstance ,
283
+ program : Program ,
284
+ ) -> anyhow:: Result < Option < ReducerCallResult > > {
285
+ log:: debug!( "init database" ) ;
286
+ let timestamp = Timestamp :: now ( ) ;
287
+ let stdb = & * replica_ctx. relational_db ;
288
+ let logger = replica_ctx. logger . system_logger ( ) ;
289
+
290
+ let tx = stdb. begin_mut_tx ( IsolationLevel :: Serializable , Workload :: Internal ) ;
291
+ let auth_ctx = AuthCtx :: for_current ( replica_ctx. database . owner_identity ) ;
292
+ let ( tx, ( ) ) = stdb
293
+ . with_auto_rollback ( tx, |tx| {
294
+ let mut table_defs: Vec < _ > = module_def. tables ( ) . collect ( ) ;
295
+ table_defs. sort_by ( |a, b| a. name . cmp ( & b. name ) ) ;
296
+
297
+ for def in table_defs {
298
+ let table_name = & def. name ;
299
+ logger. info ( & format ! ( "Creating table `{table_name}`" ) ) ;
300
+ let schema = TableSchema :: from_module_def ( module_def, def, ( ) , TableId :: SENTINEL ) ;
301
+ stdb. create_table ( tx, schema)
302
+ . with_context ( || format ! ( "failed to create table {table_name}" ) ) ?;
303
+ }
304
+ // Insert the late-bound row-level security expressions.
305
+ for rls in module_def. row_level_security ( ) {
306
+ logger. info ( & format ! ( "Creating row level security `{}`" , rls. sql) ) ;
307
+
308
+ let rls = RowLevelExpr :: build_row_level_expr ( tx, & auth_ctx, rls)
309
+ . with_context ( || format ! ( "failed to create row-level security: `{}`" , rls. sql) ) ?;
310
+ let table_id = rls. def . table_id ;
311
+ let sql = rls. def . sql . clone ( ) ;
312
+ stdb. create_row_level_security ( tx, rls. def )
313
+ . with_context ( || format ! ( "failed to create row-level security for table `{table_id}`: `{sql}`" , ) ) ?;
314
+ }
315
+
316
+ stdb. set_initialized ( tx, replica_ctx. host_type , program) ?;
317
+
318
+ anyhow:: Ok ( ( ) )
319
+ } )
320
+ . inspect_err ( |e| log:: error!( "{e:?}" ) ) ?;
321
+
322
+ let rcr = match module_def. lifecycle_reducer ( Lifecycle :: Init ) {
323
+ None => {
324
+ stdb. commit_tx ( tx) ?;
325
+ None
326
+ }
327
+
328
+ Some ( ( reducer_id, _) ) => {
329
+ logger. info ( "Invoking `init` reducer" ) ;
330
+ let caller_identity = replica_ctx. database . owner_identity ;
331
+ Some ( inst. call_reducer (
332
+ Some ( tx) ,
333
+ CallReducerParams {
334
+ timestamp,
335
+ caller_identity,
336
+ caller_connection_id : ConnectionId :: ZERO ,
337
+ client : None ,
338
+ request_id : None ,
339
+ timer : None ,
340
+ reducer_id,
341
+ args : ArgsTuple :: nullary ( ) ,
342
+ } ,
343
+ ) )
344
+ }
345
+ } ;
346
+
347
+ logger. info ( "Database initialized" ) ;
348
+ Ok ( rcr)
349
+ }
350
+
279
351
pub struct CallReducerParams {
280
352
pub timestamp : Timestamp ,
281
353
pub caller_identity : Identity ,
@@ -306,11 +378,6 @@ impl<T: Module> ModuleInstance for AutoReplacingModuleInstance<T> {
306
378
fn trapped ( & self ) -> bool {
307
379
self . inst . trapped ( )
308
380
}
309
- fn init_database ( & mut self , program : Program ) -> anyhow:: Result < Option < ReducerCallResult > > {
310
- let ret = self . inst . init_database ( program) ;
311
- self . check_trap ( ) ;
312
- ret
313
- }
314
381
fn update_database (
315
382
& mut self ,
316
383
program : Program ,
@@ -877,9 +944,13 @@ impl ModuleHost {
877
944
}
878
945
879
946
pub async fn init_database ( & self , program : Program ) -> Result < Option < ReducerCallResult > , InitDatabaseError > {
880
- self . call ( "<init_database>" , move |inst| inst. init_database ( program) )
881
- . await ?
882
- . map_err ( InitDatabaseError :: Other )
947
+ let replica_ctx = self . inner . replica_ctx ( ) . clone ( ) ;
948
+ let info = self . info . clone ( ) ;
949
+ self . call ( "<init_database>" , move |inst| {
950
+ init_database ( & replica_ctx, & info. module_def , inst, program)
951
+ } )
952
+ . await ?
953
+ . map_err ( InitDatabaseError :: Other )
883
954
}
884
955
885
956
pub async fn update_database (
0 commit comments