Skip to content

Commit c9f8465

Browse files
authored
Merge pull request #119 from oracle/fix/oml_client_session
Session info in OML Python jobs
2 parents db81c27 + 395cc05 commit c9f8465

File tree

9 files changed

+79
-29
lines changed

9 files changed

+79
-29
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,5 @@ doc/build.gitbak
147147
.venv1.4/
148148
.venv1.5/
149149
.venv1.6/
150+
.venv*/
150151
dbt_adbs_py_test_project

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Configuration variables
2-
VERSION=1.5.4
2+
VERSION=1.5.5
33
PROJ_DIR?=$(shell pwd)
44
VENV_DIR?=${PROJ_DIR}/.bldenv
55
BUILD_DIR=${PROJ_DIR}/build

dbt/adapters/oracle/connections.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ def open(cls, connection):
213213
}
214214

215215
if oracledb.__name__ == "oracledb":
216-
conn_config['connection_id_prefix'] = 'dbt-oracle-'
216+
conn_config['connection_id_prefix'] = f'dbt-oracle-{dbt_version}-'
217217

218218
if credentials.shardingkey:
219219
conn_config['shardingkey'] = credentials.shardingkey
@@ -240,7 +240,7 @@ def open(cls, connection):
240240
handle = oracledb.connect(**conn_config)
241241
# session_info is stored in v$session
242242
session_info = cls.get_session_info(credentials=credentials)
243-
logger.info(f"Session info :{json.dumps(session_info)}")
243+
logger.debug(f"Session info :{json.dumps(session_info)}")
244244
for k, v in session_info.items():
245245
try:
246246
setattr(handle, k, v)

dbt/adapters/oracle/python_submissions.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import http
1818
import json
1919
from typing import Dict
20+
import uuid
21+
import platform
2022

2123
import requests
2224
import time
@@ -25,6 +27,7 @@
2527
from dbt.adapters.oracle import OracleAdapterCredentials
2628
from dbt.events import AdapterLogger
2729
from dbt.ui import red, green
30+
from dbt.version import __version__ as dbt_version
2831

