Skip to content

Commit eeda847

Browse files
committed
python models support
1 parent 36a09e7 commit eeda847

File tree

4 files changed

+238
-13
lines changed

4 files changed

+238
-13
lines changed

dbt/include/iris/macros/adapters.sql

+19-13
Original file line numberDiff line numberDiff line change
@@ -124,19 +124,25 @@ dbt docs: https://docs.getdbt.com/docs/contributing/building-a-new-adapter
124124

125125
{%- endmacro %}
126126

127-
{% macro iris__create_table_as(temporary, relation, sql) -%}
128-
{%- set sql_header = config.get('sql_header', none) -%}
129-
{% if temporary: -%}
130-
{% call statement('drop_relation') %}
131-
drop table if exists {{ relation }} cascade %DELDATA
132-
{% endcall %}
133-
{%- endif %}
134-
/* create_table_as */
135-
{{ sql_header if sql_header is not none }}
136-
create {% if temporary: -%}global temporary{%- endif %} table
137-
{{ relation }}
138-
as
139-
{{ sql }}
127+
{% macro iris__create_table_as(temporary, relation, compiled_code, language='sql') -%}
128+
{%- if language == 'sql' -%}
129+
{%- set sql_header = config.get('sql_header', none) -%}
130+
{% if temporary: -%}
131+
{% call statement('drop_relation') %}
132+
drop table if exists {{ relation }} cascade %DELDATA
133+
{% endcall %}
134+
{%- endif %}
135+
/* create_table_as */
136+
{{ sql_header if sql_header is not none }}
137+
create {% if temporary: -%}global temporary{%- endif %} table
138+
{{ relation }}
139+
as
140+
{{ compiled_code }}
141+
{%- elif language == 'python' -%}
142+
{{ py_write_table(compiled_code=compiled_code, target_relation=relation, temporary=temporary) }}
143+
{%- else -%}
144+
{% do exceptions.raise_compiler_error("iris__create_table_as macro didn't get supported language, it got %s" % language) %}
145+
{%- endif -%}
140146
{%- endmacro %}
141147

