
Commit decce23

amotl and hlcianfagna committed
SQLAlchemy: Add insert_bulk fast-path INSERT method for pandas
This method supports efficient batch inserts using CrateDB's bulk operations
endpoint: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations

Co-authored-by: hlcianfagna <[email protected]>
1 parent d3f511f commit decce23
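
For context, the bulk operations endpoint referenced above accepts a single SQL
statement together with a list of parameter lists, and this commit routes pandas'
`to_sql()` onto exactly that code path. A minimal sketch of what the fast path
boils down to, using the DB-API client directly (table name and records are
hypothetical, and a CrateDB instance is assumed on localhost:4200):

    from crate import client

    connection = client.connect("http://localhost:4200")
    cursor = connection.cursor()

    # One statement, many parameter lists: a single HTTP round trip per batch.
    cursor.execute(
        sql='INSERT INTO "my_table" (id, name) VALUES (?, ?)',
        bulk_parameters=[[1, "Aldebaran"], [2, "Algol"]],
    )

    cursor.close()
    connection.close()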

File tree: 8 files changed, +248 −4 lines changed

CHANGES.txt (3 additions, 0 deletions)

@@ -5,6 +5,9 @@ Changes for crate
 Unreleased
 ==========

+- SQLAlchemy: Added ``insert_bulk`` fast-path ``INSERT`` method for pandas, in
+  order to support efficient batch inserts using CrateDB's bulk operations endpoint.
+
 2023/04/18 0.31.1
 =================

docs/by-example/index.rst (1 addition, 0 deletions)

@@ -48,6 +48,7 @@ its corresponding API interfaces, see also :ref:`sqlalchemy-support`.
 sqlalchemy/working-with-types
 sqlalchemy/advanced-querying
 sqlalchemy/inspection-reflection
+sqlalchemy/dataframe


 .. _Python DB API: https://peps.python.org/pep-0249/
docs/by-example/sqlalchemy/dataframe.rst (new file, 138 additions)

@@ -0,0 +1,138 @@
.. _sqlalchemy-pandas:
.. _sqlalchemy-dataframe:

================================
SQLAlchemy: DataFrame operations
================================

About
=====

This section of the documentation demonstrates support for efficient batch/bulk
``INSERT`` operations with `pandas`_ and `Dask`_, using the CrateDB SQLAlchemy dialect.

Efficient bulk operations are needed for typical `ETL`_ batch processing and
data streaming workloads, for example to move data in and out of OLAP data
warehouses, as contrasted to interactive online transaction processing (OLTP)
applications. The strategy of `batching`_ together series of records to
improve performance is also referred to as `chunking`_.


Introduction
============

The :ref:`pandas DataFrame <pandas:api.dataframe>` is a structure that contains
two-dimensional data and its corresponding labels. DataFrames are widely used
in data science, machine learning, scientific computing, and many other
data-intensive fields.

DataFrames are similar to SQL tables or the spreadsheets that you work with in
Excel or Calc. In many cases, DataFrames are faster, easier to use, and more
powerful than tables or spreadsheets, because they are an integral part of the
`Python`_ and `NumPy`_ ecosystems.

The :ref:`pandas I/O subsystem <pandas:api.io>` for `relational databases`_
using `SQL`_ is based on `SQLAlchemy`_.


.. rubric:: Table of Contents

.. contents::
   :local:


Efficient ``INSERT`` operations with pandas
===========================================

The package provides an ``insert_bulk`` function to use the
:meth:`pandas:pandas.DataFrame.to_sql` method more efficiently, based on the
`CrateDB bulk operations`_ endpoint. It will effectively split your insert
workload across multiple batches, using a defined chunk size.

>>> import sqlalchemy as sa
>>> from pandas._testing import makeTimeDataFrame
>>> from crate.client.sqlalchemy.support import insert_bulk
...
>>> # Define the number of records, and the chunk size.
>>> INSERT_RECORDS = 42
>>> CHUNK_SIZE = 8
...
>>> # Create a pandas DataFrame, and connect to CrateDB.
>>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
>>> engine = sa.create_engine(f"crate://{crate_host}")
...
>>> # Insert the content of the DataFrame using batches of records.
>>> # Effectively, six batches will be submitted, because ceil(42 / 8) = 6.
>>> df.to_sql(
...     name="test-testdrive",
...     con=engine,
...     if_exists="replace",
...     index=False,
...     chunksize=CHUNK_SIZE,
...     method=insert_bulk,
... )

.. TIP::

    You will observe that the optimal chunk size highly depends on the shape of
    your data, specifically the width of each record, i.e. the number of columns
    and their individual sizes. You will need to determine a good chunk size by
    running corresponding experiments on your own behalf. For that purpose, you
    can use the `insert_pandas.py`_ program as a blueprint.

A few details should be taken into consideration when determining the optimal
chunk size for a specific dataset. We are outlining the two major ones.

- First, when working with data larger than the main memory available on your
  machine, each chunk should be small enough to fit into memory, but large
  enough to minimize the overhead of a single data insert operation. Depending
  on whether you are running other workloads on the same machine, you should
  also account for the total share of heap memory you assign to each domain,
  to prevent overloading the system as a whole.

- Second, as each batch is submitted using HTTP, you should know about the request
  size limits and other constraints of your HTTP infrastructure, which may include
  any type of HTTP intermediary relaying information between your database client
  application and your CrateDB cluster. For example, HTTP proxy servers or load
  balancers not optimally configured for performance, or web application firewalls
  and intrusion prevention systems, may hamper HTTP communication, sometimes in
  subtle ways, for example based on request size constraints or throttling
  mechanisms. If you are working with very busy systems, hosted on shared
  infrastructure, details like `SNAT port exhaustion`_ may also come into play.

It is a good idea to start your explorations with a chunk size of 5_000, and
then see if performance improves when you increase or decrease that figure.
Chunk sizes of 20_000 may also be applicable, but make sure to take the limits
of your HTTP infrastructure into consideration.
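
To find that figure for your own dataset and infrastructure, you can time a few
candidate chunk sizes before settling on one. This is a minimal sketch, not part
of this package, which reuses the ``df``, ``engine``, and ``insert_bulk`` objects
from the example above::

    import time

    for chunk_size in [1_000, 5_000, 10_000, 20_000]:
        start = time.perf_counter()
        df.to_sql(
            name="test-testdrive",
            con=engine,
            if_exists="replace",
            index=False,
            chunksize=chunk_size,
            method=insert_bulk,
        )
        duration = time.perf_counter() - start
        print(f"chunk size {chunk_size}: {duration:.2f} seconds")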

In order to learn more about what wide- vs. long-form (tidy, stacked, narrow)
data means in the context of `DataFrame computing`_, let us refer you to `a
general introduction <wide-narrow-general_>`_, the corresponding section in
the `Data Computing book <wide-narrow-data-computing_>`_, and a `pandas
tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic.


.. hidden: Disconnect from database

    >>> engine.dispose()

.. _batching: https://en.wikipedia.org/wiki/Batch_processing#Common_batch_processing_usage
.. _chunking: https://en.wikipedia.org/wiki/Chunking_(computing)
.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
.. _Dask: https://en.wikipedia.org/wiki/Dask_(software)
.. _DataFrame computing: https://realpython.com/pandas-dataframe/
.. _ETL: https://en.wikipedia.org/wiki/Extract,_transform,_load
.. _insert_pandas.py: https://github.com/crate/crate-python/blob/master/examples/insert_pandas.py
.. _NumPy: https://en.wikipedia.org/wiki/NumPy
.. _pandas: https://en.wikipedia.org/wiki/Pandas_(software)
.. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html
.. _Python: https://en.wikipedia.org/wiki/Python_(programming_language)
.. _relational databases: https://en.wikipedia.org/wiki/Relational_database
.. _SNAT port exhaustion: https://learn.microsoft.com/en-us/azure/load-balancer/troubleshoot-outbound-connection
.. _SQL: https://en.wikipedia.org/wiki/SQL
.. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html
.. _wide-narrow-general: https://en.wikipedia.org/wiki/Wide_and_narrow_data
.. _wide-narrow-data-computing: https://dtkaplan.github.io/DataComputingEbook/chap-wide-vs-narrow.html#chap:wide-vs-narrow
.. _wide-narrow-pandas-tutorial: https://anvil.works/blog/tidy-data

docs/conf.py (2 additions, 1 deletion)

@@ -12,7 +12,8 @@
 intersphinx_mapping.update({
     'py': ('https://docs.python.org/3/', None),
     'sa': ('https://docs.sqlalchemy.org/en/14/', None),
-    'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None)
+    'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None),
+    'pandas': ('https://pandas.pydata.org/docs/', None),
 })
setup.py (1 addition, 0 deletions)

@@ -70,6 +70,7 @@ def read(path):
     'createcoverage>=1,<2',
     'stopit>=1.1.2,<2',
     'flake8>=4,<7',
+    'pandas>=2,<3',
     'pytz',
     # `test_http.py` needs `setuptools.ssl_support`
     'setuptools<57',
src/crate/client/sqlalchemy/support.py (new file, 62 additions)

@@ -0,0 +1,62 @@
# -*- coding: utf-8; -*-
#
# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
# license agreements. See the NOTICE file distributed with this work for
# additional information regarding copyright ownership. Crate licenses
# this file to you under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# However, if you have executed another commercial license agreement
# with Crate these terms will supersede the license and you may use the
# software solely pursuant to the terms of the relevant commercial agreement.
import logging


logger = logging.getLogger(__name__)


def insert_bulk(pd_table, conn, keys, data_iter):
    """
    Use CrateDB's "bulk operations" endpoint as a fast path for pandas' and Dask's `to_sql()` [1] method.

    The idea is to break out of SQLAlchemy, compile the insert statement, and use the raw
    DBAPI connection client, in order to invoke a request using `bulk_parameters` [2]::

        cursor.execute(sql=sql, bulk_parameters=data)

    The vanilla implementation, used by SQLAlchemy, is::

        data = [dict(zip(keys, row)) for row in data_iter]
        conn.execute(pd_table.table.insert(), data)

    Batch chunking will happen outside of this function, for example [3] demonstrates
    the relevant code in `pandas.io.sql`.

    [1] https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
    [2] https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
    [3] https://github.com/pandas-dev/pandas/blob/v2.0.1/pandas/io/sql.py#L1011-L1027
    """

    # Compile the SQL statement, and materialize the batch.
    sql = str(pd_table.table.insert().compile(bind=conn))
    data = list(data_iter)

    # For debugging and tracing the batches running through this method.
    # Because it is a performance-optimized code path, the log statements are
    # not active by default.
    # logger.info(f"Bulk SQL:     {sql}")
    # logger.info(f"Bulk records: {len(data)}")
    # logger.info(f"Bulk data:    {data}")

    # Invoke the bulk insert operation.
    cursor = conn._dbapi_connection.cursor()
    cursor.execute(sql=sql, bulk_parameters=data)
    cursor.close()
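
The docstring above mentions that this fast path also works for Dask's `to_sql()`.
As a usage sketch, not part of this commit, and assuming `dask` is installed and a
CrateDB instance is listening on localhost:4200, it can be wired up like this; note
that Dask accepts a database URI rather than an engine object:

    import dask.dataframe as dd
    from pandas._testing import makeTimeDataFrame
    from crate.client.sqlalchemy.support import insert_bulk

    # Create a Dask DataFrame with a handful of partitions.
    df = makeTimeDataFrame(nper=42, freq="S")
    ddf = dd.from_pandas(df, npartitions=4)

    # Each partition is inserted in chunks, with `insert_bulk` as the fast path.
    ddf.to_sql(
        "test-testdrive",
        uri="crate://localhost:4200",
        if_exists="replace",
        index=False,
        chunksize=8,
        method=insert_bulk,
        parallel=True,
    )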

src/crate/client/sqlalchemy/tests/bulk_test.py (40 additions, 3 deletions)

@@ -18,7 +18,7 @@
 # However, if you have executed another commercial license agreement
 # with Crate these terms will supersede the license and you may use the
 # software solely pursuant to the terms of the relevant commercial agreement.
-
+import math
 from unittest import TestCase, skipIf
 from unittest.mock import patch, MagicMock

@@ -36,8 +36,7 @@

 fake_cursor = MagicMock(name='fake_cursor')
-FakeCursor = MagicMock(name='FakeCursor', spec=Cursor)
-FakeCursor.return_value = fake_cursor
+FakeCursor = MagicMock(name='FakeCursor', spec=Cursor, return_value=fake_cursor)


 class SqlAlchemyBulkTest(TestCase):

@@ -168,3 +167,41 @@ def test_bulk_save_modern(self):
             'Callisto', 37,
         )
         self.assertSequenceEqual(expected_bulk_args, bulk_args)
+
+    @patch('crate.client.connection.Cursor', mock_cursor=FakeCursor)
+    def test_bulk_save_pandas(self, mock_cursor):
+        """
+        Verify bulk INSERT with pandas.
+        """
+        import sqlalchemy as sa
+        from pandas._testing import makeTimeDataFrame
+        from crate.client.sqlalchemy.support import insert_bulk
+
+        # 42 records / 8 chunksize = 5.25, which means 6 batches will be emitted.
+        INSERT_RECORDS = 42
+        CHUNK_SIZE = 8
+        OPCOUNT = math.ceil(INSERT_RECORDS / CHUNK_SIZE)
+
+        # Create a DataFrame to feed into the database.
+        df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
+
+        dburi = "crate://localhost:4200"
+        engine = sa.create_engine(dburi, echo=True)
+        retval = df.to_sql(
+            name="test-testdrive",
+            con=engine,
+            if_exists="replace",
+            index=False,
+            chunksize=CHUNK_SIZE,
+            method=insert_bulk,
+        )
+        self.assertIsNone(retval)
+
+        # Initializing the query incurs an overhead of two calls to the cursor
+        # object: probably one initial connection from the DB-API driver, to
+        # inquire the database version, and another one for SQLAlchemy, which
+        # uses it to inquire the table schema using `information_schema`, and
+        # to eventually issue the `CREATE TABLE ...` statement.
+        effective_op_count = mock_cursor.call_count - 2
+
+        # Verify the number of batches.
+        self.assertEqual(effective_op_count, OPCOUNT)

src/crate/client/tests.py (1 addition, 0 deletions)

@@ -385,6 +385,7 @@ def test_suite():
     'docs/by-example/sqlalchemy/working-with-types.rst',
     'docs/by-example/sqlalchemy/advanced-querying.rst',
     'docs/by-example/sqlalchemy/inspection-reflection.rst',
+    'docs/by-example/sqlalchemy/dataframe.rst',
     module_relative=False,
     setUp=setUpCrateLayerSqlAlchemy,
     tearDown=tearDownDropEntitiesSqlAlchemy,
