@@ -14,13 +14,12 @@ use tokio::{
14
14
15
15
/// A [StateroomContext] implementation for [StateroomService]s hosted in the
16
16
/// context of a [ServiceActor].
17
- #[ derive( Clone ) ]
18
- pub struct ServiceActorContext {
17
+ pub struct ServerStateroomContext {
19
18
senders : Arc < DashMap < ClientId , Sender < Message > > > ,
20
- event_sender : Sender < Event > ,
19
+ event_sender : Arc < Sender < Event > > ,
21
20
}
22
21
23
- impl ServiceActorContext {
22
+ impl ServerStateroomContext {
24
23
pub fn try_send ( & self , recipient : MessageRecipient , message : Message ) {
25
24
match recipient {
26
25
MessageRecipient :: Broadcast => {
@@ -39,14 +38,14 @@ impl ServiceActorContext {
39
38
if let Some ( sender) = self . senders . get ( & client_id) {
40
39
sender. try_send ( message) . unwrap ( ) ;
41
40
} else {
42
- println ! ( "No sender for client {:?}" , client_id ) ;
41
+ tracing :: error! ( ?client_id , "No sender for client." ) ;
43
42
}
44
43
}
45
44
}
46
45
}
47
46
}
48
47
49
- impl StateroomContext for ServiceActorContext {
48
+ impl StateroomContext for ServerStateroomContext {
50
49
fn send_message ( & self , recipient : impl Into < MessageRecipient > , message : & str ) {
51
50
self . try_send ( recipient. into ( ) , Message :: Text ( message. to_string ( ) ) ) ;
52
51
}
@@ -60,7 +59,7 @@ impl StateroomContext for ServiceActorContext {
60
59
let sender = self . event_sender . clone ( ) ;
61
60
tokio:: spawn ( async move {
62
61
tokio:: time:: sleep ( Duration :: from_millis ( ms_delay as u64 ) ) . await ;
63
- sender. send ( Event :: TimerEvent ) . await . unwrap ( ) ;
62
+ sender. send ( Event :: Timer ) . await . unwrap ( ) ;
64
63
} ) ;
65
64
}
66
65
}
@@ -78,44 +77,39 @@ pub enum Event {
78
77
Message { client : ClientId , message : Message } ,
79
78
Join { client : ClientId } ,
80
79
Leave { client : ClientId } ,
81
- TimerEvent ,
80
+ Timer ,
82
81
}
83
82
84
83
impl ServerState {
85
- pub fn new < T : StateroomService + Send + Sync + ' static > (
86
- service_factory : impl StateroomServiceFactory < ServiceActorContext , Service = T > + Send + ' static ,
87
- ) -> Self {
84
+ pub fn new ( factory : impl StateroomServiceFactory ) -> Self {
88
85
let ( tx, mut rx) = tokio:: sync:: mpsc:: channel :: < Event > ( 100 ) ;
89
86
90
87
let senders = Arc :: new ( DashMap :: new ( ) ) ;
91
88
92
89
let senders_ = senders. clone ( ) ;
93
90
let tx_ = tx. clone ( ) ;
94
91
let handle = tokio:: spawn ( async move {
95
- let mut service = service_factory
96
- . build (
97
- "" ,
98
- ServiceActorContext {
99
- senders : senders_. clone ( ) ,
100
- event_sender : tx_,
101
- } ,
102
- )
103
- . unwrap ( ) ;
104
-
92
+ let context = Arc :: new ( ServerStateroomContext {
93
+ senders : senders_. clone ( ) ,
94
+ event_sender : Arc :: new ( tx_) ,
95
+ } ) ;
96
+
97
+ let mut service = factory. build ( "" , context. clone ( ) ) . unwrap ( ) ;
98
+ service. init ( context. as_ref ( ) ) ;
99
+
105
100
loop {
106
101
let msg = rx. recv ( ) . await ;
107
- println ! ( "{:?}" , msg) ;
108
102
match msg {
109
103
Some ( Event :: Message { client, message } ) => match message {
110
- Message :: Text ( msg) => service. message ( client, & msg) ,
111
- Message :: Binary ( msg) => service. binary ( client, & msg) ,
104
+ Message :: Text ( msg) => service. message ( client, & msg, context . as_ref ( ) ) ,
105
+ Message :: Binary ( msg) => service. binary ( client, & msg, context . as_ref ( ) ) ,
112
106
Message :: Close ( _) => { }
113
- msg => println ! ( "Ignoring unhandled message: {:?}" , msg) ,
107
+ msg => tracing :: warn !( "Ignoring unhandled message: {:?}" , msg) ,
114
108
} ,
115
- Some ( Event :: Join { client } ) => service. connect ( client) ,
116
- Some ( Event :: Leave { client } ) => service. disconnect ( client) ,
117
- Some ( Event :: TimerEvent ) => {
118
- service. timer ( ) ;
109
+ Some ( Event :: Join { client } ) => service. connect ( client, context . as_ref ( ) ) ,
110
+ Some ( Event :: Leave { client } ) => service. disconnect ( client, context . as_ref ( ) ) ,
111
+ Some ( Event :: Timer ) => {
112
+ service. timer ( context . as_ref ( ) ) ;
119
113
}
120
114
None => break ,
121
115
}
@@ -133,7 +127,7 @@ impl ServerState {
133
127
pub fn remove ( & self , client : & ClientId ) {
134
128
self . inbound_sender
135
129
. try_send ( Event :: Leave {
136
- client : client. clone ( ) ,
130
+ client : * client,
137
131
} )
138
132
. unwrap ( ) ;
139
133
self . senders . remove ( client) ;
0 commit comments