From e5e34bab11cecb59abd79bfa92b165d7ca3c4798 Mon Sep 17 00:00:00 2001 From: Adrian Ionescu Date: Sat, 14 Mar 2020 18:54:25 +0100 Subject: [PATCH 1/4] skeleton + working status --- databricks_cli/dbfs/api.py | 4 ++ databricks_cli/dbfs/cli.py | 93 +++++++++++++++++++++++++++++++++++ databricks_cli/sdk/service.py | 14 ++++++ 3 files changed, 111 insertions(+) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index a2466310..8609d45a 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -240,6 +240,10 @@ def cat(self, src): with open(temp_path) as f: click.echo(f.read(), nl=False) + def delete_async_status(self, rm_async_id, headers=None): + return self.client.delete_async_status(rm_async_id, headers=headers) + + class TempDir(object): def __init__(self, remove_on_exit=True): diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index dfa22753..d838856e 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -22,6 +22,7 @@ # limitations under the License. import click +import json from tabulate import tabulate from databricks_cli.utils import eat_exceptions, error_and_quit, CONTEXT_SETTINGS @@ -89,6 +90,97 @@ def rm_cli(api_client, recursive, dbfs_path): DbfsApi(api_client).delete(dbfs_path, recursive) +@click.command(context_settings=CONTEXT_SETTINGS) +@click.option('--recursive', '-r', is_flag=True, default=False) +@click.argument('dbfs_path', type=DbfsPathClickType()) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def rm_async_start_cli(api_client, recursive, dbfs_path): + """ + Start a rm-async request. + + To remove a directory you must provide the --recursive flag. + """ + print("rm async start") + print(dbfs_path) + print(recursive) + + +def truncate_string(s, length=100): + if len(s) <= length: + return s + return s[:length] + '...' + + +def _rm_async_status_to_row(run_json): + r = json.loads(run_json) + params = r['task']['notebook_task']['base_parameters'] + state = r['state'] + return ( + r['run_id'], + params['path'], params['recursive'], + state['life_cycle_state'], state['result_state'], + r['run_page_url'] + ) + + +def _rm_async_status_to_table(rm_async_id, runs_json): + ret = [] + if rm_async_id is not None: + r = runs_json['delete_job_run'] + ret.append(_rm_async_status_to_row(r)) + else: + for r in runs_json['delete_job_runs']: + ret.append(_rm_async_status_to_row(r)) + return ret + + +@click.command(context_settings=CONTEXT_SETTINGS) +@click.option('--rm-async-id', required=False) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def rm_async_status_cli(api_client, rm_async_id): + """ + Check the status of your rm-async request(s). + """ + status = DbfsApi(api_client).delete_async_status(rm_async_id) + click.echo(tabulate(_rm_async_status_to_table(rm_async_id, status))) + + +@click.command(context_settings=CONTEXT_SETTINGS) +@click.option('--rm-async-id', required=True) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def rm_async_cancel_cli(api_client, rm_async_id): + """ + Cancel your rm-async request. + """ + print("rm async cancel") + print(rm_async_id) + + +@click.group(context_settings=CONTEXT_SETTINGS, short_help='Remove files from DBFS asynchronously.') +@debug_option +@profile_option +@eat_exceptions +def rm_async_group(): + """ + Remove files from dbfs asynchronously. + """ + pass + + +rm_async_group.add_command(rm_async_start_cli, name="start") +rm_async_group.add_command(rm_async_status_cli, name="status") +rm_async_group.add_command(rm_async_cancel_cli, name="cancel") + + @click.command(context_settings=CONTEXT_SETTINGS) @click.option('--recursive', '-r', is_flag=True, default=False) @click.option('--overwrite', is_flag=True, default=False) @@ -162,6 +254,7 @@ def cat_cli(api_client, src): dbfs_group.add_command(ls_cli, name='ls') dbfs_group.add_command(mkdirs_cli, name='mkdirs') dbfs_group.add_command(rm_cli, name='rm') +dbfs_group.add_command(rm_async_group, name='rm-async') dbfs_group.add_command(cp_cli, name='cp') dbfs_group.add_command(mv_cli, name='mv') dbfs_group.add_command(cat_cli, name='cat') diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index de769a34..f501971e 100644 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -514,6 +514,20 @@ def close(self, handle, headers=None): return self.client.perform_query('POST', '/dbfs/close', data=_data, headers=headers) + + + + + + def delete_async_status(self, rm_async_id=None, headers=None): + _data = {} + if rm_async_id is not None: + _data['delete_job_id'] = rm_async_id + return self.client.perform_query('GET', '/dbfs-async/delete/get', data=_data, headers=headers) + else: + return self.client.perform_query('GET', '/dbfs-async/delete/list', data=_data, headers=headers) + + class WorkspaceService(object): def __init__(self, client): self.client = client From 721080279819594421dba531986cccfda3aee903 Mon Sep 17 00:00:00 2001 From: Adrian Ionescu Date: Sat, 14 Mar 2020 20:36:13 +0100 Subject: [PATCH 2/4] all works, except for end-to-end rm --- databricks_cli/dbfs/api.py | 8 +++++ databricks_cli/dbfs/cli.py | 62 +++++++++++++++++++++++++++++------ databricks_cli/sdk/service.py | 17 ++++++++++ 3 files changed, 77 insertions(+), 10 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index 8609d45a..75b108d9 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -240,9 +240,17 @@ def cat(self, src): with open(temp_path) as f: click.echo(f.read(), nl=False) + + + + def delete_async_start(self, dbfs_path, recursive, cluster_id, headers=None): + return self.client.delete_async_start(dbfs_path.absolute_path, recursive, cluster_id, headers=headers) + def delete_async_status(self, rm_async_id, headers=None): return self.client.delete_async_status(rm_async_id, headers=headers) + def delete_async_cancel(self, rm_async_id, headers=None): + return self.client.delete_async_cancel(rm_async_id, headers=headers) class TempDir(object): diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index d838856e..dfe7a1ec 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -21,8 +21,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import print_function + import click import json +import time from tabulate import tabulate from databricks_cli.utils import eat_exceptions, error_and_quit, CONTEXT_SETTINGS @@ -90,22 +93,30 @@ def rm_cli(api_client, recursive, dbfs_path): DbfsApi(api_client).delete(dbfs_path, recursive) + + + + + + @click.command(context_settings=CONTEXT_SETTINGS) -@click.option('--recursive', '-r', is_flag=True, default=False) @click.argument('dbfs_path', type=DbfsPathClickType()) +@click.option('--recursive', '-r', is_flag=True, default=False) +@click.option('--cluster-id', '-c', required=False) @debug_option @profile_option @eat_exceptions @provide_api_client -def rm_async_start_cli(api_client, recursive, dbfs_path): +def rm_async_start_cli(api_client, dbfs_path, recursive, cluster_id): """ Start a rm-async request. To remove a directory you must provide the --recursive flag. """ - print("rm async start") - print(dbfs_path) - print(recursive) + delete_job_id = DbfsApi(api_client).delete_async_start(dbfs_path, recursive, cluster_id) + # rm_async_id = json.dumps({'rm_async_id': delete_job_id['delete_job_id']}) + rm_async_id = delete_job_id['delete_job_id'] + click.echo(_rm_async_status(api_client, rm_async_id)) def truncate_string(s, length=100): @@ -121,7 +132,7 @@ def _rm_async_status_to_row(run_json): return ( r['run_id'], params['path'], params['recursive'], - state['life_cycle_state'], state['result_state'], + state['life_cycle_state'], state.get('result_state'), r['run_page_url'] ) @@ -137,6 +148,11 @@ def _rm_async_status_to_table(rm_async_id, runs_json): return ret +def _rm_async_status(api_client, rm_async_id): + status = DbfsApi(api_client).delete_async_status(rm_async_id) + return tabulate(_rm_async_status_to_table(rm_async_id, status), tablefmt="plain") + + @click.command(context_settings=CONTEXT_SETTINGS) @click.option('--rm-async-id', required=False) @debug_option @@ -147,8 +163,7 @@ def rm_async_status_cli(api_client, rm_async_id): """ Check the status of your rm-async request(s). """ - status = DbfsApi(api_client).delete_async_status(rm_async_id) - click.echo(tabulate(_rm_async_status_to_table(rm_async_id, status))) + click.echo(_rm_async_status(api_client, rm_async_id)) @click.command(context_settings=CONTEXT_SETTINGS) @@ -161,8 +176,27 @@ def rm_async_cancel_cli(api_client, rm_async_id): """ Cancel your rm-async request. """ - print("rm async cancel") - print(rm_async_id) + DbfsApi(api_client).delete_async_cancel(rm_async_id) + + +@click.command(context_settings=CONTEXT_SETTINGS) +@click.option('--rm-async-id', required=True) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def rm_async_wait_cli(api_client, rm_async_id): + """ + Wait until your rm-async request is complete. + """ + while True: + click.echo("\r" + _rm_async_status(api_client, rm_async_id), nl=False) + status = DbfsApi(api_client).delete_async_status(rm_async_id) + r = json.loads(status['delete_job_run']) + if r['state'].get('result_state') is not None: + break + time.sleep(1) + click.echo("") @click.group(context_settings=CONTEXT_SETTINGS, short_help='Remove files from DBFS asynchronously.') @@ -179,6 +213,14 @@ def rm_async_group(): rm_async_group.add_command(rm_async_start_cli, name="start") rm_async_group.add_command(rm_async_status_cli, name="status") rm_async_group.add_command(rm_async_cancel_cli, name="cancel") +rm_async_group.add_command(rm_async_wait_cli, name="wait") + + + + + + + @click.command(context_settings=CONTEXT_SETTINGS) diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index f501971e..db00842e 100644 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -519,6 +519,15 @@ def close(self, handle, headers=None): + def delete_async_start(self, dbfs_path, recursive=None, cluster_id=None, headers=None): + _data = {} + _data['path'] = dbfs_path + if recursive is not None: + _data['recursive'] = recursive + if cluster_id is not None: + _data['cluster_id'] = cluster_id + return self.client.perform_query('POST', '/dbfs-async/delete/submit', data=_data, headers=headers) + def delete_async_status(self, rm_async_id=None, headers=None): _data = {} if rm_async_id is not None: @@ -527,6 +536,14 @@ def delete_async_status(self, rm_async_id=None, headers=None): else: return self.client.perform_query('GET', '/dbfs-async/delete/list', data=_data, headers=headers) + def delete_async_cancel(self, rm_async_id, headers=None): + _data = {} + _data['delete_job_id'] = rm_async_id + return self.client.perform_query('POST', '/dbfs-async/delete/cancel', data=_data, headers=headers) + + + + class WorkspaceService(object): def __init__(self, client): From d3cc7f0e5d90db2813dde8507a1755d14e4a6cda Mon Sep 17 00:00:00 2001 From: Adrian Ionescu Date: Sun, 15 Mar 2020 07:20:35 +0100 Subject: [PATCH 3/4] async group --- databricks_cli/dbfs/cli.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index dfe7a1ec..9bad4d5d 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -292,11 +292,26 @@ def cat_cli(api_client, src): DbfsApi(api_client).cat(src) + +@click.group(context_settings=CONTEXT_SETTINGS, short_help='Perform asynchronous DBFS operations.') +@debug_option +@profile_option +@eat_exceptions +def async_group(): + """ + Remove files from dbfs asynchronously. + """ + pass + + +async_group.add_command(rm_async_group, name='rm') + + dbfs_group.add_command(configure_cli, name='configure') dbfs_group.add_command(ls_cli, name='ls') dbfs_group.add_command(mkdirs_cli, name='mkdirs') dbfs_group.add_command(rm_cli, name='rm') -dbfs_group.add_command(rm_async_group, name='rm-async') +dbfs_group.add_command(async_group, name='async') dbfs_group.add_command(cp_cli, name='cp') dbfs_group.add_command(mv_cli, name='mv') dbfs_group.add_command(cat_cli, name='cat') From f0fcdaf992663330b556fad341d3a70d9b86ff75 Mon Sep 17 00:00:00 2001 From: Adrian Ionescu Date: Sun, 15 Mar 2020 18:23:07 +0100 Subject: [PATCH 4/4] final? --- databricks_cli/dbfs/api.py | 12 ++-- databricks_cli/dbfs/cli.py | 132 +++++++++++++++++++++++----------- databricks_cli/sdk/service.py | 14 ++-- 3 files changed, 104 insertions(+), 54 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index 75b108d9..238ef662 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -243,14 +243,14 @@ def cat(self, src): - def delete_async_start(self, dbfs_path, recursive, cluster_id, headers=None): - return self.client.delete_async_start(dbfs_path.absolute_path, recursive, cluster_id, headers=headers) + def async_delete_start(self, dbfs_path, recursive, cluster_id, headers=None): + return self.client.async_delete_start(dbfs_path.absolute_path, recursive, cluster_id, headers=headers) - def delete_async_status(self, rm_async_id, headers=None): - return self.client.delete_async_status(rm_async_id, headers=headers) + def async_delete_status(self, async_delete_id=None, limit=None, headers=None): + return self.client.async_delete_status(async_delete_id, limit, headers=headers) - def delete_async_cancel(self, rm_async_id, headers=None): - return self.client.delete_async_cancel(rm_async_id, headers=headers) + def async_delete_cancel(self, async_delete_id, headers=None): + return self.client.async_delete_cancel(async_delete_id, headers=headers) class TempDir(object): diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index 9bad4d5d..7138522e 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -24,8 +24,10 @@ from __future__ import print_function import click +from click import UsageError import json import time +from datetime import datetime, timedelta from tabulate import tabulate from databricks_cli.utils import eat_exceptions, error_and_quit, CONTEXT_SETTINGS @@ -78,19 +80,32 @@ def mkdirs_cli(api_client, dbfs_path): @click.command(context_settings=CONTEXT_SETTINGS) -@click.option('--recursive', '-r', is_flag=True, default=False) @click.argument('dbfs_path', type=DbfsPathClickType()) +@click.option('--recursive', '-r', is_flag=True, default=False) +@click.option('--as-job', is_flag=True, default=False) +@click.option('--async', is_flag=True, default=False) +@click.option('--cluster-id', required=False) @debug_option @profile_option @eat_exceptions @provide_api_client -def rm_cli(api_client, recursive, dbfs_path): +def rm_cli(api_client, dbfs_path, recursive, as_job, async, cluster_id): """ Remove files from dbfs. To remove a directory you must provide the --recursive flag. """ - DbfsApi(api_client).delete(dbfs_path, recursive) + if async or (cluster_id is not None): + as_job = True + + if as_job: + id = async_rm_start_impl(api_client, dbfs_path, recursive, cluster_id) + if async: + click.echo(async_rm_status_impl(api_client, id)) + else: + async_rm_wait_impl(api_client, id) + else: + DbfsApi(api_client).delete(dbfs_path, recursive) @@ -98,6 +113,10 @@ def rm_cli(api_client, recursive, dbfs_path): +def async_rm_start_impl(api_client, dbfs_path, recursive, cluster_id): + delete_job_id = DbfsApi(api_client).async_delete_start(dbfs_path, recursive, cluster_id) + return delete_job_id['delete_job_id'] + @click.command(context_settings=CONTEXT_SETTINGS) @click.argument('dbfs_path', type=DbfsPathClickType()) @@ -107,16 +126,14 @@ def rm_cli(api_client, recursive, dbfs_path): @profile_option @eat_exceptions @provide_api_client -def rm_async_start_cli(api_client, dbfs_path, recursive, cluster_id): +def async_rm_start_cli(api_client, dbfs_path, recursive, cluster_id): """ Start a rm-async request. To remove a directory you must provide the --recursive flag. """ - delete_job_id = DbfsApi(api_client).delete_async_start(dbfs_path, recursive, cluster_id) - # rm_async_id = json.dumps({'rm_async_id': delete_job_id['delete_job_id']}) - rm_async_id = delete_job_id['delete_job_id'] - click.echo(_rm_async_status(api_client, rm_async_id)) + id = async_rm_start_impl(api_client, dbfs_path, recursive, cluster_id) + click.echo(async_rm_status_impl(api_client, id)) def truncate_string(s, length=100): @@ -124,96 +141,127 @@ def truncate_string(s, length=100): return s return s[:length] + '...' +def parse_timestamp(ts): + t = int(ts) / 1000 + return datetime.utcfromtimestamp(t).strftime('%Y-%m-%d %H:%M:%S') + -def _rm_async_status_to_row(run_json): +def async_rm_status_to_row(run_json): r = json.loads(run_json) params = r['task']['notebook_task']['base_parameters'] state = r['state'] + if 'result_state' in state: + result = state['result_state'] + duration_ms = r['setup_duration'] + r['execution_duration'] + r['cleanup_duration'] + duration = timedelta(milliseconds=duration_ms) + else: + result = "" + duration = "" + + cluster_type = 'new' if 'new_cluster' in r['cluster_spec'] else "existing" + cluster_id = r['cluster_instance']['cluster_id'] if 'cluster_instance' in r else "" return ( - r['run_id'], - params['path'], params['recursive'], - state['life_cycle_state'], state.get('result_state'), - r['run_page_url'] + r['run_id'], params['path'], params['recursive'], + r['run_page_url'], cluster_type, cluster_id, + parse_timestamp(r['start_time']), state['life_cycle_state'], result, duration ) -def _rm_async_status_to_table(rm_async_id, runs_json): +def async_rm_status_to_table(id, runs_json): ret = [] - if rm_async_id is not None: + if id is not None: r = runs_json['delete_job_run'] - ret.append(_rm_async_status_to_row(r)) + ret.append(async_rm_status_to_row(r)) else: for r in runs_json['delete_job_runs']: - ret.append(_rm_async_status_to_row(r)) + ret.append(async_rm_status_to_row(r)) return ret -def _rm_async_status(api_client, rm_async_id): - status = DbfsApi(api_client).delete_async_status(rm_async_id) - return tabulate(_rm_async_status_to_table(rm_async_id, status), tablefmt="plain") +def async_rm_status_impl(api_client, id, limit=None): + status = DbfsApi(api_client).async_delete_status(id, limit) + if id is not None: + return tabulate(async_rm_status_to_table(id, status), tablefmt="plain") + else: + headers = ( + "id", "path", "recursive", + "run_page_url", "cluster_type", "cluster_id", + "start_time", "state", "result", "duration") + return tabulate(async_rm_status_to_table(id, status), headers, tablefmt='simple') @click.command(context_settings=CONTEXT_SETTINGS) -@click.option('--rm-async-id', required=False) +@click.option('--id', required=False) +@click.option('--limit', required=False) @debug_option @profile_option @eat_exceptions @provide_api_client -def rm_async_status_cli(api_client, rm_async_id): +def async_rm_status_cli(api_client, id, limit): """ Check the status of your rm-async request(s). """ - click.echo(_rm_async_status(api_client, rm_async_id)) + if id is not None and limit is not None: + raise UsageError("You cannot specify both --id and --limit.") + + click.echo(async_rm_status_impl(api_client, id, limit)) @click.command(context_settings=CONTEXT_SETTINGS) -@click.option('--rm-async-id', required=True) +@click.option('--id', required=True) @debug_option @profile_option @eat_exceptions @provide_api_client -def rm_async_cancel_cli(api_client, rm_async_id): +def async_rm_cancel_cli(api_client, id): """ Cancel your rm-async request. """ - DbfsApi(api_client).delete_async_cancel(rm_async_id) + DbfsApi(api_client).async_delete_cancel(id) +def async_rm_wait_impl(api_client, id): + i = 0 + progress_chars = ['/', '-', '\\', '|'] + while True: + status = DbfsApi(api_client).async_delete_status(id) + click.echo("\r" + async_rm_status_impl(api_client, id), nl=False) + r = json.loads(status['delete_job_run']) + if r['state'].get('result_state') is not None: + click.echo(" ") + break + i = (i + 1) % len(progress_chars) + click.echo(" " + progress_chars[i], nl=False) + time.sleep(0.5) + @click.command(context_settings=CONTEXT_SETTINGS) -@click.option('--rm-async-id', required=True) +@click.option('--id', required=True) @debug_option @profile_option @eat_exceptions @provide_api_client -def rm_async_wait_cli(api_client, rm_async_id): +def async_rm_wait_cli(api_client, id): """ Wait until your rm-async request is complete. """ - while True: - click.echo("\r" + _rm_async_status(api_client, rm_async_id), nl=False) - status = DbfsApi(api_client).delete_async_status(rm_async_id) - r = json.loads(status['delete_job_run']) - if r['state'].get('result_state') is not None: - break - time.sleep(1) - click.echo("") + async_rm_wait_impl(api_client, id) @click.group(context_settings=CONTEXT_SETTINGS, short_help='Remove files from DBFS asynchronously.') @debug_option @profile_option @eat_exceptions -def rm_async_group(): +def async_rm_group(): """ Remove files from dbfs asynchronously. """ pass -rm_async_group.add_command(rm_async_start_cli, name="start") -rm_async_group.add_command(rm_async_status_cli, name="status") -rm_async_group.add_command(rm_async_cancel_cli, name="cancel") -rm_async_group.add_command(rm_async_wait_cli, name="wait") +async_rm_group.add_command(async_rm_start_cli, name="start") +async_rm_group.add_command(async_rm_status_cli, name="status") +async_rm_group.add_command(async_rm_cancel_cli, name="cancel") +async_rm_group.add_command(async_rm_wait_cli, name="wait") @@ -304,7 +352,7 @@ def async_group(): pass -async_group.add_command(rm_async_group, name='rm') +async_group.add_command(async_rm_group, name='rm') dbfs_group.add_command(configure_cli, name='configure') diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index db00842e..86c65c0a 100644 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -519,7 +519,7 @@ def close(self, handle, headers=None): - def delete_async_start(self, dbfs_path, recursive=None, cluster_id=None, headers=None): + def async_delete_start(self, dbfs_path, recursive=None, cluster_id=None, headers=None): _data = {} _data['path'] = dbfs_path if recursive is not None: @@ -528,17 +528,19 @@ def delete_async_start(self, dbfs_path, recursive=None, cluster_id=None, headers _data['cluster_id'] = cluster_id return self.client.perform_query('POST', '/dbfs-async/delete/submit', data=_data, headers=headers) - def delete_async_status(self, rm_async_id=None, headers=None): + def async_delete_status(self, async_delete_id=None, limit=None, headers=None): _data = {} - if rm_async_id is not None: - _data['delete_job_id'] = rm_async_id + if async_delete_id is not None: + _data['delete_job_id'] = async_delete_id return self.client.perform_query('GET', '/dbfs-async/delete/get', data=_data, headers=headers) else: + if limit is not None: + _data['limit'] = limit return self.client.perform_query('GET', '/dbfs-async/delete/list', data=_data, headers=headers) - def delete_async_cancel(self, rm_async_id, headers=None): + def async_delete_cancel(self, async_delete_id, headers=None): _data = {} - _data['delete_job_id'] = rm_async_id + _data['delete_job_id'] = async_delete_id return self.client.perform_query('POST', '/dbfs-async/delete/cancel', data=_data, headers=headers)