
Commit ff36940

Initial version of DBFS Scanner to help with DBFS disablement (#379)
For the upcoming full disablement of DBFS, we need to allow customers to analyze their workspaces and identify DBFS use both for storing data and in compute resources. See the README for details.
1 parent d9e4248 commit ff36940

9 files changed: +1083 -0 lines changed

dbfs-scanner/README.md

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
# A workspace scanner in preparation for DBFS disablement

This folder contains code that can help with preparing for the complete disablement of DBFS. Two tools are available (each tool has a command-line variant and a notebook variant with the `_nb` suffix):

* `scan_compute.py` (and `scan_compute_nb`) - scans compute resources (interactive clusters, jobs, DLT pipelines, policies) for use of resources stored on DBFS (init scripts, libraries, etc.) and outputs the findings.
* `scan_dbfs.py` (and `scan_dbfs_nb`) - scans DBFS content, trying to classify it and estimate its size (Delta tables, Structured Streaming checkpoints, DLT pipelines, ...). By default it scans the whole DBFS Root (the user-accessible part), but you can pass an optional start directory as a parameter.


## Usage as command-line utility

1. Install the `databricks-sdk` Python package.
1. Set up [environment variables with authentication parameters](https://docs.databricks.com/aws/en/dev-tools/auth/) for the workspace that will be analyzed.
1. Run a specific tool (a programmatic alternative is sketched after this list):

    1. `python scan_compute.py` will scan all compute resources, output results to the console, and also write them to the `compute_scan_results.json` file.
    1. `python scan_dbfs.py [start_directory]` will scan the DBFS Root (or only `start_directory` if specified), output results to the console, and also write them to the `dbfs_scan_results.json` file.
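If you prefer to drive the compute scan from your own Python code instead of the command-line wrapper, the analyzer functions shown in the `helpers` package below can be called directly. The following is only a minimal sketch: it assumes the authentication environment variables are already set (so `WorkspaceClient()` picks them up), and the exact import path of the analyzer functions inside `helpers` is an assumption.

```python
import json

from databricks.sdk import WorkspaceClient

# NOTE: hypothetical import path - adjust to wherever the analyzer
# functions actually live inside the helpers package.
from helpers import (
    analyze_clusters,
    analyze_jobs,
    analyze_dlt_pipelines,
    analyze_cluster_policies,
)

# WorkspaceClient() reads DATABRICKS_HOST / DATABRICKS_TOKEN (or other
# configured authentication) from the environment.
wc = WorkspaceClient()

results = {
    "clusters": analyze_clusters(wc),
    "jobs": analyze_jobs(wc),
    "dlt_pipelines": analyze_dlt_pipelines(wc),
    "cluster_policies": analyze_cluster_policies(wc),
}

# Persist the findings, similar to what scan_compute.py writes out.
with open("compute_scan_results.json", "w") as f:
    json.dump(results, f, indent=2)
```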
## Usage inside the workspace

1. As a **workspace administrator**, open the corresponding notebook (`scan_compute_nb` or `scan_dbfs_nb`).
1. Attach it to a cluster (e.g., Serverless).
1. Specify parameters in the widgets (the start directory for `scan_dbfs_nb`, or an output directory such as a UC Volume path `/Volumes/<catalog>/<schema>/<volume>/` if you want to persist the results); a hypothetical widget-handling sketch is shown after this list.
1. Press "Run all" and wait for the execution to finish.
1. If an output directory is specified, the `compute_scan_results.json` or `dbfs_scan_results.json` file will be stored in that directory.


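The notebooks' actual widget names and output logic are not shown in this commit; the snippet below is only an illustrative sketch of how a notebook variant could read an output directory widget and persist results to a UC Volume. It runs inside a Databricks notebook, where `dbutils` is predefined; the widget name `output_directory` and the placeholder results are assumptions.

```python
import json
import os

# Hypothetical widget - the real notebooks may use different names/labels.
dbutils.widgets.text("output_directory", "", "Output directory (optional UC Volume path)")
output_directory = dbutils.widgets.get("output_directory").strip()

scan_results = {"clusters": {}, "jobs": {}}  # placeholder for the real findings

if output_directory:
    # UC Volumes are exposed as regular paths, so plain file I/O works.
    path = os.path.join(output_directory, "compute_scan_results.json")
    with open(path, "w") as f:
        json.dump(scan_results, f, indent=2)
    print(f"Results written to {path}")
else:
    print(json.dumps(scan_results, indent=2))
```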
## Known issues

* Scanning of DBFS is very slow due to the single-threaded implementation.
* Output file names for results are hardcoded.


## TODOs

* [ ] Use the `blueprints` library for logging
* [ ] For all compute objects try to find last run/update timestamp...
* [ ] Parallelize the scan of DBFS (one possible approach is sketched below)
* [ ] Allow customizing the output file names in the command-line utilities
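As a starting point for the parallelization TODO, the sketch below lists DBFS directories with a thread pool using the SDK's `dbfs.list` call. It is only an illustrative approach, under the assumption that directory listing dominates the scan time; it does not do the classification that `scan_dbfs.py` performs.

```python
from concurrent.futures import ThreadPoolExecutor

from databricks.sdk import WorkspaceClient

wc = WorkspaceClient()


def list_dir(path: str) -> tuple[list, int]:
    """Return (subdirectories, total size of files) for a single DBFS directory."""
    subdirs, size = [], 0
    for entry in wc.dbfs.list(path):
        if entry.is_dir:
            subdirs.append(entry.path)
        else:
            size += entry.file_size or 0
    return subdirs, size


def parallel_scan(start: str = "dbfs:/", workers: int = 16) -> int:
    """Breadth-first DBFS traversal, listing each level's directories concurrently."""
    total, pending = 0, [start]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while pending:
            next_level = []
            for subdirs, size in pool.map(list_dir, pending):
                total += size
                next_level.extend(subdirs)
            pending = next_level
    return total


print(f"Total size under DBFS Root: {parallel_scan()} bytes")
```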

dbfs-scanner/helpers/__init__.py

Whitespace-only changes.
Lines changed: 289 additions & 0 deletions
@@ -0,0 +1,289 @@
import base64
import json
import re
from typing import Optional, Tuple

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterSource, ClusterDetails, ClusterSpec
from databricks.sdk.service.jobs import Task
from databricks.sdk.service.pipelines import PipelineCluster
from databricks.sdk.service.workspace import ExportFormat


__notebook_pip_install_from_dbfs__ = re.compile(r"^#\s+MAGIC\s+%pip\s+install.*(/dbfs/.*)$")
__notebook_dbfs_fuse_use__ = re.compile(r"^.*[\"'](/dbfs/[^\"']*)[\"'].*$")
__notebook_dbfs_hdfs_use__ = re.compile(r"^.*[\"'](dbfs:/[^\"']*)[\"'].*$")


def _analyze_notebook_or_wsfile(wc: WorkspaceClient, notebook_path: str) -> Tuple[list, list]:
    """Export a notebook or workspace file and collect DBFS-based pip installs and file references."""
    libs = []
    dbfs_file_refs = []
    try:
        b64 = wc.workspace.export(notebook_path, format=ExportFormat.SOURCE).content
        content = base64.b64decode(b64).decode("utf-8")
        for l in content.split("\n"):
            m = __notebook_pip_install_from_dbfs__.match(l)
            if m:
                libs.append(m.group(1))
            elif not l.lstrip().startswith("#"):
                m = __notebook_dbfs_fuse_use__.match(l)
                if m:
                    dbfs_file_refs.append(m.group(1))
                m = __notebook_dbfs_hdfs_use__.match(l)
                if m:
                    dbfs_file_refs.append(m.group(1))

    except Exception as e:
        # print(f"Error occurred while analyzing notebook {notebook_path}: {e}")
        pass

    return libs, dbfs_file_refs


def analyze_dlt_pipelines(wc: WorkspaceClient) -> dict:
    """Scan all DLT pipelines for DBFS use in clusters, storage location, and libraries."""
    finds = {}
    i = 0
    for l in wc.pipelines.list_pipelines(max_results=100):
        i += 1
        if i % 100 == 0:
            print(f"Scanned {i} DLT pipelines")
        # print("Analyzing pipeline:", l.pipeline_id, l.name)
        p = wc.pipelines.get(l.pipeline_id)
        if not p:
            continue
        p_finds = {}
        # analyze clusters
        clusters = {}
        for cl in p.spec.clusters:
            cl_finds = _analyze_cluster_spec(cl, {})
            if cl_finds:
                clusters[cl.label] = cl_finds
        if clusters:
            p_finds["clusters"] = clusters
        # analyze storage
        if p.spec.storage:
            p_finds["storage"] = p.spec.storage
        # analyze libraries
        lib_finds = {}
        for lib in p.spec.libraries:
            lib_path = ""
            if lib.notebook:
                lib_path = lib.notebook.path
            elif lib.file:
                lib_path = lib.file.path
            if lib_path:
                libs, dbfs_file_refs = _analyze_notebook_or_wsfile(wc, lib_path)
                if libs or dbfs_file_refs:
                    d = {}
                    if libs:
                        d["libraries"] = libs
                    if dbfs_file_refs:
                        d["dbfs_file_refs"] = dbfs_file_refs
                    lib_finds[lib_path] = d
        if lib_finds:
            p_finds["libraries"] = lib_finds
        if p_finds:
            p_finds["name"] = l.name
            finds[l.pipeline_id] = p_finds

    print(f"Total {i} DLT pipelines")

    return finds


def _analyze_task(wc: WorkspaceClient, task: Task) -> dict:
    """Scan a single job task (including nested for_each tasks) for DBFS references."""
    finds = {}
    if task.spark_python_task:
        if task.spark_python_task.python_file.startswith("dbfs:/"):
            finds["python_file_on_dbfs"] = task.spark_python_task.python_file
        elif task.spark_python_task.python_file.startswith("/"):
            libs, dbfs_file_refs = _analyze_notebook_or_wsfile(wc, task.spark_python_task.python_file)
            if libs or dbfs_file_refs:
                finds["python_file_in_ws"] = {
                    "path": task.spark_python_task.python_file
                }
                if libs:
                    finds["python_file_in_ws"]["libraries"] = libs
                if dbfs_file_refs:
                    finds["python_file_in_ws"]["dbfs_file_refs"] = dbfs_file_refs

    if task.notebook_task:
        libs, dbfs_file_refs = _analyze_notebook_or_wsfile(wc, task.notebook_task.notebook_path)
        if libs or dbfs_file_refs:
            finds["notebook"] = {
                "path": task.notebook_task.notebook_path,
            }
            if libs:
                finds["notebook"]["libraries"] = libs
            if dbfs_file_refs:
                finds["notebook"]["dbfs_file_refs"] = dbfs_file_refs

    if task.for_each_task:
        fe_finds = _analyze_task(wc, task.for_each_task.task)
        if fe_finds:
            finds["for_each_task"] = fe_finds

    for lib in (task.libraries or []):
        dbfs_lib = ""
        if lib.jar and lib.jar.startswith("dbfs:/"):
            dbfs_lib = lib.jar
        if lib.whl and lib.whl.startswith("dbfs:/"):
            dbfs_lib = lib.whl
        if lib.egg and lib.egg.startswith("dbfs:/"):
            dbfs_lib = lib.egg
        if dbfs_lib:
            r = finds.get("libraries", [])
            r.append(dbfs_lib)
            finds["libraries"] = r

    if task.new_cluster:
        finds = _analyze_cluster_spec(task.new_cluster, finds)

    return finds


def analyze_jobs(wc: WorkspaceClient) -> dict:
    """Scan all jobs (tasks and job clusters) for DBFS use and return findings keyed by job ID."""
    res = {}
    i = 0
    for job in wc.jobs.list(expand_tasks=True, limit=100):
        i += 1
        if i % 100 == 0:
            print(f"Scanned {i} jobs")
        finds = {}
        js = job.settings
        # print("Analyzing job:", js.name)
        for task in (js.tasks or []):
            task_res = _analyze_task(wc, task)

            if task_res:
                t = finds.get("tasks", {})
                t[task.task_key] = task_res
                finds["tasks"] = t

        jcs_finds = {}
        for jc in (js.job_clusters or []):
            jc_finds = {}
            if jc.new_cluster:
                jc_finds = _analyze_cluster_spec(jc.new_cluster, jc_finds)
            if jc_finds:
                jcs_finds[jc.job_cluster_key] = jc_finds

        if jcs_finds:
            finds["job_clusters"] = jcs_finds

        if finds:
            finds["job_name"] = js.name
            res[job.job_id] = finds

    print(f"Total {i} jobs")
    return res


def _analyze_cluster_spec(cl: ClusterDetails | ClusterSpec | PipelineCluster, finds: dict):
    """Record DBFS-based init scripts and cluster log destinations from a cluster spec."""
    # check if we have any init scripts pointing to DBFS
    for init_script in (cl.init_scripts or []):
        if init_script.dbfs:
            r = finds.get("init_scripts", [])
            r.append(init_script.dbfs.destination)
            finds["init_scripts"] = r
    # check if we have a cluster log conf pointing to DBFS
    if cl.cluster_log_conf and cl.cluster_log_conf.dbfs and cl.cluster_log_conf.dbfs.destination:
        finds["cluster_log_conf"] = cl.cluster_log_conf.dbfs.destination

    return finds


def analyze_clusters(wc: WorkspaceClient) -> dict:
    """Scan interactive (UI/API-created) clusters for DBFS-based libraries, init scripts, and log configs."""
    res = {}
    i = 0
    for cl in wc.clusters.list(page_size=100):
        i += 1
        if i % 100 == 0:
            print(f"Scanned {i} clusters")
        if cl.cluster_source not in [ClusterSource.UI, ClusterSource.API]:
            continue
        # print("Analyzing cluster:", cl.cluster_name)
        finds = {}

        # check if we have any libraries pointing to DBFS
        for lib in wc.libraries.cluster_status(cl.cluster_id):
            dbfs_lib = ""
            if lib.library.jar and lib.library.jar.startswith("dbfs:/"):
                dbfs_lib = lib.library.jar
            if lib.library.whl and lib.library.whl.startswith("dbfs:/"):
                dbfs_lib = lib.library.whl
            if dbfs_lib:
                r = finds.get("libraries", [])
                r.append(dbfs_lib)
                finds["libraries"] = r

        finds = _analyze_cluster_spec(cl, finds)

        # if we found anything, add it to the results
        if finds:
            finds["cluster_name"] = cl.cluster_name
            res[cl.cluster_id] = finds

    print(f"Total {i} clusters")
    return res


def _check_policy_definition(sdef: Optional[str], finds: dict):
    """Check a cluster policy definition (JSON string) for fixed DBFS log and init script paths."""
    policy_def = json.loads(sdef or "{}")
    for k, v in policy_def.items():
        if not isinstance(v, dict):
            continue
        typ = v.get("type")
        if not typ:
            continue
        val = v.get("value") or v.get("defaultValue")
        if typ == "fixed" and k == "cluster_log_conf.path" and val and val.startswith("dbfs:/"):
            finds["cluster_log_conf"] = val
        if typ == "fixed" and k.startswith("init_scripts.") and k.endswith(".dbfs.destination"):
            r = finds.get("init_scripts", [])
            r.append(val)
            finds["init_scripts"] = r

    return finds


def analyze_cluster_policies(wc: WorkspaceClient) -> dict:
    """Scan policy families and cluster policies for DBFS-based libraries and fixed DBFS paths."""
    res = {}
    i = 0
    for pf in wc.policy_families.list():
        i += 1
        if i % 100 == 0:
            print(f"Scanned {i} policies")
        finds = {}
        # print("Analyzing cluster policy family:", pf.name)
        finds = _check_policy_definition(pf.definition, finds)

        if finds:
            finds["policy_name"] = pf.name
            res[pf.policy_id] = finds

    for pl in wc.cluster_policies.list():
        i += 1
        if i % 100 == 0:
            print(f"Scanned {i} policies")
        # print("Analyzing cluster policy:", pl.name)
        finds = {}
        for lib in (pl.libraries or []):
            dbfs_lib = ""
            if lib.jar and lib.jar.startswith("dbfs:/"):
                dbfs_lib = lib.jar
            if lib.whl and lib.whl.startswith("dbfs:/"):
                dbfs_lib = lib.whl
            if dbfs_lib:
                r = finds.get("libraries", [])
                r.append(dbfs_lib)
                finds["libraries"] = r

        finds = _check_policy_definition(pl.definition, finds)

        if finds:
            finds["policy_name"] = pl.name
            res[pl.policy_id] = finds

    print(f"Total {i} cluster policies and families")
    return res
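To illustrate how the notebook scanning above detects DBFS references, the standalone snippet below runs the same three regular expressions against a few hypothetical notebook source lines (the sample lines are made up for demonstration; in the scanner itself, the FUSE/URI checks are only applied to non-comment lines):

```python
import re

pip_install_from_dbfs = re.compile(r"^#\s+MAGIC\s+%pip\s+install.*(/dbfs/.*)$")
dbfs_fuse_use = re.compile(r"^.*[\"'](/dbfs/[^\"']*)[\"'].*$")
dbfs_hdfs_use = re.compile(r"^.*[\"'](dbfs:/[^\"']*)[\"'].*$")

# Hypothetical notebook source lines (as produced by a SOURCE-format export).
samples = [
    "# MAGIC %pip install /dbfs/libs/mylib-0.1-py3-none-any.whl",  # pip install from DBFS
    'df = spark.read.parquet("dbfs:/mnt/raw/events")',             # dbfs:/ URI reference
    "with open('/dbfs/tmp/config.json') as f:",                    # /dbfs/ FUSE-mount reference
    "print('no DBFS reference here')",                             # should not match anything
]

for line in samples:
    for name, rx in [("pip", pip_install_from_dbfs), ("fuse", dbfs_fuse_use), ("uri", dbfs_hdfs_use)]:
        m = rx.match(line)
        if m:
            print(f"{name}: {m.group(1)}")
```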