2932
# ADB-S OML Rest API minimum timeout is 1800 seconds
3033
DEFAULT_TIMEOUT_IN_SECONDS = 1800
@@ -140,6 +143,20 @@ def __init__(self,
140143
self.oml4py_client = OracleOML4PYClient(oml_cloud_service_url=credential.oml_cloud_service_url,
141144
username=credential.user,
142145
password=credential.password)
146+
self.session_info = self.get_session_info(credentials=credential)
147+
148+
@staticmethod
149+
def get_session_info(credentials):
150+
default_action = "DBT RUN OML4PY"
151+
default_client_identifier = f'dbt-oracle-client-{uuid.uuid4()}'
152+
default_client_info = "_".join([platform.node(), platform.machine()])
153+
default_module = f'dbt-{dbt_version}'
154+
return {
155+
"action": credentials.session_info.get("action", default_action),
156+
"client_identifier": credentials.session_info.get("client_identifier", default_client_identifier),
157+
"clientinfo": credentials.session_info.get("client_info", default_client_info),
158+
"module": credentials.session_info.get("module", default_module)
159+
}
143160

144161
def schedule_async_job_and_wait_for_completion(self, data):
145162
logger.info(f"Running Python aysnc job using {data}")
@@ -196,7 +213,8 @@ def schedule_async_job_and_wait_for_completion(self, data):
196213

197214
def __call__(self, *args, **kwargs):
198215
data = {
199-
"service": self.service
216+
"service": self.service,
217+
"parameters": json.dumps(self.session_info)
200218
}
201219
if self.async_flag:
202220
data["asyncFlag"] = self.async_flag

dbt/include/oracle/macros/materializations/python_model/python.sql

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,26 @@
5454
{% endmacro %}
5555

5656
{% macro py_script_postfix(model) %}
57-
def main():
57+
def main(action, client_identifier, clientinfo, module):
5858
import oml
59+
def set_connection_attributes():
60+
try:
61+
connection = oml.core.methods._get_conn()
62+
except Exception:
63+
raise
64+
else:
65+
session_info = {"action": action,
66+
"client_identifier": client_identifier,
67+
"clientinfo": clientinfo,
68+
"module": module}
69+
for k, v in session_info.items():
70+
try:
71+
setattr(connection, k, v)
72+
except AttributeError:
73+
pass # ok to be silent, ADB-S Python runtime complains about print statements
74+
75+
set_connection_attributes()
76+
5977
import pandas as pd
6078
{{ build_ref_function(model ) }}
6179
{{ build_source_function(model ) }}
@@ -86,6 +104,12 @@ def main():
86104
self.this = this()
87105
self.is_incremental = {{ is_incremental() }}
88106

107+
def materialize(df, table, session):
108+
if isinstance(df, pd.core.frame.DataFrame):
109+
oml.create(df, table=table)
110+
elif isinstance(df, oml.core.frame.DataFrame):
111+
df.materialize(table=table)
112+
89113
{{ model.raw_code | indent(width=4, first=False, blank=True)}}
90114

91115

dbt/include/oracle/macros/materializations/table/table.sql

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -100,26 +100,26 @@
100100

101101
{% macro py_write_table(compiled_code, target_relation, temporary=False) %}
102102
{{ compiled_code.replace(model.raw_code, "", 1) }}
103-
def materialize(df, table, session):
104-
if isinstance(df, pd.core.frame.DataFrame):
105-
oml.create(df, table=table)
106-
elif isinstance(df, oml.core.frame.DataFrame):
107-
df.materialize(table=table)
108-
109-
dbt = dbtObj(load_df_function=oml.sync)
110-
final_df = model(dbt, session=oml)
111-
112-
{{ log("Python model materialization is " ~ model.config.materialized, info=True) }}
113-
{% if model.config.materialized.lower() == 'table' %}
114-
table_name = f"{dbt.this.identifier}__dbt_tmp"
115-
{% else %}
116-
# incremental materialization
117-
{% if temporary %}
118-
table_name = "{{target_relation.identifier}}"
119-
{% else %}
120-
table_name = dbt.this.identifier
121-
{% endif %}
122-
{% endif %}
123-
materialize(final_df, table=table_name.upper(), session=oml)
124-
return pd.DataFrame.from_dict({"result": [1]})
103+
try:
104+
dbt = dbtObj(load_df_function=oml.sync)
105+
set_connection_attributes()
106+
final_df = model(dbt, session=oml)
107+
{{ log("Python model materialization is " ~ model.config.materialized, info=True) }}
108+
{% if model.config.materialized.lower() == 'table' %}
109+
table_name = f"{dbt.this.identifier}__dbt_tmp"
110+
{% else %}
111+
# incremental materialization
112+
{% if temporary %}
113+
table_name = "{{target_relation.identifier}}"
114+
{% else %}
115+
table_name = dbt.this.identifier
116+
{% endif %}
117+
{% endif %}
118+
materialize(final_df, table=table_name.upper(), session=oml)
119+
return pd.DataFrame.from_dict({"result": [1]})
120+
except Exception:
121+
raise
122+
finally:
123+
connection = oml.core.methods._get_conn()
124+
connection.close()
125125
{% endmacro %}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
def model(dbt, session):
2+
dbt.config(materialized="table")
3+
dbt.config(async_flag=True)
4+
dbt.config(timeout=1800)
5+
# oml.core.DataFrame referencing a dbt-sql model
6+
sales = session.sync(query="SELECT * FROM SH.SALES")
7+
return sales

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[metadata]
22
name = dbt-oracle
3-
version = 1.5.4
3+
version = 1.5.5
44
description = dbt (data build tool) adapter for the Oracle database
55
long_description = file: README.md
66
long_description_content_type = text/markdown

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252

5353
url = 'https://github.com/oracle/dbt-oracle'
5454

55-
VERSION = '1.5.4'
55+
VERSION = '1.5.5'
5656
setup(
5757
author="Oracle",
5858
python_requires='>=3.7.2',

0 commit comments

Comments
 (0)