142148
{% macro iris__rename_relation(from_relation, to_relation) -%}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
{% materialization incremental, adapter='iris', supported_languages=['sql', 'python'] -%}
2+
3+
{%- set language = model['language'] -%}
4+
5+
-- relations
6+
{%- set existing_relation = load_cached_relation(this) -%}
7+
{%- set target_relation = this.incorporate(type='table') -%}
8+
{%- set temp_relation = make_temp_relation(target_relation)-%}
9+
{%- set intermediate_relation = make_intermediate_relation(target_relation)-%}
10+
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
11+
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
12+
13+
-- configs
14+
{%- set unique_key = config.get('unique_key') -%}
15+
{%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
16+
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}
17+
18+
-- the temp_ and backup_ relations should not already exist in the database; get_relation
19+
-- will return None in that case. Otherwise, we get a relation that we can drop
20+
-- later, before we try to use this name for the current operation. This has to happen before
21+
-- BEGIN, in a separate transaction
22+
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%}
23+
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
24+
-- grab current tables grants config for comparision later on
25+
{% set grant_config = config.get('grants') %}
26+
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
27+
{{ drop_relation_if_exists(preexisting_backup_relation) }}
28+
29+
{{ run_hooks(pre_hooks, inside_transaction=False) }}
30+
31+
-- `BEGIN` happens here:
32+
{{ run_hooks(pre_hooks, inside_transaction=True) }}
33+
34+
{% set to_drop = [] %}
35+
36+
{% if existing_relation is none %}
37+
{%- call statement('main', language=language) -%}
38+
{{ create_table_as(False, target_relation, compiled_code, language) }}
39+
{%- endcall -%}
40+
{% elif full_refresh_mode %}
41+
{%- call statement('main', language=language) -%}
42+
{{ create_table_as(False, intermediate_relation, compiled_code, language) }}
43+
{%- endcall -%}
44+
{% set need_swap = true %}
45+
{% else %}
46+
{%- call statement('main', language=language) -%}
47+
{{ create_table_as(True, temp_relation, compiled_code, language) }}
48+
{%- endcall -%}
49+
50+
{% do adapter.expand_target_column_types(
51+
from_relation=temp_relation,
52+
to_relation=target_relation) %}
53+
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
54+
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
55+
{% if not dest_columns %}
56+
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
57+
{% endif %}
58+
59+
{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
60+
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
61+
{% set incremental_predicates = config.get('incremental_predicates', none) %}
62+
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
63+
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %}
64+
65+
{%- call statement('main') -%}
66+
{{ strategy_sql_macro_func(strategy_arg_dict) }}
67+
{%- endcall -%}
68+
{% endif %}
69+
70+
{% if need_swap %}
71+
{% do adapter.rename_relation(target_relation, backup_relation) %}
72+
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
73+
{% do to_drop.append(backup_relation) %}
74+
{% endif %}
75+
76+
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
77+
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
78+
79+
{% do persist_docs(target_relation, model) %}
80+
81+
{% if existing_relation is none or existing_relation.is_view or should_full_refresh() %}
82+
{% do create_indexes(target_relation) %}
83+
{% endif %}
84+
85+
{{ run_hooks(post_hooks, inside_transaction=True) }}
86+
87+
-- `COMMIT` happens here
88+
{% do adapter.commit() %}
89+
90+
{% for rel in to_drop %}
91+
{% do adapter.drop_relation(rel) %}
92+
{% endfor %}
93+
94+
{{ run_hooks(post_hooks, inside_transaction=False) }}
95+
96+
{{ return({'relations': [target_relation]}) }}
97+
98+
{%- endmaterialization %}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
{% materialization table, adapter='iris', supported_languages=['sql', 'python']-%}
2+
3+
{%- set language = model['language'] -%}
4+
5+
{%- set existing_relation = load_cached_relation(this) -%}
6+
{%- set target_relation = this.incorporate(type='table') %}
7+
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
8+
-- the intermediate_relation should not already exist in the database; get_relation
9+
-- will return None in that case. Otherwise, we get a relation that we can drop
10+
-- later, before we try to use this name for the current operation
11+
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%}
12+
/*
13+
See ../view/view.sql for more information about this relation.
14+
*/
15+
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
16+
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
17+
-- as above, the backup_relation should not already exist
18+
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
19+
-- grab current tables grants config for comparision later on
20+
{% set grant_config = config.get('grants') %}
21+
22+
-- drop the temp relations if they exist already in the database
23+
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
24+
{{ drop_relation_if_exists(preexisting_backup_relation) }}
25+
26+
{{ run_hooks(pre_hooks, inside_transaction=False) }}
27+
28+
-- `BEGIN` happens here:
29+
{{ run_hooks(pre_hooks, inside_transaction=True) }}
30+
31+
-- build model
32+
{% call statement('main', language=language) -%}
33+
{{ create_table_as(False, intermediate_relation, compiled_code, language) }}
34+
{%- endcall %}
35+
36+
-- cleanup
37+
{% if existing_relation is not none %}
38+
{{ adapter.rename_relation(existing_relation, backup_relation) }}
39+
{% endif %}
40+
41+
{{ adapter.rename_relation(intermediate_relation, target_relation) }}
42+
43+
{% do create_indexes(target_relation) %}
44+
45+
{{ run_hooks(post_hooks, inside_transaction=True) }}
46+
47+
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
48+
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
49+
50+
{% do persist_docs(target_relation, model) %}
51+
52+
-- `COMMIT` happens here
53+
{{ adapter.commit() }}
54+
55+
-- finally, drop the existing/backup relation after the commit
56+
{{ drop_relation_if_exists(backup_relation) }}
57+
58+
{{ run_hooks(post_hooks, inside_transaction=False) }}
59+
60+
{{ return({'relations': [target_relation]}) }}
61+
62+
{% endmaterialization %}
63+
64+
{% macro py_write_table(compiled_code, target_relation, temporary=False) %}
65+
{{ compiled_code }}
66+
67+
import sys
68+
paths = [
69+
'/home/irisowner/sqlalchemy',
70+
'/home/irisowner/sqlalchemy-iris',
71+
'/home/irisowner/intersystems-irispython',
72+
]
73+
for path in paths:
74+
if path not in sys.path:
75+
sys.path.insert(1, path)
76+
try:
77+
import pandas
78+
from sqlalchemy import create_engine
79+
import intersystems_iris
80+
except:
81+
return "ERROR"
82+
83+
class DataFrame(pandas.DataFrame):
84+
def limit(self, num):
85+
return DataFrame(self.iloc[:num])
86+
87+
def filter(self, condition):
88+
return DataFrame(self[condition])
89+
90+
class IRISSession:
91+
default_schema = 'SQLUser'
92+
93+
def __init__(self) -> None:
94+
self.engine = create_engine('iris+emb:///')
95+
96+
def table(self, full_name) -> DataFrame:
97+
[schema, table] = full_name.split('.') if '.' in full_name else [self.default_schema, full_name]
98+
df = pandas.read_sql_table(table, self.engine, schema=schema)
99+
return DataFrame(df)
100+
101+
def to_sql(self, df, table, schema):
102+
df.to_sql(table, self.engine, if_exists='replace', schema=schema)
103+
104+
try:
105+
session = IRISSession()
106+
dbt = dbtObj(session.table)
107+
df = model(dbt, session)
108+
session.to_sql(df, '{{ target_relation.identifier }}', '{{ target_relation.schema }}')
109+
return "OK"
110+
except Exception as ex:
111+
print(ex)
112+
return "ERROR"
113+
{% endmacro %}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from dbt.tests.adapter.python_model.test_python_model import BasePythonModelTests
2+
from dbt.tests.adapter.python_model.test_python_model import BasePythonIncrementalTests
3+
4+
class TestPythonModelIRIS(BasePythonModelTests):
5+
pass
6+
7+
class TestPythonIncrementalIRIS(BasePythonIncrementalTests):
8+
pass

0 commit comments

Comments
 (0)