Skip to content

[HACKATHON] DBFS Async Deletes #282

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions databricks_cli/dbfs/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,18 @@ def cat(self, src):
click.echo(f.read(), nl=False)




def async_delete_start(self, dbfs_path, recursive, cluster_id, headers=None):
return self.client.async_delete_start(dbfs_path.absolute_path, recursive, cluster_id, headers=headers)

def async_delete_status(self, async_delete_id=None, limit=None, headers=None):
return self.client.async_delete_status(async_delete_id, limit, headers=headers)

def async_delete_cancel(self, async_delete_id, headers=None):
return self.client.async_delete_cancel(async_delete_id, headers=headers)


class TempDir(object):
def __init__(self, remove_on_exit=True):
self._dir = None
Expand Down
204 changes: 201 additions & 3 deletions databricks_cli/dbfs/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import click
from click import UsageError
import json
import time
from datetime import datetime, timedelta
from tabulate import tabulate

from databricks_cli.utils import eat_exceptions, error_and_quit, CONTEXT_SETTINGS
Expand Down Expand Up @@ -74,19 +80,195 @@ def mkdirs_cli(api_client, dbfs_path):


@click.command(context_settings=CONTEXT_SETTINGS)
@click.option('--recursive', '-r', is_flag=True, default=False)
@click.argument('dbfs_path', type=DbfsPathClickType())
@click.option('--recursive', '-r', is_flag=True, default=False)
@click.option('--as-job', is_flag=True, default=False)
@click.option('--async', is_flag=True, default=False)
@click.option('--cluster-id', required=False)
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def rm_cli(api_client, recursive, dbfs_path):
def rm_cli(api_client, dbfs_path, recursive, as_job, async, cluster_id):
"""
Remove files from dbfs.

To remove a directory you must provide the --recursive flag.
"""
DbfsApi(api_client).delete(dbfs_path, recursive)
if async or (cluster_id is not None):
as_job = True

if as_job:
id = async_rm_start_impl(api_client, dbfs_path, recursive, cluster_id)
if async:
click.echo(async_rm_status_impl(api_client, id))
else:
async_rm_wait_impl(api_client, id)
else:
DbfsApi(api_client).delete(dbfs_path, recursive)







def async_rm_start_impl(api_client, dbfs_path, recursive, cluster_id):
delete_job_id = DbfsApi(api_client).async_delete_start(dbfs_path, recursive, cluster_id)
return delete_job_id['delete_job_id']


@click.command(context_settings=CONTEXT_SETTINGS)
@click.argument('dbfs_path', type=DbfsPathClickType())
@click.option('--recursive', '-r', is_flag=True, default=False)
@click.option('--cluster-id', '-c', required=False)
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def async_rm_start_cli(api_client, dbfs_path, recursive, cluster_id):
"""
Start a rm-async request.

To remove a directory you must provide the --recursive flag.
"""
id = async_rm_start_impl(api_client, dbfs_path, recursive, cluster_id)
click.echo(async_rm_status_impl(api_client, id))


def truncate_string(s, length=100):
if len(s) <= length:
return s
return s[:length] + '...'

def parse_timestamp(ts):
t = int(ts) / 1000
return datetime.utcfromtimestamp(t).strftime('%Y-%m-%d %H:%M:%S')


def async_rm_status_to_row(run_json):
r = json.loads(run_json)
params = r['task']['notebook_task']['base_parameters']
state = r['state']
if 'result_state' in state:
result = state['result_state']
duration_ms = r['setup_duration'] + r['execution_duration'] + r['cleanup_duration']
duration = timedelta(milliseconds=duration_ms)
else:
result = ""
duration = ""

cluster_type = 'new' if 'new_cluster' in r['cluster_spec'] else "existing"
cluster_id = r['cluster_instance']['cluster_id'] if 'cluster_instance' in r else ""
return (
r['run_id'], params['path'], params['recursive'],
r['run_page_url'], cluster_type, cluster_id,
parse_timestamp(r['start_time']), state['life_cycle_state'], result, duration
)


def async_rm_status_to_table(id, runs_json):
ret = []
if id is not None:
r = runs_json['delete_job_run']
ret.append(async_rm_status_to_row(r))
else:
for r in runs_json['delete_job_runs']:
ret.append(async_rm_status_to_row(r))
return ret


