Stack CLI: Stack API interface change and little changes to comments (#155)

* Added advisory message for stack status file

* added stack status and stack config validation for all resource types along with end-to-end test

* little change in comment

* Added spaces in comments

* Added filesystem configuration for end-to-end test

* moved import copy

* remove instances of using 'NOTEBOOK' and 'DIRECTORY'

* Replaced all magic string fields with global variables

* Changed test names/descriptions along with add more cases to validate outputted stack status from deploy_config

* modularized assertions for validation

* Change interface

* Removed comments

* Little comment change

* Flip ' comments to " comments to be more consistent with JSON

* fix bug

* Change Job ID global variable usage

* added more #*80 outputs and better generalization and modularity to validate functions

* changed comment of test

* switching more instances of deploy_config to deploy and download_config to download

* deleted stack_download test, changes in comments

* Changed test name

* fix test_api, add some comments

* Fix merge error

* Clarified global variable comments

* Change in message, change in function name

* Added some more descriptions to CLI
alinxie authored and andrewmchen committed Aug 17, 2018
1 parent a3850cf commit 6d53ab2
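
Taken together, these commits change the StackApi interface so that it no longer reads or writes JSON files itself: deploy_config/download_from_config become deploy/download, callers pass parsed configuration and status dicts, and path handling moves into the CLI layer. A minimal before/after sketch (the config path, api_client, and the helper calls are illustrative of the code below):

    # Before (old interface): StackApi took a file path and managed the status file itself.
    # StackApi(api_client).deploy('./stack.json', overwrite=True)

    # After (new interface): the CLI loads and saves the JSON, passing dicts to StackApi.
    stack_config = _load_json('./stack.json')
    stack_status = _load_json(_generate_stack_status_path('./stack.json'))
    new_status = StackApi(api_client).deploy(stack_config, stack_status, overwrite=True)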
Showing 4 changed files with 219 additions and 212 deletions.
138 changes: 28 additions & 110 deletions databricks_cli/stack/api.py
@@ -46,14 +46,14 @@
 # Config Outer Fields
 STACK_NAME = 'name'
 STACK_RESOURCES = 'resources'
-STACK_DEPLOYED = 'deployed'
+STACK_DEPLOYED = 'deployed'  # For Stack Status

 # Resource Fields
 RESOURCE_ID = 'id'
 RESOURCE_SERVICE = 'service'
 RESOURCE_PROPERTIES = 'properties'

-# Deployed Resource Fields
+# Resource Status Fields
 RESOURCE_PHYSICAL_ID = 'physical_id'
 RESOURCE_DEPLOY_OUTPUT = 'deploy_output'
 RESOURCE_DEPLOY_TIMESTAMP = 'timestamp'
@@ -80,51 +80,7 @@ def __init__(self, api_client):
         self.workspace_client = WorkspaceApi(api_client)
         self.dbfs_client = DbfsApi(api_client)

-    def deploy(self, config_path, **kwargs):
-        """
-        Deploys a stack given stack JSON configuration template at path config_path.
-        Loads the JSON template as well as status JSON if stack has been deployed before.
-        The working directory is changed to that where the JSON template is contained
-        so that paths within the stack configuration are relative to the directory of the
-        JSON template instead of the directory where this function is called.
-        :param config_path: Path to stack JSON configuration template. Must have the fields of
-        'name', the name of the stack and 'resources', a list of stack resources.
-        :return: None.
-        """
-        stack_config = self._load_json(config_path)
-        status_path = self._generate_stack_status_path(config_path)
-        stack_status = self._load_json(status_path)
-        config_dir = os.path.dirname(os.path.abspath(config_path))
-        cli_dir = os.getcwd()
-        os.chdir(config_dir)  # Switch current working directory to where json config is stored
-        new_stack_status = self.deploy_config(stack_config, stack_status, **kwargs)
-        os.chdir(cli_dir)
-        click.echo("Saving stack status to {}".format(status_path))
-        self._save_json(status_path, new_stack_status)
-
-    def download(self, config_path, **kwargs):
-        """
-        Downloads a stack given stack JSON configuration template at path config_path.
-        The working directory is changed to that where the JSON template is contained
-        so that paths within the stack configuration are relative to the directory of the
-        JSON template instead of the directory where this function is called.
-        :param config_path: Path to stack JSON configuration template. Must have the fields of
-        'name', the name of the stack and 'resources', a list of stack resources.
-        :return: None.
-        """
-        stack_config = self._load_json(config_path)
-        config_dir = os.path.dirname(os.path.abspath(config_path))
-        cli_dir = os.getcwd()
-        os.chdir(config_dir)  # Switch current working directory to where json config is stored
-        self.download_from_config(stack_config, **kwargs)
-        os.chdir(cli_dir)
-
-    def deploy_config(self, stack_config, stack_status=None, **kwargs):
+    def deploy(self, stack_config, stack_status=None, **kwargs):
         """
         Deploys a stack given stack JSON configuration template at path config_path.
@@ -135,8 +91,11 @@ def deploy_config(self, stack_config, stack_status=None, **kwargs):
         :param stack_config: Must have the fields of
         'name', the name of the stack and 'resources', a list of stack resources.
-        :param stack_status: Must have the fields of
-        :return:
+        :param stack_status: Must have the fields of 'name', the name of the stack, 'resources',
+        a list of stack resources, and 'deployed', a list of resource statuses from a previous
+        deployment.
+        :return: new_stack_status: The new stack status generated from the deployment of
+        the given stack_config.
         """
         click.echo('#' * 80)
         self._validate_config(stack_config)
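
For illustration, inputs matching this docstring might look like the following sketch. The resource values are hypothetical; the string keys mirror the field constants defined at the top of the module:

    # Hypothetical inputs for StackApi.deploy; the key strings mirror the field
    # constants at the top of api.py, and the values are made up for illustration.
    example_resources = [{
        'id': 'example-job',                    # RESOURCE_ID
        'service': 'jobs',                      # RESOURCE_SERVICE (assumed value of JOBS_SERVICE)
        'properties': {'name': 'example job'},  # RESOURCE_PROPERTIES (job settings)
    }]
    stack_config = {
        'name': 'example-stack',                # STACK_NAME
        'resources': example_resources,         # STACK_RESOURCES
    }
    stack_status = {
        'name': 'example-stack',
        'resources': example_resources,
        'deployed': [{                          # STACK_DEPLOYED
            'id': 'example-job',
            'service': 'jobs',
            'physical_id': 12345,               # RESOURCE_PHYSICAL_ID; shape is service-specific
        }],
    }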
@@ -175,7 +134,7 @@ def deploy_config(self, stack_config, stack_status=None, **kwargs):

         return new_stack_status

-    def download_from_config(self, stack_config, **kwargs):
+    def download(self, stack_config, **kwargs):
         """
         Downloads a stack given a dict of the stack configuration.
         :param stack_config: dict of stack configuration. Must contain 'name' and 'resources' field.
@@ -214,13 +173,13 @@ def _deploy_resource(self, resource_config, resource_status=None, **kwargs):
         physical_id = resource_status.get(RESOURCE_PHYSICAL_ID) if resource_status else None

         if resource_service == JOBS_SERVICE:
-            click.echo("Deploying job '{}' with properties: \n{}".format(resource_id, json.dumps(
+            click.echo('Deploying job "{}" with properties: \n{}'.format(resource_id, json.dumps(
                 resource_properties, indent=2, separators=(',', ': '))))
             new_physical_id, deploy_output = self._deploy_job(resource_properties,
                                                               physical_id)
         elif resource_service == WORKSPACE_SERVICE:
             click.echo(
-                "Deploying workspace asset '{}' with properties \n{}"
+                'Deploying workspace asset "{}" with properties \n{}'
                 .format(
                     resource_id, json.dumps(resource_properties, indent=2, separators=(',', ': '))
                 )
@@ -231,7 +190,7 @@ def _deploy_resource(self, resource_config, resource_status=None, **kwargs):
                                                                     overwrite)
         elif resource_service == DBFS_SERVICE:
             click.echo(
-                "Deploying DBFS asset '{}' with properties \n{}".format(
+                'Deploying DBFS asset "{}" with properties \n{}'.format(
                     resource_id, json.dumps(resource_properties, indent=2, separators=(',', ': '))
                 )
             )
@@ -240,7 +199,7 @@ def _deploy_resource(self, resource_config, resource_status=None, **kwargs):
                                                                physical_id,
                                                                overwrite)
         else:
-            raise StackError("Resource service '{}' not supported".format(resource_service))
+            raise StackError('Resource service "{}" not supported'.format(resource_service))

         new_resource_status = {RESOURCE_ID: resource_id,
                                RESOURCE_SERVICE: resource_service,
Expand All @@ -266,16 +225,16 @@ def _download_resource(self, resource_config, **kwargs):

         if resource_service == WORKSPACE_SERVICE:
             click.echo(
-                "Downloading workspace asset '{}' with properties \n{}"
+                'Downloading workspace asset "{}" with properties \n{}'
                 .format(
                     resource_id, json.dumps(resource_properties, indent=2, separators=(',', ': '))
                 )
             )
             overwrite = kwargs.get('overwrite', False)
             self._download_workspace(resource_properties, overwrite)
         else:
-            click.echo("Resource service '{}' not supported for download. "
-                       "skipping.".format(resource_service))
+            click.echo('Resource service "{}" not supported for download. '
+                       'skipping.'.format(resource_service))

     def _deploy_job(self, resource_properties, physical_id=None):
         """
@@ -316,15 +275,15 @@ def _put_job(self, job_settings):
         job_name = job_settings.get(JOBS_RESOURCE_NAME)
         jobs_same_name = self.jobs_client._list_jobs_by_name(job_name)
         if len(jobs_same_name) > 1:
-            raise StackError("Multiple jobs with the same name '{}' already exist, aborting"
-                             " stack deployment".format(job_name))
+            raise StackError('Multiple jobs with the same name "{}" already exist, aborting'
+                             ' stack deployment'.format(job_name))
         elif len(jobs_same_name) == 1:
             existing_job = jobs_same_name[0]
             creator_name = existing_job.get('creator_user_name')
             timestamp = existing_job.get('created_time') / MS_SEC  # Convert to readable date.
             date_created = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
-            click.echo("Warning: Job exists with same name '{}' created by {} on {}. Job will "
-                       "be overwritten".format(job_name, creator_name, date_created))
+            click.echo('Warning: Job exists with same name "{}" created by {} on {}. Job will '
+                       'be overwritten'.format(job_name, creator_name, date_created))
             # Calling jobs_client.reset_job directly so as to not call same level function.
             self.jobs_client.reset_job({'job_id': existing_job.get('job_id'),
                                         'new_settings': job_settings})
@@ -363,8 +322,8 @@ def _deploy_workspace(self, resource_properties, physical_id, overwrite):

         actual_object_type = DIRECTORY if os.path.isdir(local_path) else NOTEBOOK
         if object_type != actual_object_type:
-            raise StackError("Field '{}' ({}) not consistent"
-                             "with actual object type ({})".format(WORKSPACE_RESOURCE_OBJECT_TYPE,
+            raise StackError('Field "{}" ({}) not consistent '
+                             'with actual object type ({})'.format(WORKSPACE_RESOURCE_OBJECT_TYPE,
                                                                    object_type,
                                                                    actual_object_type))

@@ -375,7 +334,7 @@ def _deploy_workspace(self, resource_properties, physical_id, overwrite):
         # Inference of notebook language and format
         language_fmt = WorkspaceLanguage.to_language_and_format(local_path)
         if language_fmt is None:
-            raise StackError("Workspace notebook language and format cannot be inferred"
+            raise StackError("Workspace notebook language and format cannot be inferred. "
                              "Please check file extension of notebook file.")
         language, fmt = language_fmt
         # Create needed directories in workspace.
@@ -416,7 +375,7 @@ def _download_workspace(self, resource_properties, overwrite):
         # Inference of notebook language and format. A tuple of (language, fmt) or Nonetype.
         language_fmt = WorkspaceLanguage.to_language_and_format(local_path)
         if language_fmt is None:
-            raise StackError("Workspace Notebook language and format cannot be inferred."
+            raise StackError("Workspace Notebook language and format cannot be inferred. "
                              "Please check file extension of notebook 'source_path'.")
         (_, fmt) = language_fmt
         local_dir = os.path.dirname(os.path.abspath(local_path))
@@ -426,7 +385,7 @@ def _download_workspace(self, resource_properties, overwrite):
         elif object_type == DIRECTORY:
             self.workspace_client.export_workspace_dir(workspace_path, local_path, overwrite)
         else:
-            raise StackError("Invalid value for '{}' field: {}"
+            raise StackError('Invalid value for "{}" field: {}'
                              .format(WORKSPACE_RESOURCE_OBJECT_TYPE, object_type))

     def _deploy_dbfs(self, resource_properties, physical_id, overwrite):
@@ -451,8 +410,8 @@ def _deploy_dbfs(self, resource_properties, physical_id, overwrite):

         if is_dir != os.path.isdir(local_path):
             dir_or_file = 'directory' if os.path.isdir(local_path) else 'file'
-            raise StackError("local source_path '{}' is found to be a {}, but is not specified"
-                             " as one with is_dir: {}."
+            raise StackError('local source_path "{}" is found to be a {}, but is not specified'
+                             ' as one with is_dir: {}.'
                              .format(local_path, dir_or_file, str(is_dir).lower()))
         if is_dir:
             click.echo('Uploading directory from {} to DBFS at {}'.format(local_path, dbfs_path))
@@ -513,7 +472,7 @@ def _validate_config(self, stack_config):
                                          [DBFS_RESOURCE_PATH, DBFS_RESOURCE_SOURCE_PATH,
                                           DBFS_RESOURCE_IS_DIR], resource_properties)
         else:
-            raise StackError("Resource service '{}' not supported".format(resource_service))
+            raise StackError('Resource service "{}" not supported'.format(resource_service))

     def _validate_status(self, stack_status):
         """
@@ -569,44 +528,3 @@ def _get_resource_to_status_map(self, stack_status):
             resource_status
             for resource_status in stack_status.get(STACK_DEPLOYED)
         }
-
-    def _generate_stack_status_path(self, stack_path):
-        """
-        Given a path to the stack configuration template JSON file, generates a path to where the
-        deployment status JSON will be stored after successful deployment of the stack.
-        :param stack_path: Path to the stack config template JSON file
-        :return: The path to the stack status file.
-        >>> self._generate_stack_status_path('./stack.json')
-        './stack.deployed.json'
-        """
-        stack_status_insert = 'deployed'
-        stack_path_split = stack_path.split('.')
-        stack_path_split.insert(-1, stack_status_insert)
-        return '.'.join(stack_path_split)
-
-    def _load_json(self, path):
-        """
-        Parse a json file to a readable dict format.
-        Returns an empty dictionary if the path doesn't exist.
-        :param path: File path of the JSON stack configuration template.
-        :return: dict of parsed JSON stack config template.
-        """
-        stack_conf = {}
-        if os.path.exists(path):
-            with open(path, 'r') as f:
-                stack_conf = json.load(f)
-        return stack_conf
-
-    def _save_json(self, path, data):
-        """
-        Writes data to a JSON file.
-        :param path: Path of JSON file.
-        :param data: dict- data that wants to by written to JSON file
-        :return: None
-        """
-        with open(path, 'w') as f:
-            json.dump(data, f, indent=2, sort_keys=True)
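
With the JSON helpers moved out of the API class, calling code drives the new interface roughly as follows. This is a sketch assuming api_client is an already-configured ApiClient and the dicts are shaped as documented above:

    api = StackApi(api_client)

    # deploy() now returns the new stack status; persisting it is the caller's job.
    new_stack_status = api.deploy(stack_config, stack_status, overwrite=True)

    # download() fetches workspace notebooks; other resource services are skipped.
    api.download(stack_config, overwrite=True)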
82 changes: 77 additions & 5 deletions databricks_cli/stack/cli.py
@@ -20,6 +20,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
+import json

 import click

@@ -31,8 +33,52 @@
 DEBUG_MODE = True


+def _generate_stack_status_path(stack_path):
+    """
+    Given a path to the stack configuration template JSON file, generates a path to where the
+    deployment status JSON will be stored after successful deployment of the stack.
+    :param stack_path: Path to the stack config template JSON file
+    :return: The path to the stack status file.
+    >>> _generate_stack_status_path('./stack.json')
+    './stack.deployed.json'
+    """
+    stack_status_insert = 'deployed'
+    stack_path_split = stack_path.split('.')
+    stack_path_split.insert(-1, stack_status_insert)
+    return '.'.join(stack_path_split)
+
+
+def _load_json(path):
+    """
+    Parse a JSON file to a readable dict format.
+    Returns an empty dictionary if the path doesn't exist.
+    :param path: File path of the JSON stack configuration template.
+    :return: dict of parsed JSON stack config template.
+    """
+    stack_conf = {}
+    if os.path.exists(path):
+        with open(path, 'r') as f:
+            stack_conf = json.load(f)
+    return stack_conf
+
+
+def _save_json(path, data):
+    """
+    Writes data to a JSON file.
+    :param path: Path of JSON file.
+    :param data: dict of data to be written to the JSON file
+    :return: None
+    """
+    with open(path, 'w') as f:
+        json.dump(data, f, indent=2, sort_keys=True)
+
+
 @click.command(context_settings=CONTEXT_SETTINGS,
-               short_help='Deploy stack given a JSON configuration of the stack')
+               short_help='Deploy a stack given a JSON configuration of the stack')
 @click.argument('config_path', type=click.Path(exists=True), required=True)
 @click.option('--overwrite', '-o', is_flag=True, default=False, show_default=True,
               help='Include to overwrite existing workspace notebooks and dbfs files')
@@ -43,15 +89,35 @@
 def deploy(api_client, config_path, **kwargs):
     """
     Deploy a stack to the databricks workspace given a JSON stack configuration template.
+    After deployment, a stack status will be saved at <config_file_name>.deployed.json. Please
+    do not edit or move the file as it is generated through the CLI and is used for future
+    deployments of the stack. If you run into errors with the stack status at deployment,
+    please delete the stack status file and try the deployment again. If the problem persists,
+    please raise a GitHub issue on the Databricks CLI repository at
+    https://www.github.com/databricks/databricks-cli/issues
     """
     click.echo('#' * 80)
     click.echo('Deploying stack at: {} with options: {}'.format(config_path, kwargs))
-    StackApi(api_client).deploy(config_path, **kwargs)
+    stack_config = _load_json(config_path)
+    status_path = _generate_stack_status_path(config_path)
+    stack_status = _load_json(status_path)
+    config_dir = os.path.dirname(os.path.abspath(config_path))
+    cli_dir = os.getcwd()
+    os.chdir(config_dir)  # Switch current working directory to where json config is stored
+    new_stack_status = StackApi(api_client).deploy(stack_config, stack_status, **kwargs)
+    click.echo('#' * 80)
+    os.chdir(cli_dir)
+    click.echo("Saving stack status to {}".format(status_path))
+    _save_json(status_path, new_stack_status)
+    click.echo('Note: The stack status file is generated automatically and the databricks'
+               ' stack CLI depends on it to function correctly.'
+               ' Please do not edit the file.')
+    click.echo('#' * 80)
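
For a quick smoke test of the command itself, click's test runner can invoke it in-process. This sketch assumes a configured Databricks CLI authentication profile (so that @provide_api_client can build a client) and a stack.json in the working directory:

    from click.testing import CliRunner

    runner = CliRunner()
    result = runner.invoke(deploy, ['./stack.json', '--overwrite'])
    print(result.output)  # includes the '#' * 80 separators echoed above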


 @click.command(context_settings=CONTEXT_SETTINGS,
-               short_help='Download stack given a JSON configuration of the stack')
+               short_help='Download stack notebooks given a JSON configuration of the stack')
 @click.argument('config_path', type=click.Path(exists=True), required=True)
 @click.option('--overwrite', '-o', is_flag=True, help='Include to overwrite existing'
               ' notebooks in the local filesystem.')
@@ -61,11 +127,17 @@ def deploy(api_client, config_path, **kwargs):
 @provide_api_client
 def download(api_client, config_path, **kwargs):
     """
-    Download a stack to the databricks workspace given a JSON stack configuration template.
+    Download workspace notebooks of a stack to the local filesystem given a JSON stack
+    configuration template.
     """
     click.echo('#' * 80)
     click.echo('Downloading stack at: {} with options: {}'.format(config_path, kwargs))
-    StackApi(api_client).download(config_path, **kwargs)
+    stack_config = _load_json(config_path)
+    config_dir = os.path.dirname(os.path.abspath(config_path))
+    cli_dir = os.getcwd()
+    os.chdir(config_dir)  # Switch current working directory to where json config is stored
+    StackApi(api_client).download(stack_config, **kwargs)
+    os.chdir(cli_dir)
+    click.echo('#' * 80)
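
Because only workspace resources are supported for download, a configuration this command would actually act on looks roughly like the sketch below. The property names and the service/object-type values are assumed from the constants in api.py, and all paths are illustrative:

    stack_config = {
        'name': 'notebook-stack',
        'resources': [{
            'id': 'analysis-notebook',
            'service': 'workspace',  # assumed value of WORKSPACE_SERVICE
            'properties': {
                'source_path': 'notebooks/analysis.py',         # local file
                'path': '/Users/someone@example.com/analysis',  # workspace path
                'object_type': 'NOTEBOOK',  # assumed value of the NOTEBOOK constant
            },
        }],
    }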


