Skip to content

Add option to specify the database schema in addition to 'database' (e.g. for StarRocks) #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions dbt_superset_lineage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ def pull_dashboards(dbt_project_dir: str = typer.Option('.', help="Directory pat
"needs to be added to source-paths in dbt_project.yml."),
dbt_db_name: str = typer.Option(None, help="Name of your database within dbt towards which "
"the pull should be reduced to run."),
dbt_schema_name: str = typer.Option(None, help="Name of your schema within dbt towards which "
"the pull should be reduced to run."),
superset_url: str = typer.Argument(..., help="URL of your Superset, e.g. "
"https://mysuperset.mycompany.com"),
superset_db_id: int = typer.Option(None, help="ID of your database within Superset towards which "
Expand All @@ -28,14 +30,17 @@ def pull_dashboards(dbt_project_dir: str = typer.Option('.', help="Directory pat
help="Refresh token to Superset API.")):

pull_dashboards_main(dbt_project_dir, exposures_path, dbt_db_name,
superset_url, superset_db_id, sql_dialect,
superset_access_token, superset_refresh_token)
dbt_schema_name, superset_url, superset_db_id,
sql_dialect, superset_access_token,
superset_refresh_token)


@app.command()
def push_descriptions(dbt_project_dir: str = typer.Option('.', help="Directory path to dbt project."),
dbt_db_name: str = typer.Option(None, help="Name of your database within dbt to which the script "
"should be reduced to run."),
dbt_schema_name: str = typer.Option(None, help="Name of your schema within dbt towards which "
"the pull should be reduced to run."),
superset_url: str = typer.Argument(..., help="URL of your Superset, e.g. "
"https://mysuperset.mycompany.com"),
superset_db_id: int = typer.Option(None, help="ID of your database within Superset towards which "
Expand All @@ -55,7 +60,7 @@ def push_descriptions(dbt_project_dir: str = typer.Option('.', help="Directory p
superset_refresh_token: str = typer.Option(None, envvar="SUPERSET_REFRESH_TOKEN",
help="Refresh token to Superset API.")):

push_descriptions_main(dbt_project_dir, dbt_db_name,
push_descriptions_main(dbt_project_dir, dbt_db_name, dbt_schema_name,
superset_url, superset_db_id, superset_refresh_columns, superset_pause_after_update,
superset_access_token, superset_refresh_token)

Expand Down
19 changes: 12 additions & 7 deletions dbt_superset_lineage/pull_dashboards.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def get_tables_from_sql(sql, dialect):
return tables


def get_tables_from_dbt(dbt_manifest, dbt_db_name):
def get_tables_from_dbt(dbt_manifest, dbt_db_name, dbt_schema_name):
tables = {}
for table_type in ['nodes', 'sources']:
manifest_subset = dbt_manifest[table_type]
Expand All @@ -85,8 +85,13 @@ def get_tables_from_dbt(dbt_manifest, dbt_db_name):
source = table['unique_id'].split('.')[-2]
table_key = schema + '.' + name

if dbt_db_name is None or database == dbt_db_name:
# fail if it breaks uniqueness constraint
should_add = (
(dbt_db_name is None or database == dbt_db_name) and
(dbt_schema_name is None or schema == dbt_schema_name)
)

if should_add:
print(f"Adding table {table_key} to tables")
assert table_key not in tables, \
f"Table {table_key} is a duplicate name (schema + table) across databases. " \
"This would result in incorrect matching between Superset and dbt. " \
Expand All @@ -99,7 +104,7 @@ def get_tables_from_dbt(dbt_manifest, dbt_db_name):
'ref':
f"ref('{name}')" if table_type == 'nodes'
else f"source('{source}', '{name}')"
}
}

assert tables, "Manifest is empty!"

Expand Down Expand Up @@ -298,8 +303,8 @@ def __init__(self):


def main(dbt_project_dir, exposures_path, dbt_db_name,
superset_url, superset_db_id, sql_dialect,
superset_access_token, superset_refresh_token):
dbt_schema_name, superset_url, superset_db_id,
sql_dialect, superset_access_token, superset_refresh_token):

# require at least one token for Superset
assert superset_access_token is not None or superset_refresh_token is not None, \
Expand All @@ -326,7 +331,7 @@ def main(dbt_project_dir, exposures_path, dbt_db_name,
Path(exposures_yaml_path).touch(exist_ok=True)
exposures = {}

dbt_tables = get_tables_from_dbt(dbt_manifest, dbt_db_name)
dbt_tables = get_tables_from_dbt(dbt_manifest, dbt_db_name, dbt_schema_name)
dashboards, dashboards_datasets = get_dashboards_from_superset(superset,
superset_url,
superset_db_id)
Expand Down
13 changes: 9 additions & 4 deletions dbt_superset_lineage/push_descriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def get_datasets_from_superset(superset, superset_db_id):
return datasets


def get_tables_from_dbt(dbt_manifest, dbt_db_name):
def get_tables_from_dbt(dbt_manifest, dbt_db_name, dbt_schema_name):
tables = {}
for table_type in ['nodes', 'sources']:
manifest_subset = dbt_manifest[table_type]
Expand All @@ -82,7 +82,12 @@ def get_tables_from_dbt(dbt_manifest, dbt_db_name):
columns = table['columns']
description = table['description']

if dbt_db_name is None or database == dbt_db_name:
should_add = (dbt_db_name is None or database == dbt_db_name) and (
dbt_schema_name is None or schema == dbt_schema_name
)

if should_add:
print(f"Pushing table {table_key_short} to tables")
# fail if it breaks uniqueness constraint
assert table_key_short not in tables, \
f"Table {table_key_short} is a duplicate name (schema + table) " \
Expand Down Expand Up @@ -228,7 +233,7 @@ def put_descriptions_to_superset(superset, dataset, superset_pause_after_update)
logging.info("Skipping PUT execute request as nothing would be updated.")


def main(dbt_project_dir, dbt_db_name,
def main(dbt_project_dir, dbt_db_name, dbt_schema_name,
superset_url, superset_db_id, superset_refresh_columns, superset_pause_after_update,
superset_access_token, superset_refresh_token):

Expand All @@ -249,7 +254,7 @@ def main(dbt_project_dir, dbt_db_name,
with open(f'{dbt_project_dir}/target/manifest.json') as f:
dbt_manifest = json.load(f)

dbt_tables = get_tables_from_dbt(dbt_manifest, dbt_db_name)
dbt_tables = get_tables_from_dbt(dbt_manifest, dbt_db_name, dbt_schema_name)

sst_datasets_dbt_filtered = [d for d in sst_datasets if d["key"] in dbt_tables]
logging.info("There are %d physical datasets in Superset with a match in dbt.", len(sst_datasets_dbt_filtered))
Expand Down