Initial version of DBFS Scanner to help with DBFS disablement #379

Merged (1 commit) on Apr 7, 2025
38 changes: 38 additions & 0 deletions dbfs-scanner/README.md
@@ -0,0 +1,38 @@
# A workspace scanner in preparation for DBFS disablement

This folder contains code that can help with preparing for the complete disablement of DBFS. Two tools are available (each tool has a command-line variant and a notebook variant with the `_nb` suffix):

* `scan_compute.py` (and `scan_compute_nb`) - scans compute resources (interactive clusters, jobs, DLT pipelines, policies) for use of resources on DBFS (init scripts, libraries, etc.) and outputs the findings; the kinds of references it flags are illustrated in the sketch after this list.
* `scan_dbfs.py` (and `scan_dbfs_nb`) - scans DBFS content, trying to classify it and estimate its size (Delta tables, Structured Streaming checkpoints, DLT pipelines, ...). By default it scans the whole DBFS root (the user-accessible part), but you can pass an optional start directory as a parameter.
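
As an illustration, the compute scanner flags DBFS references like the ones below. A minimal sketch, where the sample lines are hypothetical and the regular expressions mirror the ones defined in `helpers/compute_analyzer.py`:

```python
# Sketch: the kinds of DBFS references the compute scanner looks for.
import re

# Regexes mirroring helpers/compute_analyzer.py: FUSE-style and hdfs-style DBFS paths.
dbfs_fuse_use = re.compile(r"^.*[\"'](/dbfs/[^\"']*)[\"'].*$")
dbfs_hdfs_use = re.compile(r"^.*[\"'](dbfs:/[^\"']*)[\"'].*$")

# Hypothetical notebook lines.
sample_lines = [
    'df = spark.read.load("dbfs:/mnt/raw/events")',
    "with open('/dbfs/tmp/config.json') as f:",
    'print("no DBFS reference here")',
]

for line in sample_lines:
    for pattern in (dbfs_fuse_use, dbfs_hdfs_use):
        m = pattern.match(line)
        if m:
            print("flagged DBFS reference:", m.group(1))
```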


## Usage as command-line utility

1. Install the `databricks-sdk` Python package.
1. Set up [environment variables with authentication parameters](https://docs.databricks.com/aws/en/dev-tools/auth/) for the workspace to be analyzed.
1. Run a specific tool:

   1. `python scan_compute.py` will scan all compute resources, output the results to the console, and also write them into the `compute_scan_results.json` file.
   1. `python scan_dbfs.py [start_directory]` will scan the DBFS root (or only `start_directory` if specified), output the results to the console, and also write them into the `dbfs_scan_results.json` file. A quick way to inspect either output file is sketched after this list.
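
To quickly inspect the CLI output, the result file can be loaded with the standard library. A minimal sketch, assuming only that `compute_scan_results.json` contains a JSON object keyed by resource category or ID:

```python
# Sketch: summarize the findings written by scan_compute.py.
import json

with open("compute_scan_results.json") as f:
    results = json.load(f)

# Print how many findings were recorded under each top-level key.
for key, findings in results.items():
    print(f"{key}: {len(findings)} finding(s)")
```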

## Usage inside the workspace

1. As a **workspace administrator**, open the corresponding notebook (`scan_compute_nb` or `scan_dbfs_nb`).
1. Attach it to a cluster (e.g., serverless compute).
1. Specify the parameters in the widgets: the start directory for `scan_dbfs_nb`, and/or the output directory if you want to persist the results to a UC Volume (`/Volumes/<catalog>/<schema>/<volume>/`).
1. Press "Run all" and wait for the execution to finish.
1. If an output directory is specified, the `compute_scan_results.json` or `dbfs_scan_results.json` file will be stored in that directory; the results can be read back as shown in the sketch below.
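
If the results were persisted to a UC Volume, they can be read back from any notebook through the Volume's file path. A minimal sketch, using a hypothetical Volume path in place of the output directory entered in the widget:

```python
# Sketch: read the persisted scan results back from a UC Volume.
import json

output_dir = "/Volumes/main/default/scan_results"  # hypothetical; use your own Volume path

with open(f"{output_dir}/dbfs_scan_results.json") as f:
    results = json.load(f)

# Preview the beginning of the findings.
print(json.dumps(results, indent=2)[:2000])
```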


## Known issues

* Scanning DBFS is very slow due to the single-threaded implementation.
* The output file names for the results are hardcoded.


## TODOs

* [ ] Use the `blueprints` library for logging
* [ ] For all compute objects, try to find the last run/update timestamp...
* [ ] Parallelize the scan of DBFS (a possible approach is sketched below)
* [ ] Allow customizing the output file name in the command-line utilities
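
As a starting point for the parallelization TODO, here is a minimal sketch that fans the directory listing out to a thread pool via the `databricks-sdk` DBFS API (`WorkspaceClient.dbfs.list`). It only aggregates total size; the classification logic of `scan_dbfs.py` is omitted:

```python
# Sketch: breadth-first, multi-threaded DBFS listing (size aggregation only).
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple

from databricks.sdk import WorkspaceClient


def _scan_dir(wc: WorkspaceClient, path: str) -> Tuple[int, List[str]]:
    """Return (total file size, subdirectories) for a single DBFS directory."""
    size, subdirs = 0, []
    for entry in wc.dbfs.list(path):
        if entry.is_dir:
            subdirs.append(entry.path)
        else:
            size += entry.file_size or 0
    return size, subdirs


def parallel_scan(wc: WorkspaceClient, root: str = "/", workers: int = 8) -> int:
    """Scan DBFS level by level, listing each directory of a level in parallel."""
    total, pending = 0, [root]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while pending:
            level = list(pool.map(lambda p: _scan_dir(wc, p), pending))
            pending = []
            for size, subdirs in level:
                total += size
                pending.extend(subdirs)
    return total
```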
Empty file.
289 changes: 289 additions & 0 deletions dbfs-scanner/helpers/compute_analyzer.py
@@ -0,0 +1,289 @@
import base64
import json
import re
from typing import Optional, Tuple

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterSource, ClusterDetails, ClusterSpec
from databricks.sdk.service.jobs import Task
from databricks.sdk.service.pipelines import PipelineCluster
from databricks.sdk.service.workspace import ExportFormat


__notebook_pip_install_from_dbfs__ = re.compile(r"^#\s+MAGIC\s+%pip\s+install.*(/dbfs/.*)$")
__notebook_dbfs_fuse_use__ = re.compile(r"^.*[\"'](/dbfs/[^\"']*)[\"'].*$")
__notebook_dbfs_hdfs_use__ = re.compile(r"^.*[\"'](dbfs:/[^\"']*)[\"'].*$")


def _analyze_notebook_or_wsfile(wc: WorkspaceClient, notebook_path: str) -> Tuple[list, list]:
libs = []
dbfs_file_refs = []
try:
b64 = wc.workspace.export(notebook_path, format=ExportFormat.SOURCE).content
content = base64.b64decode(b64).decode("utf-8")
for l in content.split("\n"):
m = __notebook_pip_install_from_dbfs__.match(l)
if m:
libs.append(m.group(1))
elif not l.lstrip().startswith("#"):
m = __notebook_dbfs_fuse_use__.match(l)
if m:
dbfs_file_refs.append(m.group(1))
m = __notebook_dbfs_hdfs_use__.match(l)
if m:
dbfs_file_refs.append(m.group(1))

    except Exception:
        # Ignore objects that can't be exported (e.g., permission issues or unsupported formats)
        pass

return libs, dbfs_file_refs


def analyze_dlt_pipelines(wc: WorkspaceClient) -> dict:
finds = {}
i = 0
for l in wc.pipelines.list_pipelines(max_results=100):
i += 1
if i % 100 == 0:
print(f"Scanned {i} DLT pipelines")
# print("Analyzing pipeline:", l.pipeline_id, l.name)
p = wc.pipelines.get(l.pipeline_id)
if not p:
continue
p_finds = {}
# analyze clusters
clusters = {}
        for cl in (p.spec.clusters or []):
cl_finds = _analyze_cluster_spec(cl, {})
if cl_finds:
clusters[cl.label] = cl_finds
if clusters:
p_finds["clusters"] = clusters
# analyze storage
if p.spec.storage:
p_finds["storage"] = p.spec.storage
# analyze libraries
lib_finds = {}
        for lib in (p.spec.libraries or []):
lib_path = ""
if lib.notebook:
lib_path = lib.notebook.path
elif lib.file:
lib_path = lib.file.path
if lib_path:
libs, dbfs_file_refs = _analyze_notebook_or_wsfile(wc, lib_path)
if libs or dbfs_file_refs:
d = {}
if libs:
d["libraries"] = libs
if dbfs_file_refs:
d["dbfs_file_refs"] = dbfs_file_refs
lib_finds[lib_path] = d
if lib_finds:
p_finds["libraries"] = lib_finds
if p_finds:
p_finds["name"] = l.name
finds[l.pipeline_id] = p_finds

print(f"Total {i} DLT pipelines")

return finds


def _analyze_task(wc: WorkspaceClient, task: Task) -> dict:
finds = {}
if task.spark_python_task:
if task.spark_python_task.python_file.startswith("dbfs:/"):
finds["python_file_on_dbfs"] = task.spark_python_task.python_file
elif task.spark_python_task.python_file.startswith("/"):
libs, dbfs_file_refs = _analyze_notebook_or_wsfile(wc, task.spark_python_task.python_file)
if libs or dbfs_file_refs:
finds["python_file_in_ws"] = {
"path": task.spark_python_task.python_file
}
if libs:
finds["python_file_in_ws"]["libraries"] = libs
if dbfs_file_refs:
finds["python_file_in_ws"]["dbfs_file_refs"] = dbfs_file_refs

if task.notebook_task:
libs, dbfs_file_refs = _analyze_notebook_or_wsfile(wc, task.notebook_task.notebook_path)
if libs or dbfs_file_refs:
finds["notebook"] = {
"path": task.notebook_task.notebook_path,
}
if libs:
finds["notebook"]["libraries"] = libs
if dbfs_file_refs:
finds["notebook"]["dbfs_file_refs"] = dbfs_file_refs

if task.for_each_task:
fe_finds = _analyze_task(wc, task.for_each_task.task)
if fe_finds:
finds["for_each_task"] = fe_finds

for lib in (task.libraries or []):
dbfs_lib = ""
if lib.jar and lib.jar.startswith("dbfs:/"):
dbfs_lib = lib.jar
if lib.whl and lib.whl.startswith("dbfs:/"):
dbfs_lib = lib.whl
if lib.egg and lib.egg.startswith("dbfs:/"):
dbfs_lib = lib.egg
if dbfs_lib:
r = finds.get("libraries", [])
r.append(dbfs_lib)
finds["libraries"] = r

if task.new_cluster:
finds = _analyze_cluster_spec(task.new_cluster, finds)

return finds


def analyze_jobs(wc: WorkspaceClient) -> dict:
res = {}
i = 0
for job in wc.jobs.list(expand_tasks=True, limit=100):
i += 1
if i % 100 == 0:
print(f"Scanned {i} jobs")
finds = {}
js = job.settings
# print("Analyzing job:", js.name)
for task in (js.tasks or []):
task_res = _analyze_task(wc, task)

if task_res:
t = finds.get("tasks", {})
t[task.task_key] = task_res
finds["tasks"] = t

jcs_finds = {}
for jc in (js.job_clusters or []):
jc_finds = {}
if jc.new_cluster:
jc_finds = _analyze_cluster_spec(jc.new_cluster, jc_finds)
if jc_finds:
jcs_finds[jc.job_cluster_key] = jc_finds

if jcs_finds:
finds["job_clusters"] = jcs_finds

if finds:
finds["job_name"] = js.name
res[job.job_id] = finds

print(f"Total {i} jobs")
return res


def _analyze_cluster_spec(cl: ClusterDetails | ClusterSpec | PipelineCluster, finds: dict) -> dict:
# check if we have any init scripts pointing to DBFS
for init_script in (cl.init_scripts or []):
if init_script.dbfs:
r = finds.get("init_scripts", [])
r.append(init_script.dbfs.destination)
finds["init_scripts"] = r
# check if we have cluster conf pointing to DBFS
if cl.cluster_log_conf and cl.cluster_log_conf.dbfs and cl.cluster_log_conf.dbfs.destination:
finds["cluster_log_conf"] = cl.cluster_log_conf.dbfs.destination

return finds


def analyze_clusters(wc: WorkspaceClient) -> dict:
res = {}
i = 0
for cl in wc.clusters.list(page_size=100):
i += 1
if i % 100 == 0:
print(f"Scanned {i} clusters")
if cl.cluster_source not in [ClusterSource.UI, ClusterSource.API]:
continue
# print("Analyzing cluster:", cl.cluster_name)
finds = {}

# check if we have any libraries pointing to DBFS
for lib in wc.libraries.cluster_status(cl.cluster_id):
dbfs_lib = ""
if lib.library.jar and lib.library.jar.startswith("dbfs:/"):
dbfs_lib = lib.library.jar
if lib.library.whl and lib.library.whl.startswith("dbfs:/"):
dbfs_lib = lib.library.whl
if dbfs_lib:
r = finds.get("libraries", [])
r.append(dbfs_lib)
finds["libraries"] = r

finds = _analyze_cluster_spec(cl, finds)

# if we found anything, add it to the results
if finds:
finds["cluster_name"] = cl.cluster_name
res[cl.cluster_id] = finds

print(f"Total {i} clusters")
return res


def _check_policy_definition(sdef: Optional[str], finds: dict) -> dict:
policy_def = json.loads(sdef or "{}")
for k, v in policy_def.items():
if not isinstance(v, dict):
continue
typ = v.get("type")
if not typ:
continue
val = v.get("value") or v.get("defaultValue")
if typ == "fixed" and k == "cluster_log_conf.path" and val and val.startswith("dbfs:/"):
finds["cluster_log_conf"] = val
if typ == "fixed" and k.startswith("init_scripts.") and k.endswith(".dbfs.destination"):
r = finds.get("init_scripts", [])
r.append(val)
finds["init_scripts"] = r

return finds


def analyze_cluster_policies(wc: WorkspaceClient) -> dict:
res = {}
i = 0
for pf in wc.policy_families.list():
i += 1
if i % 100 == 0:
print(f"Scanned {i} policies")
finds = {}
# print("Analyzing cluster policy family:", pf.name)
finds = _check_policy_definition(pf.definition, finds)

if finds:
finds["policy_name"] = pf.name
res[pf.policy_id] = finds

for pl in wc.cluster_policies.list():
i += 1
if i % 100 == 0:
print(f"Scanned {i} policies")
# print("Analyzing cluster policy:", pl.name)
finds = {}
for lib in (pl.libraries or []):
dbfs_lib = ""
if lib.jar and lib.jar.startswith("dbfs:/"):
dbfs_lib = lib.jar
if lib.whl and lib.whl.startswith("dbfs:/"):
dbfs_lib = lib.whl
if dbfs_lib:
r = finds.get("libraries", [])
r.append(dbfs_lib)
finds["libraries"] = r

finds = _check_policy_definition(pl.definition, finds)

if finds:
finds["policy_name"] = pl.name
res[pl.policy_id] = finds

print(f"Total {i} cluster policies and families")
return res
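
A hypothetical driver sketch showing how these helpers could be wired together into the command-line tool described in the README; the actual `scan_compute.py` may structure its output and file handling differently:

```python
# Sketch: run all compute analyzers and persist the findings as JSON.
import json

from databricks.sdk import WorkspaceClient

from helpers.compute_analyzer import (
    analyze_cluster_policies,
    analyze_clusters,
    analyze_dlt_pipelines,
    analyze_jobs,
)

if __name__ == "__main__":
    wc = WorkspaceClient()  # picks up authentication from environment variables
    # The top-level keys below are illustrative, not the tool's documented schema.
    results = {
        "clusters": analyze_clusters(wc),
        "jobs": analyze_jobs(wc),
        "dlt_pipelines": analyze_dlt_pipelines(wc),
        "cluster_policies": analyze_cluster_policies(wc),
    }
    print(json.dumps(results, indent=2))
    with open("compute_scan_results.json", "w") as f:
        json.dump(results, f, indent=2)
```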