
Commit af8a56a

SQLAlchemy: Add insert_bulk fast-path INSERT method for pandas
This method supports efficient batch inserts using CrateDB's bulk operations endpoint. https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
1 parent: d3f511f
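For orientation, here is a minimal sketch of the DB-API fast path this commit builds upon: a single ``INSERT`` statement executed with many parameter rows at once, routed through the bulk operations endpoint via ``bulk_parameters``. The host and the ``demo`` table are assumptions for illustration, not part of the commit.

    from crate.client import connect

    # Assumption: a CrateDB instance listening on localhost:4200,
    # and a scratch table named `demo`.
    connection = connect("http://localhost:4200")
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS demo (x INT, y INT)")

    # One statement, many parameter rows: this is the bulk operations
    # endpoint at work.
    cursor.execute(
        sql="INSERT INTO demo (x, y) VALUES (?, ?)",
        bulk_parameters=[[1, 2], [3, 4], [5, 6]],
    )
    cursor.close()
    connection.close()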

File tree: 8 files changed, +216 −4 lines changed
CHANGES.txt (+3)

@@ -5,6 +5,9 @@ Changes for crate
 Unreleased
 ==========
 
+- SQLAlchemy: Added ``insert_bulk`` fast-path ``INSERT`` method for pandas, in
+  order to support efficient batch inserts using CrateDB's bulk operations
+  endpoint.
 
 2023/04/18 0.31.1
 =================

docs/by-example/index.rst (+1)

@@ -48,6 +48,7 @@ its corresponding API interfaces, see also :ref:`sqlalchemy-support`.
     sqlalchemy/working-with-types
     sqlalchemy/advanced-querying
     sqlalchemy/inspection-reflection
+    sqlalchemy/dataframe


 .. _Python DB API: https://peps.python.org/pep-0249/
docs/by-example/sqlalchemy/dataframe.rst (new file, +106)

.. _sqlalchemy-pandas:
.. _sqlalchemy-dataframe:

================================
SQLAlchemy: DataFrame operations
================================

About
=====

This section of the documentation demonstrates support for efficient batch
``INSERT`` operations with `pandas`_, using the CrateDB SQLAlchemy dialect.


Introduction
============

The :ref:`pandas DataFrame <pandas:api.dataframe>` is a structure that contains
two-dimensional data and its corresponding labels. DataFrames are widely used
in data science, machine learning, scientific computing, and many other
data-intensive fields.

DataFrames are similar to SQL tables or the spreadsheets that you work with in
Excel or Calc. In many cases, DataFrames are faster, easier to use, and more
powerful than tables or spreadsheets, because they are an integral part of the
`Python`_ and `NumPy`_ ecosystems.

The :ref:`pandas I/O subsystem <pandas:api.io>` for `relational databases`_
using `SQL`_ is based on `SQLAlchemy`_.


.. rubric:: Table of Contents

.. contents::
   :local:


Efficient ``INSERT`` operations with pandas
===========================================

The package provides an ``insert_bulk`` function to use the
:meth:`pandas:pandas.DataFrame.to_sql` method most efficiently, based on the
`CrateDB bulk operations`_ endpoint. It will effectively split your insert
workload across multiple batches, using a defined chunk size.

>>> import sqlalchemy as sa
>>> from pandas._testing import makeTimeDataFrame
>>> from crate.client.sqlalchemy.support import insert_bulk
...
>>> # Define the number of records, and the chunk size.
>>> INSERT_RECORDS = 42
>>> CHUNK_SIZE = 8
...
>>> # Create a pandas DataFrame, and connect to CrateDB.
>>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
>>> engine = sa.create_engine(f"crate://{crate_host}")
...
>>> # Insert the content of the DataFrame using batches of records.
>>> # Effectively, this emits six batches: 42 / 8 = 5.25, rounded up.
>>> df.to_sql(
...     name="test-testdrive",
...     con=engine,
...     if_exists="replace",
...     index=False,
...     chunksize=CHUNK_SIZE,
...     method=insert_bulk,
... )

.. TIP::

    You will observe that the optimal chunk size highly depends on the shape of
    your data, specifically the width of each record, i.e. the number of columns
    and their individual sizes. You will need to determine a good chunk size by
    running corresponding experiments on your own behalf. For that purpose, you
    can use the `insert_pandas.py`_ program as a blueprint.

    It is a good idea to start your explorations with a chunk size of 5,000, and
    then see if performance improves when you increase or decrease that figure.
    Chunk sizes of 20,000 may also be applicable, but make sure to take the
    limits of your HTTP infrastructure into consideration.

    In order to learn more about what wide- vs. long-form (tidy, stacked, narrow)
    data means in the context of `DataFrame computing`_, let us refer you to `a
    general introduction <wide-narrow-general_>`_, the corresponding section in
    the `Data Computing book <wide-narrow-data-computing_>`_, and a `pandas
    tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic.


.. hidden: Disconnect from database

    >>> engine.dispose()


.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
.. _DataFrame computing: https://realpython.com/pandas-dataframe/
.. _insert_pandas.py: https://github.com/crate/crate-python/blob/master/examples/insert_pandas.py
.. _NumPy: https://en.wikipedia.org/wiki/NumPy
.. _pandas: https://en.wikipedia.org/wiki/Pandas_(software)
.. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html
.. _Python: https://en.wikipedia.org/wiki/Python_(programming_language)
.. _relational databases: https://en.wikipedia.org/wiki/Relational_database
.. _SQL: https://en.wikipedia.org/wiki/SQL
.. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html
.. _wide-narrow-general: https://en.wikipedia.org/wiki/Wide_and_narrow_data
.. _wide-narrow-data-computing: https://dtkaplan.github.io/DataComputingEbook/chap-wide-vs-narrow.html#chap:wide-vs-narrow
.. _wide-narrow-pandas-tutorial: https://anvil.works/blog/tidy-data
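To determine a good chunk size, as recommended in the TIP above, a small timing loop can serve as a starting point. This is an illustrative sketch only, not part of the commit; the host, table name, and record count are assumptions.

    import time

    import sqlalchemy as sa
    from pandas._testing import makeTimeDataFrame
    from crate.client.sqlalchemy.support import insert_bulk

    # Assumptions: CrateDB on localhost:4200, a scratch table name, and a
    # synthetic DataFrame with 100,000 records.
    engine = sa.create_engine("crate://localhost:4200")
    df = makeTimeDataFrame(nper=100_000, freq="S")

    # Time `to_sql` for a few candidate chunk sizes.
    for chunksize in (1_000, 5_000, 20_000):
        start = time.perf_counter()
        df.to_sql(
            name="chunksize-testdrive",
            con=engine,
            if_exists="replace",
            index=False,
            chunksize=chunksize,
            method=insert_bulk,
        )
        print(f"chunksize={chunksize}: {time.perf_counter() - start:.2f}s")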

docs/conf.py (+2 −1)

@@ -12,7 +12,8 @@
 intersphinx_mapping.update({
     'py': ('https://docs.python.org/3/', None),
     'sa': ('https://docs.sqlalchemy.org/en/14/', None),
-    'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None)
+    'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None),
+    'pandas': ('https://pandas.pydata.org/docs/', None),
 })
setup.py (+1)

@@ -70,6 +70,7 @@ def read(path):
     'createcoverage>=1,<2',
     'stopit>=1.1.2,<2',
     'flake8>=4,<7',
+    'pandas>=2,<3',
     'pytz',
     # `test_http.py` needs `setuptools.ssl_support`
     'setuptools<57',
src/crate/client/sqlalchemy/support.py (new file, +62)

# -*- coding: utf-8; -*-
#
# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
# license agreements. See the NOTICE file distributed with this work for
# additional information regarding copyright ownership. Crate licenses
# this file to you under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# However, if you have executed another commercial license agreement
# with Crate these terms will supersede the license and you may use the
# software solely pursuant to the terms of the relevant commercial agreement.
import logging


logger = logging.getLogger(__name__)


def insert_bulk(pd_table, conn, keys, data_iter):
    """
    Use CrateDB's "bulk operations" endpoint as a fast path for pandas' and
    Dask's `to_sql()` [1] method.

    The idea is to break out of SQLAlchemy, compile the insert statement, and
    use the raw DBAPI connection client, in order to invoke a request using
    `bulk_parameters` [2]::

        cursor.execute(sql=sql, bulk_parameters=data)

    The vanilla implementation, used by SQLAlchemy, is::

        data = [dict(zip(keys, row)) for row in data_iter]
        conn.execute(pd_table.table.insert(), data)

    Batch chunking will happen outside of this function, for example [3]
    demonstrates the relevant code in `pandas.io.sql`.

    [1] https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
    [2] https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
    [3] https://github.com/pandas-dev/pandas/blob/v2.0.1/pandas/io/sql.py#L1011-L1027
    """

    # Compile the SQL statement, and materialize the batch.
    sql = str(pd_table.table.insert().compile(bind=conn))
    data = list(data_iter)

    # For debugging and tracing the batches running through this method.
    # Because it is a performance-optimized code path, the log statements
    # are not active by default.
    # logger.info(f"Bulk SQL: {sql}")
    # logger.info(f"Bulk records: {len(data)}")
    # logger.info(f"Bulk data: {data}")

    # Invoke the bulk insert operation.
    cursor = conn._dbapi_connection.cursor()
    cursor.execute(sql=sql, bulk_parameters=data)
    cursor.close()
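For context, pandas invokes the ``method`` callable of ``to_sql()`` once per chunk, passing the table wrapper, the connection, the column names, and an iterator of row tuples; ``insert_bulk`` implements exactly this signature. The following sketch, using the hypothetical name ``log_only``, only demonstrates that contract; it uses SQLite so it runs without a CrateDB instance.

    import pandas as pd
    import sqlalchemy as sa

    def log_only(pd_table, conn, keys, data_iter):
        # Same signature as `insert_bulk`: called once per chunk with the
        # table wrapper, connection, column names, and row iterator.
        # Hypothetical helper for illustration; it logs instead of inserting.
        rows = list(data_iter)
        print(f"table={pd_table.name} columns={keys} rows={len(rows)}")

    # Any SQLAlchemy engine works for demonstrating the contract.
    engine = sa.create_engine("sqlite:///:memory:")
    df = pd.DataFrame({"x": [1, 2, 3], "y": [4.0, 5.0, 6.0]})

    # With three records and a chunk size of two, `log_only` runs twice.
    df.to_sql(name="demo", con=engine, index=False, chunksize=2, method=log_only)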

src/crate/client/sqlalchemy/tests/bulk_test.py (+40 −3)

@@ -18,7 +18,7 @@
 # However, if you have executed another commercial license agreement
 # with Crate these terms will supersede the license and you may use the
 # software solely pursuant to the terms of the relevant commercial agreement.
-
+import math
 from unittest import TestCase, skipIf
 from unittest.mock import patch, MagicMock

@@ -36,8 +36,7 @@

 fake_cursor = MagicMock(name='fake_cursor')
-FakeCursor = MagicMock(name='FakeCursor', spec=Cursor)
-FakeCursor.return_value = fake_cursor
+FakeCursor = MagicMock(name='FakeCursor', spec=Cursor, return_value=fake_cursor)


 class SqlAlchemyBulkTest(TestCase):

@@ -168,3 +167,41 @@ def test_bulk_save_modern(self):
             'Callisto', 37,
         )
         self.assertSequenceEqual(expected_bulk_args, bulk_args)
+
+    @patch('crate.client.connection.Cursor', mock_cursor=FakeCursor)
+    def test_bulk_save_pandas(self, mock_cursor):
+        """
+        Verify bulk INSERT with pandas.
+        """
+        import sqlalchemy as sa
+        from pandas._testing import makeTimeDataFrame
+        from crate.client.sqlalchemy.support import insert_bulk
+
+        # 42 records / 8 chunksize = 5.25, which means 6 batches will be emitted.
+        INSERT_RECORDS = 42
+        CHUNK_SIZE = 8
+        OPCOUNT = math.ceil(INSERT_RECORDS / CHUNK_SIZE)
+
+        # Create a DataFrame to feed into the database.
+        df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
+
+        dburi = "crate://localhost:4200"
+        engine = sa.create_engine(dburi, echo=True)
+        retval = df.to_sql(
+            name="test-testdrive",
+            con=engine,
+            if_exists="replace",
+            index=False,
+            chunksize=CHUNK_SIZE,
+            method=insert_bulk,
+        )
+        self.assertIsNone(retval)
+
+        # Initializing the query has an overhead of two calls to the cursor
+        # object: probably one initial connection from the DB-API driver, to
+        # inquire the database version, and another one from SQLAlchemy, which
+        # uses it to inquire the table schema using `information_schema`, and
+        # to eventually issue the `CREATE TABLE ...` statement.
+        effective_op_count = mock_cursor.call_count - 2
+
+        # Verify the number of batches.
+        self.assertEqual(effective_op_count, OPCOUNT)

src/crate/client/tests.py (+1)

@@ -385,6 +385,7 @@ def test_suite():
        'docs/by-example/sqlalchemy/working-with-types.rst',
        'docs/by-example/sqlalchemy/advanced-querying.rst',
        'docs/by-example/sqlalchemy/inspection-reflection.rst',
+       'docs/by-example/sqlalchemy/dataframe.rst',
        module_relative=False,
        setUp=setUpCrateLayerSqlAlchemy,
        tearDown=tearDownDropEntitiesSqlAlchemy,
