1
- #-----------------------------------------------------------------------------
1
+ # -----------------------------------------------------------------------------
2
2
# Copyright (c) 2012 - 2022, Anaconda, Inc., and Bokeh Contributors.
3
3
# All rights reserved.
4
4
#
5
5
# The full license is in the file LICENSE.txt, distributed with this software.
6
- #-----------------------------------------------------------------------------
6
+ # -----------------------------------------------------------------------------
7
7
8
- #-----------------------------------------------------------------------------
8
+ # -----------------------------------------------------------------------------
9
9
# Boilerplate
10
- #-----------------------------------------------------------------------------
10
+ # -----------------------------------------------------------------------------
11
11
from __future__ import annotations
12
12
13
13
import logging # isort:skip
14
14
log = logging .getLogger (__name__ )
15
15
16
- #-----------------------------------------------------------------------------
16
+ # -----------------------------------------------------------------------------
17
17
# Imports
18
- #-----------------------------------------------------------------------------
18
+ # -----------------------------------------------------------------------------
19
19
20
20
# Standard library imports
21
21
import asyncio
56
56
get_token_payload ,
57
57
)
58
58
59
- #-----------------------------------------------------------------------------
59
+ # -----------------------------------------------------------------------------
60
60
# Globals and constants
61
- #-----------------------------------------------------------------------------
61
+ # -----------------------------------------------------------------------------
62
62
63
63
__all__ = (
64
64
'DocConsumer' ,
65
65
'AutoloadJsConsumer' ,
66
66
'WSConsumer' ,
67
67
)
68
68
69
- #-----------------------------------------------------------------------------
69
+ # -----------------------------------------------------------------------------
70
70
# General API
71
- #-----------------------------------------------------------------------------
71
+ # -----------------------------------------------------------------------------
72
+
72
73
73
74
class ConsumerHelper (AsyncConsumer ):
74
75
@@ -95,34 +96,45 @@ def resources(self, absolute_url: str | None = None) -> Resources:
95
96
return Resources (mode = "server" , root_url = root_url , path_versioner = StaticHandler .append_version )
96
97
return Resources (mode = mode )
97
98
99
+
98
100
class SessionConsumer (AsyncHttpConsumer , ConsumerHelper ):
99
101
100
- application_context : ApplicationContext
102
+ _application_context : ApplicationContext
101
103
102
- def __init__ (self , scope : Dict [str , Any ]) -> None :
103
- super ().__init__ (scope )
104
+ def __init__ (self , * args : Any , ** kwargs : Any ) -> None :
105
+ super ().__init__ (* args , ** kwargs )
106
+ self ._application_context = kwargs .get ('app_context' )
104
107
105
- kwargs = self .scope ["url_route" ]["kwargs" ]
106
- self .application_context = kwargs ["app_context" ]
108
+ @property
109
+ def application_context (self ) -> ApplicationContext :
110
+ # backwards compatibility
111
+ if self ._application_context is None :
112
+ self ._application_context = self .scope ["url_route" ]["kwargs" ]["app_context" ]
107
113
108
114
# XXX: accessing asyncio's IOLoop directly doesn't work
109
- if self .application_context .io_loop is None :
110
- self .application_context ._loop = IOLoop .current ()
115
+ if self ._application_context .io_loop is None :
116
+ self ._application_context ._loop = IOLoop .current ()
117
+ return self ._application_context
111
118
112
119
async def _get_session (self ) -> ServerSession :
113
120
session_id = self .arguments .get ('bokeh-session-id' ,
114
121
generate_session_id (secret_key = None , signed = False ))
115
- payload = {'headers' : {k .decode ('utf-8' ): v .decode ('utf-8' )
116
- for k , v in self .request .headers },
117
- 'cookies' : dict (self .request .cookies )}
122
+ payload = dict (
123
+ headers = {k .decode ('utf-8' ): v .decode ('utf-8' ) for k , v in self .request .headers },
124
+ cookies = dict (self .request .cookies ),
125
+ )
118
126
token = generate_jwt_token (session_id ,
119
127
secret_key = None ,
120
128
signed = False ,
121
129
expiration = 300 ,
122
130
extra_payload = payload )
123
- session = await self .application_context .create_session_if_needed (session_id , self .request , token )
131
+ try :
132
+ session = await self .application_context .create_session_if_needed (session_id , self .request , token )
133
+ except Exception as e :
134
+ log .exception (e )
124
135
return session
125
136
137
+
126
138
class AutoloadJsConsumer (SessionConsumer ):
127
139
128
140
async def handle (self , body : bytes ) -> None :
@@ -143,7 +155,12 @@ async def handle(self, body: bytes) -> None:
143
155
144
156
resources_param = self .get_argument ("resources" , "default" )
145
157
resources = self .resources (server_url ) if resources_param != "none" else None
146
- bundle = bundle_for_objs_and_resources (None , resources )
158
+
159
+ root_url = urljoin (absolute_url , self ._prefix ) if absolute_url else self ._prefix
160
+ try :
161
+ bundle = bundle_for_objs_and_resources (None , resources , root_url = root_url )
162
+ except TypeError :
163
+ bundle = bundle_for_objs_and_resources (None , resources )
147
164
148
165
render_items = [RenderItem (token = session .token , elementid = element_id , use_for_title = False )]
149
166
bundle .add (Script (script_for_render_items ({}, render_items , app_path = app_path , absolute_url = absolute_url )))
@@ -157,34 +174,42 @@ async def handle(self, body: bytes) -> None:
157
174
]
158
175
await self .send_response (200 , js .encode (), headers = headers )
159
176
177
+
160
178
class DocConsumer (SessionConsumer ):
161
179
162
180
async def handle (self , body : bytes ) -> None :
163
181
session = await self ._get_session ()
164
- page = server_html_page_for_session (session ,
165
- resources = self .resources (),
166
- title = session .document .title ,
167
- template = session .document .template ,
168
- template_variables = session .document .template_variables )
182
+ page = server_html_page_for_session (
183
+ session ,
184
+ resources = self .resources (),
185
+ title = session .document .title ,
186
+ template = session .document .template ,
187
+ template_variables = session .document .template_variables
188
+ )
169
189
await self .send_response (200 , page .encode (), headers = [(b"Content-Type" , b"text/html" )])
170
190
191
+
171
192
class WSConsumer (AsyncWebsocketConsumer , ConsumerHelper ):
172
193
173
194
_clients : Set [ServerConnection ]
174
195
175
- application_context : ApplicationContext
196
+ _application_context : ApplicationContext | None
176
197
177
- def __init__ (self , scope : Dict [str , Any ]) -> None :
178
- super ().__init__ (scope )
198
+ def __init__ (self , * args : Any , ** kwargs : Any ) -> None :
199
+ super ().__init__ (* args , ** kwargs )
200
+ self ._application_context = kwargs .get ('app_context' )
201
+ self ._clients = set ()
202
+ self .lock = locks .Lock ()
179
203
180
- kwargs = self .scope ['url_route' ]["kwargs" ]
181
- self .application_context = kwargs ["app_context" ]
204
+ @property
205
+ def application_context (self ) -> ApplicationContext :
206
+ # backward compatiblity
207
+ if self ._application_context is None :
208
+ self ._application_context = self .scope ["url_route" ]["kwargs" ]["app_context" ]
182
209
183
- if self .application_context .io_loop is None :
210
+ if self ._application_context .io_loop is None :
184
211
raise RuntimeError ("io_loop should already been set" )
185
-
186
- self ._clients = set ()
187
- self .lock = locks .Lock ()
212
+ return self ._application_context
188
213
189
214
async def connect (self ):
190
215
log .info ('WebSocket connection opened' )
@@ -279,39 +304,44 @@ async def _send_bokeh_message(self, message: Message) -> int:
279
304
await self .send (text_data = message .content_json )
280
305
sent += len (message .content_json )
281
306
282
- for header , payload in message ._buffers :
307
+ for buffer in message ._buffers :
308
+ if isinstance (buffer , tuple ):
309
+ header , payload = buffer
310
+ else :
311
+ # buffer is bokeh.core.serialization.Buffer (Bokeh 3)
312
+ header = {'id' : buffer .id }
313
+ payload = buffer .data .tobytes ()
314
+
283
315
await self .send (text_data = json .dumps (header ))
284
316
await self .send (bytes_data = payload )
285
317
sent += len (header ) + len (payload )
286
- except Exception : # Tornado 4.x may raise StreamClosedError
318
+
319
+ except Exception as e : # Tornado 4.x may raise StreamClosedError
287
320
# on_close() is / will be called anyway
288
- log .warn ("Failed sending message as connection was closed" )
321
+ log .exception (e )
322
+ log .warning ("Failed sending message as connection was closed" )
289
323
return sent
290
324
325
+ async def send_message (self , message : Message ) -> int :
326
+ return await self ._send_bokeh_message (message )
327
+
291
328
def _new_connection (self ,
292
329
protocol : Protocol ,
293
330
socket : AsyncConsumer ,
294
331
application_context : ApplicationContext ,
295
332
session : ServerSession ) -> ServerConnection :
296
- connection = AsyncServerConnection (protocol , socket , application_context , session )
333
+ connection = ServerConnection (protocol , socket , application_context , session )
297
334
self ._clients .add (connection )
298
335
return connection
299
336
300
- #-----------------------------------------------------------------------------
337
+ # -----------------------------------------------------------------------------
301
338
# Dev API
302
- #-----------------------------------------------------------------------------
339
+ # -----------------------------------------------------------------------------
303
340
304
- #-----------------------------------------------------------------------------
341
+ # -----------------------------------------------------------------------------
305
342
# Private API
306
- #-----------------------------------------------------------------------------
307
-
308
- # TODO: remove this when coroutines are dropped
309
- class AsyncServerConnection (ServerConnection ):
343
+ # -----------------------------------------------------------------------------
310
344
311
- async def send_patch_document (self , event ):
312
- """ Sends a PATCH-DOC message, returning a Future that's completed when it's written out. """
313
- msg = self .protocol .create ('PATCH-DOC' , [event ])
314
- await self ._socket ._send_bokeh_message (msg )
315
345
316
346
class AttrDict (dict ):
317
347
""" Provide a dict subclass that supports access by named attributes.
@@ -321,6 +351,6 @@ class AttrDict(dict):
321
351
def __getattr__ (self , key ):
322
352
return self [key ]
323
353
324
- #-----------------------------------------------------------------------------
354
+ # -----------------------------------------------------------------------------
325
355
# Code
326
- #-----------------------------------------------------------------------------
356
+ # -----------------------------------------------------------------------------
0 commit comments