Skip to content

Commit acd5618

Browse files
committed
1 parent 9032a3f commit acd5618

File tree

2 files changed

+321
-0
lines changed

2 files changed

+321
-0
lines changed

examples/insert_efficient.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
"""
2+
About
3+
=====
4+
5+
Example program to demonstrate multi-row inserts and batched inserts
6+
using SQLAlchemy's `insertmanyvalues_page_size` option.
7+
8+
- https://docs.sqlalchemy.org/core/connections.html#controlling-the-batch-size
9+
- https://github.com/crate/crate-python/pull/539
10+
11+
12+
13+
Setup
14+
=====
15+
::
16+
17+
pip install --upgrade 'crate[sqlalchemy]'
18+
19+
20+
Synopsis
21+
========
22+
::
23+
24+
# Run CrateDB
25+
docker run --rm -it --publish=4200:4200 crate
26+
27+
# Run PostgreSQL
28+
docker run --rm -it --publish=5432:5432 --env "POSTGRES_HOST_AUTH_METHOD=trust" postgres:15 postgres -c log_statement=all
29+
30+
# Use SQLite
31+
time python insert_efficient.py sqlite multirow
32+
time python insert_efficient.py sqlite batched
33+
34+
# Use PostgreSQL
35+
time python insert_efficient.py postgresql multirow
36+
time python insert_efficient.py postgresql batched
37+
38+
# Use CrateDB
39+
time python insert_efficient.py cratedb multirow
40+
time python insert_efficient.py cratedb batched
41+
42+
43+
Bugs
44+
====
45+
- With `insert_batched`, the CrateDB dialect currently does not invoke SQLAlchemy's
46+
`Connection._exec_insertmany_context`, but the PostgreSQL dialect does.
47+
The CrateDB dialect currently only implements the legacy `bulk_save_objects` method.
48+
49+
[1] https://docs.sqlalchemy.org/orm/session_api.html#sqlalchemy.orm.Session.bulk_save_objects
50+
51+
"""
52+
import sys

import sqlalchemy as sa

# Number of synthetic records to insert. Smaller sizes for quicker test runs:
# INSERT_RECORDS = 1275
# INSERT_RECORDS = 50_000
INSERT_RECORDS = 2_750_000

# Page size for SQLAlchemy's `insertmanyvalues` batching, applied at
# engine level in `run_example` and exercised by `insert_batched`.
BATCHED_PAGE_SIZE = 20_000
61+
62+
63+
def insert_multirow(engine, table, records):
    """
    Insert all records at once, using a single multi-row ``INSERT`` statement.

    - Requires `dialect.supports_multivalues_insert = True`.
    - Emits exactly one SQL statement for the whole payload.
    - SQLAlchemy cannot apply any batch-/chunk-size here, because there
      are no batches at all.
    - Can OOM CrateDB on large payloads, unless the caller chunks the
      records externally, as pandas does.
    """
    with engine.begin() as connection:
        connection.execute(table.insert().values(records))
75+
76+
77+
def insert_batched(engine, table, records):
    """
    Insert records in batches, using SQLAlchemy's `insertmanyvalues` paging.

    - Emits multiple SQL statements, one per page.
    - The batch size is governed by the engine-level
      `insertmanyvalues_page_size` option.
    """
    statement = table.insert()
    with engine.begin() as connection:
        # The page size can also be tuned per connection, e.g.:
        # connection.execution_options(insertmanyvalues_page_size=5)
        connection.execute(statement, parameters=records)