def async_rm_status_impl(api_client, id, limit=None):
status = DbfsApi(api_client).async_delete_status(id, limit)
if id is not None:
return tabulate(async_rm_status_to_table(id, status), tablefmt="plain")
else:
headers = (
"id", "path", "recursive",
"run_page_url", "cluster_type", "cluster_id",
"start_time", "state", "result", "duration")
return tabulate(async_rm_status_to_table(id, status), headers, tablefmt='simple')


@click.command(context_settings=CONTEXT_SETTINGS)
@click.option('--id', required=False)
@click.option('--limit', required=False)
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def async_rm_status_cli(api_client, id, limit):
"""
Check the status of your rm-async request(s).
"""
if id is not None and limit is not None:
raise UsageError("You cannot specify both --id and --limit.")

click.echo(async_rm_status_impl(api_client, id, limit))


@click.command(context_settings=CONTEXT_SETTINGS)
@click.option('--id', required=True)
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def async_rm_cancel_cli(api_client, id):
"""
Cancel your rm-async request.
"""
DbfsApi(api_client).async_delete_cancel(id)


def async_rm_wait_impl(api_client, id):
i = 0
progress_chars = ['/', '-', '\\', '|']
while True:
status = DbfsApi(api_client).async_delete_status(id)
click.echo("\r" + async_rm_status_impl(api_client, id), nl=False)
r = json.loads(status['delete_job_run'])
if r['state'].get('result_state') is not None:
click.echo(" ")
break
i = (i + 1) % len(progress_chars)
click.echo(" " + progress_chars[i], nl=False)
time.sleep(0.5)

@click.command(context_settings=CONTEXT_SETTINGS)
@click.option('--id', required=True)
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def async_rm_wait_cli(api_client, id):
"""
Wait until your rm-async request is complete.
"""
async_rm_wait_impl(api_client, id)


@click.group(context_settings=CONTEXT_SETTINGS, short_help='Remove files from DBFS asynchronously.')
@debug_option
@profile_option
@eat_exceptions
def async_rm_group():
"""
Remove files from dbfs asynchronously.
"""
pass


async_rm_group.add_command(async_rm_start_cli, name="start")
async_rm_group.add_command(async_rm_status_cli, name="status")
async_rm_group.add_command(async_rm_cancel_cli, name="cancel")
async_rm_group.add_command(async_rm_wait_cli, name="wait")









@click.command(context_settings=CONTEXT_SETTINGS)
Expand Down Expand Up @@ -158,10 +340,26 @@ def cat_cli(api_client, src):
DbfsApi(api_client).cat(src)



@click.group(context_settings=CONTEXT_SETTINGS, short_help='Perform asynchronous DBFS operations.')
@debug_option
@profile_option
@eat_exceptions
def async_group():
"""
Remove files from dbfs asynchronously.
"""
pass


async_group.add_command(async_rm_group, name='rm')


dbfs_group.add_command(configure_cli, name='configure')
dbfs_group.add_command(ls_cli, name='ls')
dbfs_group.add_command(mkdirs_cli, name='mkdirs')
dbfs_group.add_command(rm_cli, name='rm')
dbfs_group.add_command(async_group, name='async')
dbfs_group.add_command(cp_cli, name='cp')
dbfs_group.add_command(mv_cli, name='mv')
dbfs_group.add_command(cat_cli, name='cat')
33 changes: 33 additions & 0 deletions databricks_cli/sdk/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,39 @@ def close(self, handle, headers=None):
return self.client.perform_query('POST', '/dbfs/close', data=_data, headers=headers)







def async_delete_start(self, dbfs_path, recursive=None, cluster_id=None, headers=None):
_data = {}
_data['path'] = dbfs_path
if recursive is not None:
_data['recursive'] = recursive
if cluster_id is not None:
_data['cluster_id'] = cluster_id
return self.client.perform_query('POST', '/dbfs-async/delete/submit', data=_data, headers=headers)

def async_delete_status(self, async_delete_id=None, limit=None, headers=None):
_data = {}
if async_delete_id is not None:
_data['delete_job_id'] = async_delete_id
return self.client.perform_query('GET', '/dbfs-async/delete/get', data=_data, headers=headers)
else:
if limit is not None:
_data['limit'] = limit
return self.client.perform_query('GET', '/dbfs-async/delete/list', data=_data, headers=headers)

def async_delete_cancel(self, async_delete_id, headers=None):
_data = {}
_data['delete_job_id'] = async_delete_id
return self.client.perform_query('POST', '/dbfs-async/delete/cancel', data=_data, headers=headers)





class WorkspaceService(object):
def __init__(self, client):
self.client = client
Expand Down