 """
-databricks-sql-connector includes a SQLAlchemy dialect compatible with Databricks SQL.
-It aims to be a drop-in replacement for the crflynn/sqlalchemy-databricks project that implements
-more of the Databricks API, particularly around table reflection, Alembic usage, and data
-ingestion with pandas.
-
-Expected URI format is: databricks+thrift://token:dapi***@***.cloud.databricks.com?http_path=/sql/***
-
-Because of the extent of SQLAlchemy's capabilities it isn't feasible to provide examples of every
-usage in a single script, so we only provide a basic one here. More examples are found in our test
-suite at tests/e2e/sqlalchemy/test_basic.py and in the PR that implements this change:
-
-https://github.com/databricks/databricks-sql-python/pull/57
-
-# What's already supported
-
-Most of the functionality is demonstrated in the e2e tests mentioned above. The list below is
-derived from those test method names:
-
- - Create and drop tables with SQLAlchemy Core
- - Create and drop tables with SQLAlchemy ORM
- - Read created tables via reflection
- - Modify column nullability
- - Insert records manually
- - Insert records with pandas.to_sql (note that this does not work for DataFrames with indexes)
-
-This connector also aims to support Alembic for programmatic Delta table schema maintenance. This
-behaviour is not yet backed by integration tests, which will follow in a subsequent PR as we learn
-more about customer use cases there. That said, the following behaviours have been tested manually:
-
- - Autogenerate revisions with `alembic revision --autogenerate`
- - Upgrade and downgrade between revisions with `alembic upgrade <revision hash>` and
-   `alembic downgrade <revision hash>`
-
-# Known Gaps
- - MAP, ARRAY, and STRUCT types: this dialect can read these types out as strings, but you cannot
-   define a SQLAlchemy model with databricks.sqlalchemy.types.DatabricksMap (e.g.) because
-   we haven't implemented them yet.
- - Constraints: with the addition of information_schema to Unity Catalog, Databricks SQL supports
-   foreign key and primary key constraints. This dialect can write these constraints, but the
-   ability for Alembic to reflect and modify them programmatically has not been tested.
+databricks-sql-connector includes a SQLAlchemy 2.0 dialect compatible with Databricks SQL. To install
+its dependencies you can run `pip install databricks-sql-connector[sqlalchemy]`.
+
+The expected connection string format, which you can pass to create_engine(), is:
+
+databricks://token:dapi***@***.cloud.databricks.com?http_path=/sql/***&catalog=**&schema=**
+
+Our dialect implements the majority of SQLAlchemy 2.0's API. Because of the extent of SQLAlchemy's
+capabilities it isn't feasible to provide examples of every usage in a single script, so we only
+provide a basic one here. Learn more about usage in README.sqlalchemy.md in this repo.
 """
 
-import os
-import sqlalchemy
-from sqlalchemy.orm import Session
-from sqlalchemy import Column, String, Integer, BOOLEAN, create_engine, select
+# fmt: off
 
-try:
-    from sqlalchemy.orm import declarative_base
-except ImportError:
-    from sqlalchemy.ext.declarative import declarative_base
+import os
+from datetime import date, datetime, time, timedelta, timezone
+from decimal import Decimal
+from uuid import UUID
+
+# By convention, backend-specific SQLA types are defined in uppercase.
+# This dialect exposes Databricks SQL's TIMESTAMP and TINYINT types,
+# as these are not covered by the generic CamelCase types shown below.
+from databricks.sqlalchemy import TIMESTAMP, TINYINT
+
+# Beside each of the CamelCase types imported below, a line comment notes
+# the underlying Databricks SQL / Delta table type.
+from sqlalchemy import (
+    BigInteger,  # BIGINT
+    Boolean,  # BOOLEAN
+    Column,
+    Date,  # DATE
+    DateTime,  # TIMESTAMP_NTZ
+    Integer,  # INTEGER
+    Numeric,  # DECIMAL
+    String,  # STRING
+    Time,  # STRING
+    Uuid,  # STRING
+    create_engine,
+    select,
+)
+from sqlalchemy.orm import DeclarativeBase, Session
 
 host = os.getenv("DATABRICKS_SERVER_HOSTNAME")
 http_path = os.getenv("DATABRICKS_HTTP_PATH")
 access_token = os.getenv("DATABRICKS_TOKEN")
 catalog = os.getenv("DATABRICKS_CATALOG")
 schema = os.getenv("DATABRICKS_SCHEMA")
 
 
-# Extra arguments are passed untouched to the driver
-# See thrift_backend.py for complete list
+# Extra arguments are passed untouched to databricks-sql-connector
+# See src/databricks/sql/thrift_backend.py for a complete list
 extra_connect_args = {
     "_tls_verify_hostname": True,
     "_user_agent_entry": "PySQL Example Script",
 }
 
-if sqlalchemy.__version__.startswith("1.3"):
-    # SQLAlchemy 1.3.x fails to parse the http_path, catalog, and schema from our connection string
-    # Pass these in as connect_args instead
-
-    conn_string = f"databricks://token:{access_token}@{host}"
-    connect_args = dict(catalog=catalog, schema=schema, http_path=http_path)
-    all_connect_args = {**extra_connect_args, **connect_args}
-    engine = create_engine(conn_string, connect_args=all_connect_args)
-else:
-    engine = create_engine(
-        f"databricks://token:{access_token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}",
-        connect_args=extra_connect_args,
-    )
-
-session = Session(bind=engine)
-base = declarative_base(bind=engine)
-
-
-class SampleObject(base):
 
-    __tablename__ = "mySampleTable"
-
-    name = Column(String(255), primary_key=True)
-    episodes = Column(Integer)
-    some_bool = Column(BOOLEAN)
-
-
-base.metadata.create_all()
-
-sample_object_1 = SampleObject(name="Bim Adewunmi", episodes=6, some_bool=True)
-sample_object_2 = SampleObject(name="Miki Meek", episodes=12, some_bool=False)
-
-session.add(sample_object_1)
-session.add(sample_object_2)
+engine = create_engine(
+    f"databricks://token:{access_token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}",
+    connect_args=extra_connect_args,
+    echo=True,
+)
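+
+# As a quick sanity check (a sketch using standard SQLAlchemy APIs; not
+# required by the dialect), the engine can be smoke-tested with a trivial
+# query before any models are defined:
+#
+#   from sqlalchemy import text
+#   with engine.connect() as conn:
+#       print(conn.execute(text("SELECT 1")).scalar())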
+
+
+class Base(DeclarativeBase):
+    pass
+
+
+# This object gives a usage example for each supported type
+# for more details on these, see README.sqlalchemy.md
+class SampleObject(Base):
+    __tablename__ = "pysql_sqlalchemy_example_table"
+
+    bigint_col = Column(BigInteger, primary_key=True)
+    string_col = Column(String)
+    tinyint_col = Column(TINYINT)
+    int_col = Column(Integer)
+    numeric_col = Column(Numeric(10, 2))
+    boolean_col = Column(Boolean)
+    date_col = Column(Date)
+    datetime_col = Column(TIMESTAMP)
+    datetime_col_ntz = Column(DateTime)
+    time_col = Column(Time)
+    uuid_col = Column(Uuid)
+
+# This generates a CREATE TABLE statement against the catalog and schema
+# specified in the connection string
+Base.metadata.create_all(engine)
+
+# Output SQL is:
+# CREATE TABLE pysql_sqlalchemy_example_table (
+#   bigint_col BIGINT NOT NULL,
+#   string_col STRING,
+#   tinyint_col SMALLINT,
+#   int_col INT,
+#   numeric_col DECIMAL(10, 2),
+#   boolean_col BOOLEAN,
+#   date_col DATE,
+#   datetime_col TIMESTAMP,
+#   datetime_col_ntz TIMESTAMP_NTZ,
+#   time_col STRING,
+#   uuid_col STRING,
+#   PRIMARY KEY (bigint_col)
+# ) USING DELTA
+
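+# A note on re-running: by default create_all() checks whether each table
+# already exists before emitting CREATE TABLE, so running this script twice
+# should not fail here. A sketch of the opt-out, which skips that check:
+#
+#   Base.metadata.create_all(engine, checkfirst=False)
+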
+# The code that follows INSERTs a record containing these values via the
+# SQLAlchemy ORM and then SELECTs it back out. The output is compared to the
+# input to demonstrate that all type information is preserved.
+sample_object = {
+    "bigint_col": 1234567890123456789,
+    "string_col": "foo",
+    "tinyint_col": -100,
+    "int_col": 5280,
+    "numeric_col": Decimal("525600.01"),
+    "boolean_col": True,
+    "date_col": date(2020, 12, 25),
+    "datetime_col": datetime(
+        1991, 8, 3, 21, 30, 5, tzinfo=timezone(timedelta(hours=-8))
+    ),
+    "datetime_col_ntz": datetime(1990, 12, 4, 6, 33, 41),
+    "time_col": time(23, 59, 59),
+    "uuid_col": UUID(int=255),
+}
+sa_obj = SampleObject(**sample_object)
 
+session = Session(engine)
+session.add(sa_obj)
 session.commit()
 
-# SQLAlchemy 1.3 has slightly different methods
-if sqlalchemy.__version__.startswith("1.3"):
-    stmt = select([SampleObject]).where(SampleObject.name.in_(["Bim Adewunmi", "Miki Meek"]))
-    output = [i for i in session.execute(stmt)]
-else:
-    stmt = select(SampleObject).where(SampleObject.name.in_(["Bim Adewunmi", "Miki Meek"]))
-    output = [i for i in session.scalars(stmt)]
-
-assert len(output) == 2
-
-base.metadata.drop_all()
+# Output SQL is:
+# INSERT INTO
+#   pysql_sqlalchemy_example_table (
+#     bigint_col,
+#     string_col,
+#     tinyint_col,
+#     int_col,
+#     numeric_col,
+#     boolean_col,
+#     date_col,
+#     datetime_col,
+#     datetime_col_ntz,
+#     time_col,
+#     uuid_col
+#   )
+# VALUES
+#   (
+#     :bigint_col,
+#     :string_col,
+#     :tinyint_col,
+#     :int_col,
+#     :numeric_col,
+#     :boolean_col,
+#     :date_col,
+#     :datetime_col,
+#     :datetime_col_ntz,
+#     :time_col,
+#     :uuid_col
+#   )
+
+# Here we build a SELECT query using the ORM
+stmt = select(SampleObject).where(SampleObject.int_col == 5280)
+
+# Then fetch one result with session.scalar()
+result = session.scalar(stmt)
+
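+# If the query matched several rows, session.scalars(stmt).all() would return
+# them all as a list (a sketch; this example inserts only one row):
+#
+#   results = session.scalars(stmt).all()
+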
+# Finally, we read the row back out and compare it to the original input
+compare = {key: getattr(result, key) for key in sample_object.keys()}
+assert compare == sample_object
+
+# Then we drop the demonstration table
+Base.metadata.drop_all(engine)
+
+# Output SQL is:
+# DROP TABLE pysql_sqlalchemy_example_table
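+
+# Cleanup is optional in a short script, but closing the session and disposing
+# of the engine's connection pool is good practice in longer-lived code
+# (a sketch using standard SQLAlchemy APIs):
+#
+#   session.close()
+#   engine.dispose()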