89+
90+
91+
def run_example(dburi: str, variant: str):
    """
    Create the `testdrive` table, insert `INSERT_RECORDS` records using the
    selected insert strategy, and verify the record count afterwards.

    :param dburi: SQLAlchemy connection URI, e.g. `crate://localhost:4200`.
    :param variant: Insert strategy, one of `multirow` or `batched`.
    :raises ValueError: If `variant` is not a known insert strategy.
    """
    # Validate the variant up front, *before* dropping and re-creating the
    # table, so an invalid invocation has no destructive side effects.
    if variant not in ("multirow", "batched"):
        raise ValueError(f"Unknown variant: {variant}")

    metadata = sa.MetaData()
    table = sa.Table(
        "testdrive",
        metadata,
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String),
    )

    # Create `INSERT_RECORDS` synthetic test records.
    records = [{"id": i, "name": f"foo_{i}"} for i in range(1, INSERT_RECORDS + 1)]

    # Provision a fresh table, configuring the engine-level batch-/page-size
    # for SQLAlchemy's `insertmanyvalues` feature.
    engine = sa.create_engine(dburi, insertmanyvalues_page_size=BATCHED_PAGE_SIZE, echo=True)
    table.drop(bind=engine, checkfirst=True)
    table.create(bind=engine)

    # Dispatch to the selected insert strategy.
    if variant == "multirow":
        insert_multirow(engine, table, records)
    else:
        insert_batched(engine, table, records)

    # Verify the number of inserted records. CrateDB needs an explicit
    # `REFRESH TABLE` before freshly written rows become visible to SELECT.
    with engine.begin() as conn:
        if dburi.startswith("crate://"):
            conn.execute(sa.text("REFRESH TABLE testdrive"))
        result = conn.execute(sa.text("SELECT COUNT(*) FROM testdrive"))
        print("Number of records:", result.scalar_one())
120+
121+
122+
def run_database(database: str, variant: str):
    """
    Resolve a database alias to its SQLAlchemy connection URI, then run
    the example program with the given insert variant.

    :param database: One of `sqlite`, `postgresql`, or `cratedb`.
    :param variant: Insert strategy, handed through to `run_example`.
    :raises ValueError: If `database` is not a known alias.
    """
    uris = {
        "sqlite": "sqlite:///:memory:",
        "postgresql": "postgresql://postgres@localhost:5432",
        "cratedb": "crate://localhost:4200",
    }
    if database not in uris:
        raise ValueError(f"Unknown database: {database}")
    run_example(uris[database], variant)
133+
134+
135+
if __name__ == "__main__":
    # Expect exactly two positional arguments: the database alias and the
    # insert variant. Fail with a usage hint instead of a bare IndexError.
    if len(sys.argv) != 3:
        sys.exit(f"Usage: {sys.argv[0]} {{sqlite,postgresql,cratedb}} {{multirow,batched}}")
    database = sys.argv[1]
    variant = sys.argv[2]
    run_database(database, variant)

examples/insert_pandas.py

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
"""
2+
About
3+
=====
4+
5+
Example program to demonstrate efficient batched inserts using CrateDB and
6+
pandas, based on SQLAlchemy's `insertmanyvalues` and CrateDB's bulk import
7+
HTTP endpoint.
8+
9+
- https://docs.sqlalchemy.org/core/connections.html#controlling-the-batch-size
10+
- https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
11+
12+
13+
Setup
14+
=====
15+
::
16+
17+
pip install --upgrade click colorlog crate pandas sqlalchemy
18+
19+
20+
Synopsis
21+
========
22+
::
23+
24+
# Run CrateDB.
25+
docker run --rm -it --publish=4200:4200 crate
26+
27+
# Use local CrateDB.
28+
time python insert_pandas.py
29+
30+
# Use local CrateDB with "basic" mode.
31+
time python insert_pandas.py --mode=basic --insertmanyvalues-page-size=5000
32+
33+
# Use local CrateDB with "bulk" mode, and a few more records.
34+
time python insert_pandas.py --mode=bulk --bulk-size=20000 --num-records=75000
35+
36+
# Use CrateDB Cloud.
37+
time python insert_pandas.py --dburi='crate://admin:<PASSWORD>@example.aks1.westeurope.azure.cratedb.net:4200?ssl=true'
38+
39+
40+
Details
41+
=======
42+
To watch the HTTP traffic to your local CrateDB instance, invoke::
43+
44+
sudo ngrep -d lo0 -Wbyline port 4200
45+
46+
"""
47+
import logging

