@@ -31,6 +31,8 @@ use crate::{
31
31
} ;
32
32
use bytes:: Bytes ;
33
33
use futures:: StreamExt ;
34
+ #[ cfg( feature = "web" ) ]
35
+ use futures:: { pin_mut, FutureExt } ;
34
36
use futures_channel:: mpsc;
35
37
use http:: Uri ;
36
38
use spacetimedb_client_api_messages:: websocket as ws;
@@ -40,9 +42,11 @@ use std::{
40
42
collections:: HashMap ,
41
43
sync:: { atomic:: AtomicU32 , Arc , Mutex as StdMutex , OnceLock } ,
42
44
} ;
43
- use tokio:: runtime:: { self , Runtime } ;
44
45
#[ cfg( not( feature = "web" ) ) ]
45
- use tokio:: sync:: Mutex as TokioMutex ;
46
+ use tokio:: {
47
+ runtime:: { self , Runtime } ,
48
+ sync:: Mutex as TokioMutex ,
49
+ } ;
46
50
47
51
pub ( crate ) type SharedCell < T > = Arc < StdMutex < T > > ;
48
52
@@ -52,6 +56,7 @@ pub(crate) type SharedCell<T> = Arc<StdMutex<T>>;
52
56
/// This must be relatively cheaply `Clone`-able, and have internal sharing,
53
57
/// as numerous operations will clone it to get new handles on the connection.
54
58
pub struct DbContextImpl < M : SpacetimeModule > {
59
+ #[ cfg( not( feature = "web" ) ) ]
55
60
runtime : runtime:: Handle ,
56
61
57
62
/// All the state which is safe to hold a lock on while running callbacks.
@@ -93,6 +98,7 @@ pub struct DbContextImpl<M: SpacetimeModule> {
93
98
impl < M : SpacetimeModule > Clone for DbContextImpl < M > {
94
99
fn clone ( & self ) -> Self {
95
100
Self {
101
+ #[ cfg( not( feature = "web" ) ) ]
96
102
runtime : self . runtime . clone ( ) ,
97
103
// Being very explicit with `Arc::clone` here,
98
104
// since we'll be doing `DbContextImpl::clone` very frequently,
@@ -516,15 +522,28 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
516
522
return Message :: Local ( pending_mutation. unwrap ( ) ) ;
517
523
}
518
524
525
+ #[ cfg( not( feature = "web" ) ) ]
519
526
tokio:: select! {
520
527
pending_mutation = pending_mutations. next( ) => Message :: Local ( pending_mutation. unwrap( ) ) ,
521
528
incoming_message = recv. next( ) => Message :: Ws ( incoming_message) ,
522
529
}
530
+
531
+ #[ cfg( feature = "web" ) ]
532
+ {
533
+ let ( pending_fut, recv_fut) = ( pending_mutations. next ( ) . fuse ( ) , recv. next ( ) . fuse ( ) ) ;
534
+ pin_mut ! ( pending_fut, recv_fut) ;
535
+
536
+ futures:: select! {
537
+ pending_mutation = pending_fut => Message :: Local ( pending_mutation. unwrap( ) ) ,
538
+ incoming_message = recv_fut => Message :: Ws ( incoming_message) ,
539
+ }
540
+ }
523
541
}
524
542
525
543
/// Like [`Self::advance_one_message`], but sleeps the thread until a message is available.
526
544
///
527
545
/// Called by the autogenerated `DbConnection` method of the same name.
546
+ #[ cfg( not( feature = "web" ) ) ]
528
547
pub fn advance_one_message_blocking ( & self ) -> crate :: Result < ( ) > {
529
548
match self . runtime . block_on ( self . get_message ( ) ) {
530
549
Message :: Local ( pending) => self . apply_mutation ( pending) ,
@@ -563,6 +582,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
563
582
/// Spawn a thread which does [`Self::advance_one_message_blocking`] in a loop.
564
583
///
565
584
/// Called by the autogenerated `DbConnection` method of the same name.
585
+ #[ cfg( not( feature = "web" ) ) ]
566
586
pub fn run_threaded ( & self ) -> std:: thread:: JoinHandle < ( ) > {
567
587
let this = self . clone ( ) ;
568
588
std:: thread:: spawn ( move || loop {
@@ -714,6 +734,7 @@ pub(crate) struct DbContextImplInner<M: SpacetimeModule> {
714
734
/// `Some` if not within the context of an outer runtime. The `Runtime` must
715
735
/// then live as long as `Self`.
716
736
#[ allow( unused) ]
737
+ #[ cfg( not( feature = "web" ) ) ]
717
738
runtime : Option < Runtime > ,
718
739
719
740
db_callbacks : DbCallbacks < M > ,
@@ -917,7 +938,6 @@ but you must call one of them, or else the connection will never progress.
917
938
918
939
#[ cfg( feature = "web" ) ]
919
940
pub async fn build_impl ( self ) -> crate :: Result < DbContextImpl < M > > {
920
- let ( runtime, handle) = enter_or_create_runtime ( ) ?;
921
941
let db_callbacks = DbCallbacks :: default ( ) ;
922
942
let reducer_callbacks = ReducerCallbacks :: default ( ) ;
923
943
@@ -937,8 +957,6 @@ but you must call one of them, or else the connection will never progress.
937
957
let parsed_recv_chan = spawn_parse_loop :: < M > ( raw_msg_recv) ;
938
958
939
959
let inner = Arc :: new ( StdMutex :: new ( DbContextImplInner {
940
- runtime,
941
-
942
960
db_callbacks,
943
961
reducer_callbacks,
944
962
subscriptions : SubscriptionManager :: default ( ) ,
@@ -956,7 +974,6 @@ but you must call one of them, or else the connection will never progress.
956
974
957
975
let ( pending_mutations_send, pending_mutations_recv) = mpsc:: unbounded ( ) ;
958
976
let ctx_imp = DbContextImpl {
959
- runtime : handle,
960
977
inner,
961
978
send_chan,
962
979
cache,
@@ -1081,15 +1098,11 @@ Instead of registering multiple `on_disconnect` callbacks, register a single cal
1081
1098
// When called from within an async context, return a handle to it (and no
1082
1099
// `Runtime`), otherwise create a fresh `Runtime` and return it along with a
1083
1100
// handle to it.
1101
+ #[ cfg( not( feature = "web" ) ) ]
1084
1102
fn enter_or_create_runtime ( ) -> crate :: Result < ( Option < Runtime > , runtime:: Handle ) > {
1085
1103
match runtime:: Handle :: try_current ( ) {
1086
1104
Err ( e) if e. is_missing_context ( ) => {
1087
- #[ cfg( not( feature = "web" ) ) ]
1088
- let mut rt = tokio:: runtime:: Builder :: new_multi_thread ( ) ;
1089
- #[ cfg( feature = "web" ) ]
1090
- let mut rt = tokio:: runtime:: Builder :: new_current_thread ( ) ;
1091
-
1092
- let rt = rt
1105
+ let rt = tokio:: runtime:: Builder :: new_multi_thread ( )
1093
1106
. enable_all ( )
1094
1107
. worker_threads ( 1 )
1095
1108
. thread_name ( "spacetimedb-background-connection" )
0 commit comments