-
-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathdashboardhandler.py
208 lines (174 loc) · 7.64 KB
/
dashboardhandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""
A Dashboard handler for the Dask labextension.
This proxies the bokeh server http and ws requests through the notebook
server, preventing CORS issues.
"""
import json
from inspect import isawaitable
from urllib import parse
from tornado import httpclient, web
from jupyter_server.base.handlers import APIHandler
from jupyter_server.utils import url_path_join
from jupyter_server_proxy.handlers import ProxyHandler
from .manager import DaskClusterManager
class DaskDashboardCheckHandler(APIHandler):
"""
A handler for checking validity of a dask dashboard.
"""
manager: DaskClusterManager
async def prepare(self):
r = super().prepare()
if isawaitable(r):
await r
self.manager = await self.settings["dask_cluster_manager"]
@web.authenticated
async def get(self, url) -> None:
"""
Test if a given url string hosts a dask dashboard. Should always return a
200 code, any errors are presumed to result from an invalid/inactive dashboard.
"""
try:
client = httpclient.AsyncHTTPClient()
# Extract query (if any) from URL, this will then be appended after path.
# This allows using (eg) "?token=[...]" in URL for authentication.
if "?" in url:
pos = url.find("?")
url, query = url[:pos], url[pos:]
else:
query = ""
# First check for the individual-plots endpoint at user-provided url.
# We don't check for the root URL because that can trigger a lot of
# object creation in the bokeh document.
url = _normalize_dashboard_link(parse.unquote(url), self.request)
effective_url = None
individual_plots_url = url_path_join(
url,
f"individual-plots.json{query}",
)
try:
self.log.debug(
f"Checking for individual plots at {individual_plots_url}"
)
individual_plots_response = await client.fetch(individual_plots_url)
self.log.debug(f"{individual_plots_response.code}")
except httpclient.HTTPError as err:
# If we didn't get individual plots, we may have to follow a redirect first.
self.log.debug(f"Checking for redirect at {url}")
response = await client.fetch(url)
effective_url = (
_normalize_dashboard_link(response.effective_url, self.request)
if response.effective_url != url
else None
)
# If there was no redirect, raise.
if not effective_url:
raise err
individual_plots_url = url_path_join(
effective_url,
"individual-plots.json",
)
self.log.debug(f"Found redirect at {effective_url}")
self.log.debug(
f"Checking for individual plots at {individual_plots_url}"
)
individual_plots_response = await client.fetch(individual_plots_url)
# If we didn't get individual plots, it may not be a dask dashboard
if individual_plots_response.code != 200:
raise ValueError("Does not seem to host a dask dashboard")
individual_plots = json.loads(individual_plots_response.body)
# If there was query in original URL, append to URLs returned
if query:
for name, plot_url in individual_plots.items():
individual_plots[name] = f"{plot_url}{query}"
url = f"{url}{query}"
if effective_url:
effective_url = f"{effective_url}{query}"
self.set_status(200)
self.finish(
json.dumps(
{
"url": url,
"isActive": True,
"effectiveUrl": effective_url,
"plots": individual_plots,
}
)
)
except Exception:
self.log.debug(f"{url} does not seem to host a dask dashboard")
self.set_status(200)
self.finish(
json.dumps(
{
"url": url,
"isActive": False,
"plots": {},
}
)
)
class DaskDashboardHandler(ProxyHandler):
"""
A handler that proxies the dask dashboard to the notebook server.
Currently the dashboard is assumed to be running on `localhost`.
The functions `http_get`, `open`, `post`, `put`, `delete`,
`head`, `patch`, `options`, and `proxy` are all overriding
the base class with our own request handler parameters
of `cluster_id` and `proxied_path`.
The `proxy` function uses the cluster ID to get the port
for the bokeh server from the Dask cluster manager. This
port is then used to call the proxy method on the base class.
"""
async def http_get(self, cluster_id, proxied_path):
return await self.proxy(cluster_id, proxied_path)
async def open(self, cluster_id, proxied_path):
host, port = await self._get_parsed(cluster_id)
return await super().proxy_open(host, port, proxied_path)
# We have to duplicate all these for now, I've no idea why!
# Figure out a way to not do that?
def post(self, cluster_id, proxied_path):
return self.proxy(cluster_id, proxied_path)
def put(self, cluster_id, proxied_path):
return self.proxy(cluster_id, proxied_path)
def delete(self, cluster_id, proxied_path):
return self.proxy(cluster_id, proxied_path)
def head(self, cluster_id, proxied_path):
return self.proxy(cluster_id, proxied_path)
def patch(self, cluster_id, proxied_path):
return self.proxy(cluster_id, proxied_path)
def options(self, cluster_id, proxied_path):
return self.proxy(cluster_id, proxied_path)
async def proxy(self, cluster_id, proxied_path):
host, port = await self._get_parsed(cluster_id)
return super().proxy(host, port, proxied_path)
async def _get_parsed(self, cluster_id):
"""
Given a cluster ID, get the hostname and port of its bokeh server.
"""
# Get the cluster by ID. If it is not found,
# raise an error.
cluster_model = await self.manager.get_cluster(cluster_id)
if not cluster_model:
raise web.HTTPError(404, f"Dask cluster {cluster_id} not found")
# Construct the proper websocket proxy link from the cluster dashboard
dashboard_link = cluster_model["dashboard_link"]
dashboard_link = _normalize_dashboard_link(dashboard_link, self.request)
# Parse the url and return
parsed = parse.urlparse(dashboard_link)
port = parsed.port
if not port:
port = 443 if parsed.scheme == "https" else 80
if not parsed.hostname:
raise web.HTTPError(500, "Dask dashboard URI malformed")
return parsed.hostname, port
def _normalize_dashboard_link(link, request):
"""
Given a dashboard link, make sure it conforms to what we expect.
"""
if not link.startswith("http"):
# If a local url is given, assume it is using the same host
# as the application, and prepend that.
link = url_path_join(f"{request.protocol}://{request.host}", link)
if link.endswith("/status"):
# If the default "status" dashboard is given, strip it.
link = link[: -len("/status")]
return link