import click
import colorlog
import pkg_resources
import sqlalchemy as sa
from colorlog.escape_codes import escape_codes
# NOTE(review): `pandas._testing` is pandas-internal test scaffolding; it may
# change or disappear between pandas releases — confirm against the pinned version.
from pandas._testing import makeTimeDataFrame

# Module-level logger, configured by `setup_logging`.
logger = logging.getLogger(__name__)

# This example relies on SQLAlchemy 2.0's `insertmanyvalues` feature.
# NOTE(review): `pkg_resources` is deprecated in favor of `importlib.metadata`;
# consider migrating this version check.
pkg_resources.require("sqlalchemy>=2.0")

# When enabled, `setup_logging` raises the "sqlalchemy" logger to the chosen level.
SQLALCHEMY_LOGGING = True
61+
62+
63+
class DatabaseWorkload:
    """
    Exercise different strategies for inserting a pandas DataFrame into a
    database addressed by an SQLAlchemy connection URI.
    """

    # Name of the target database table, shared by all insert modes
    # and by `show_table_stats`.
    table_name = "foo"

    def __init__(self, dburi: str):
        # SQLAlchemy database connection URI, e.g. `crate://localhost:4200`.
        self.dburi = dburi

    def get_engine(self, **kwargs):
        # Create a new engine for `self.dburi`; keyword arguments (e.g.
        # `insertmanyvalues_page_size`) are forwarded to `sa.create_engine`.
        return sa.create_engine(self.dburi, **kwargs)

    def process(self, mode: str, num_records: int, bulk_size: int, insertmanyvalues_page_size: int):
        """
        Exercise different insert methods of pandas, SQLAlchemy, and CrateDB.

        :param mode: Insert strategy: "basic", "multi", or "bulk".
        :param num_records: Number of synthetic records to generate and insert.
        :param bulk_size: Chunk size handed to `DataFrame.to_sql` in the
            "multi" and "bulk" modes.
        :param insertmanyvalues_page_size: Engine-level page size for
            SQLAlchemy's `insertmanyvalues` feature.
        :raises ValueError: If `mode` is not one of the known strategies.
        """

        logger.info(f"Creating DataFrame with {num_records} records")

        # Create a DataFrame to feed into the database.
        # NOTE(review): `makeTimeDataFrame` comes from `pandas._testing`
        # (private API); `freq="S"` is the pre-pandas-2.2 spelling — verify
        # against the pandas version in use.
        df = makeTimeDataFrame(nper=num_records, freq="S")
        print(df)

        logger.info(f"Connecting to {self.dburi}")
        logger.info(f"Importing data with mode={mode}, bulk_size={bulk_size}, insertmanyvalues_page_size={insertmanyvalues_page_size}")

        engine = self.get_engine(insertmanyvalues_page_size=insertmanyvalues_page_size)

        # SQLAlchemy "Insert Many Values" mode. 40K records/s
        # https://docs.sqlalchemy.org/en/20/core/connections.html#engine-insertmanyvalues
        # https://docs.sqlalchemy.org/en/20/core/connections.html#engine-insertmanyvalues-page-size
        if mode == "basic":
            # Using `chunksize` does not make much of a difference here,
            # because chunking will be done by SQLAlchemy already.
            df.to_sql(name=self.table_name, con=engine, if_exists="replace", index=False)
            # df.to_sql(name=self.table_name, con=engine, if_exists="replace", index=False, chunksize=bulk_size)

        # Multi-row mode. It is slower.
        # https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
        elif mode == "multi":
            df.to_sql(name=self.table_name, con=engine, if_exists="replace", index=False, chunksize=bulk_size, method="multi")

        # CrateDB bulk transfer mode. 65K records/s
        # https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
        # Note: unlike the other modes, this one uses `if_exists="append"`.
        elif mode == "bulk":
            df.to_sql(name=self.table_name, con=engine, if_exists="append", index=False, chunksize=bulk_size, method=self.insert_bulk)

        else:
            raise ValueError(f"Unknown mode: {mode}")

    @staticmethod
    def insert_bulk(pd_table, conn, keys, data_iter):
        """
        A fast insert method for pandas and Dask, using CrateDB's "bulk operations" endpoint.

        The idea is to break out of SQLAlchemy, compile the insert statement, and use the raw
        DBAPI connection client, in order to invoke a request using `bulk_parameters`::

            cursor.execute(sql=sql, bulk_parameters=data)

        - https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations

        The vanilla implementation, used by SQLAlchemy, is::

            data = [dict(zip(keys, row)) for row in data_iter]
            conn.execute(pd_table.table.insert(), data)

        The signature `(pd_table, conn, keys, data_iter)` is dictated by pandas'
        `DataFrame.to_sql(method=...)` callable contract; `keys` is unused here.
        """

        # Bulk
        sql = str(pd_table.table.insert().compile(bind=conn))
        data = list(data_iter)

        logger.info(f"Bulk SQL: {sql}")
        logger.info(f"Bulk records: {len(data)}")

        # NOTE(review): `_dbapi_connection` is a private SQLAlchemy attribute;
        # confirm it is still available in the SQLAlchemy version in use.
        cursor = conn._dbapi_connection.cursor()
        cursor.execute(sql=sql, bulk_parameters=data)
        cursor.close()

    def show_table_stats(self):
        """
        Display number of records in table.
        """
        engine = self.get_engine()
        with engine.connect() as conn:
            # CrateDB: make recently written records visible to the COUNT query.
            conn.exec_driver_sql(f"REFRESH TABLE {self.table_name};")
            result = conn.exec_driver_sql(f"SELECT COUNT(*) FROM {self.table_name};")
            table_size = result.scalar_one()
            logger.info(f"Table size: {table_size}")
        #engine.dispose()
152+
153+
154+
def setup_logging(level=logging.INFO):
    """Configure colorized logging output for the example program."""
    fmt = f"%(asctime)-15s [%(name)-26s] %(log_color)s%(levelname)-8s:{escape_codes['reset']} %(message)s"

    stream_handler = colorlog.StreamHandler()
    stream_handler.setFormatter(colorlog.ColoredFormatter(fmt))
    logging.basicConfig(format=fmt, level=level, handlers=[stream_handler])

    # Optionally raise SQLAlchemy's loggers to the chosen level as well.
    if SQLALCHEMY_LOGGING:
        logging.getLogger("sqlalchemy").setLevel(level)
166+
167+
168+
@click.command()
@click.option("--dburi", type=str, default="crate://localhost:4200", required=False, help="SQLAlchemy database connection URI.")
@click.option("--mode", type=str, default="bulk", required=False, help="Insert mode. Choose one of basic, multi, bulk.")
@click.option("--num-records", type=int, default=23_000, required=False, help="Number of records to insert.")
@click.option("--bulk-size", type=int, default=5_000, required=False, help="Bulk size / chunk size.")
@click.option("--insertmanyvalues-page-size", type=int, default=1_000, required=False, help="Page size for SA's insertmanyvalues.")
@click.help_option()
def main(dburi: str, mode: str, num_records: int, bulk_size: int, insertmanyvalues_page_size: int):
    # Command-line entry point: configure logging, run the selected insert
    # workload, then report the resulting table size.
    # (Deliberately no docstring: click would surface it as `--help` text.)
    setup_logging()
    dbw = DatabaseWorkload(dburi=dburi)
    dbw.process(mode, num_records, bulk_size, insertmanyvalues_page_size)
    dbw.show_table_stats()
180+
181+
182+
if __name__ == "__main__":
    # Invoke the click command-line interface.
    main()

0 commit comments

Comments
 (0)