SQLAlchemy: DataFrame operations
================================

.. rubric:: Table of Contents

.. contents::
   :local:


About
=====

Introduction
============

pandas
------
The :ref:`pandas DataFrame <pandas:api.dataframe>` is a structure that contains
two-dimensional data and its corresponding labels. DataFrames are widely used
in data science, machine learning, scientific computing, and many other
data-intensive fields.
The :ref:`pandas I/O subsystem <pandas:api.io>` for `relational databases`_
using `SQL`_ is based on `SQLAlchemy`_.

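For orientation, a minimal sketch of that interface follows. The ``pandas_demo``
table name is an arbitrary choice, and the ``crate_host`` variable, pointing at
a running CrateDB instance, is assumed to be defined as in the examples further
below. Everything beyond this plain usage, most notably the efficient insert
support, is covered by the following sections.

>>> import pandas as pd
>>> import sqlalchemy as sa
...
>>> # Connect to CrateDB using the SQLAlchemy dialect provided by this driver.
>>> engine = sa.create_engine(f"crate://{crate_host}")
...
>>> # Store a small DataFrame through the pandas I/O subsystem.
>>> df = pd.DataFrame({"x": [1, 2, 3]})
>>> _ = df.to_sql("pandas_demo", engine, if_exists="replace", index=False)
...
>>> # Make the freshly written records visible, and read them back.
>>> with engine.connect() as conn:
...     _ = conn.execute(sa.text("REFRESH TABLE pandas_demo"))
>>> df_roundtrip = pd.read_sql("SELECT x FROM pandas_demo", engine)
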
Dask
----
`Dask`_ is a flexible library for parallel computing in Python, which scales
Python code from multi-core local machines to large distributed clusters in
the cloud. Dask provides a familiar user interface by mirroring the APIs of
other libraries in the PyData ecosystem, including `pandas`_, `scikit-learn`_,
and `NumPy`_.

A :doc:`dask:dataframe` is a large parallel DataFrame composed of many smaller
pandas DataFrames, split along the index. These pandas DataFrames may live on
disk for larger-than-memory computing on a single machine, or on many different
machines in a cluster. One Dask DataFrame operation triggers many operations on
the constituent pandas DataFrames.

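A minimal sketch may illustrate this model; the DataFrame contents and the
choice of four partitions are arbitrary.

>>> import dask.dataframe as dd
>>> import pandas as pd
...
>>> # A regular pandas DataFrame with a default RangeIndex.
>>> df = pd.DataFrame({"value": range(100)})
...
>>> # Split it along the index into four smaller pandas DataFrames.
>>> ddf = dd.from_pandas(df, npartitions=4)
>>> ddf.npartitions
4
>>> # A single Dask DataFrame operation fans out into one operation per
>>> # partition, and nothing is evaluated until `compute()` is called.
>>> int(ddf["value"].sum().compute())
4950
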
Compatibility notes
===================

.. NOTE::

    DataFrame support for pandas and Dask is only validated with Python 3.8
    and higher, and SQLAlchemy 1.4 and higher. We recommend using the most
    recent versions of those libraries.


Efficient ``INSERT`` operations with pandas
===========================================
tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic.


Efficient ``INSERT`` operations with Dask
=========================================

The same ``insert_bulk`` function presented in the previous section will also
be used in the context of `Dask`_, in order to make the
:func:`dask:dask.dataframe.to_sql` method more efficient, based on the
`CrateDB bulk operations`_ endpoint.

The example below will partition your insert workload into equal-sized parts,
and schedule it to be executed on Dask cluster resources, using a defined
number of compute partitions. Each worker instance will then insert the
records of its partition in a batched/chunked manner, using a defined chunk
size, effectively reusing the pandas implementation introduced in the
previous section.

>>> import dask.dataframe as dd
>>> from pandas._testing import makeTimeDataFrame
>>> from crate.client.sqlalchemy.support import insert_bulk
...
>>> # Define the number of records, the number of computing partitions,
>>> # and the chunk size of each database insert operation.
>>> INSERT_RECORDS = 100
>>> NPARTITIONS = 4
>>> CHUNK_SIZE = 25
...
>>> # Create a Dask DataFrame.
>>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
>>> ddf = dd.from_pandas(df, npartitions=NPARTITIONS)
...
>>> # Insert content of DataFrame using multiple workers on a
>>> # compute cluster, transferred using batches of records.
>>> ddf.to_sql(
...     name="test-testdrive",
...     uri=f"crate://{crate_host}",
...     if_exists="replace",
...     index=False,
...     chunksize=CHUNK_SIZE,
...     method=insert_bulk,
...     parallel=True,
... )

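To verify the outcome of the parallel insert, you may want to count the stored
records afterwards. The following sketch assumes the table name and connection
details from the example above; the ``REFRESH TABLE`` statement makes the
freshly written records visible to the subsequent query, so the count should
match the ``INSERT_RECORDS`` figure defined above.

>>> import sqlalchemy as sa
...
>>> engine = sa.create_engine(f"crate://{crate_host}")
>>> with engine.connect() as conn:
...     _ = conn.execute(sa.text('REFRESH TABLE "test-testdrive"'))
...     result = conn.execute(sa.text('SELECT COUNT(*) FROM "test-testdrive"'))
...     print(result.scalar())
100
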
.. TIP::

    You will observe that optimizing your workload will now also involve
    determining a good value for the ``NPARTITIONS`` argument, based on the
    capacity and topology of the available compute resources, and based on
    workload characteristics or policies like peak- vs. balanced- vs.
    shared-usage. For example, on a machine or cluster fully dedicated to the
    problem at hand, you may want to use all available processor cores, while
    on a shared system, this strategy may not be appropriate.

    If you want to dedicate all available compute resources on your machine,
    you may want to use the number of CPU cores as the value for the
    ``NPARTITIONS`` argument. You can find out the number of available CPU
    cores on your machine, for example by running the ``nproc`` command in
    your terminal (see the sketch following this note).

    Depending on the implementation and runtime behavior of the compute task,
    the optimal number of worker processes, determined by the ``NPARTITIONS``
    argument, also needs to be figured out by running a few test iterations.
    For that purpose, you can use the `insert_dask.py`_ program as a
    blueprint.

    Adjusting this value in both directions is perfectly fine: If you observe
    that you are overloading the machine, maybe because other workloads are
    scheduled besides the one you are running, try to reduce the value. If
    fragments/steps of your implementation involve waiting for network or
    disk I/O, you may want to increase the number of workers beyond the
    number of available CPU cores, to increase utilization. On the other
    hand, you should be wary of over-committing resources.

    Before getting more serious with Dask, you are welcome to read and watch
    the excellent :doc:`dask:best-practices`, in order to learn about things
    to avoid, and more. To find out whether your compute workload scheduling
    is healthy, you can, for example, use Dask's :doc:`dask:dashboard`.

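As a small illustration of the sizing hints above, the following sketch derives
``NPARTITIONS`` from the number of CPU cores visible to Python, using
``os.cpu_count()`` as a portable stand-in for the ``nproc`` command.

>>> import os
...
>>> # On a machine fully dedicated to the workload, use one compute
>>> # partition per available CPU core.
>>> NPARTITIONS = os.cpu_count() or 1
...
>>> # On a shared system, you may want to hold back some of the cores.
>>> NPARTITIONS = max((os.cpu_count() or 2) // 2, 1)
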
.. WARNING::

    Because the settings assigned in the example above fit together well, the
    ``to_sql()`` instruction will effectively run four insert operations,
    executed in parallel, and scheduled optimally on the available cluster
    resources.

    However, if those settings are not chosen sensibly, you can easily
    misconfigure the resource scheduling system, and overload the underlying
    hardware, virtualized or not. This is why experimenting with different
    parameters, and a real dataset, is crucial.


.. hidden: Disconnect from database

    >>> engine.dispose()
.. _batching: https://en.wikipedia.org/wiki/Batch_processing#Common_batch_processing_usage
.. _chunking: https://en.wikipedia.org/wiki/Chunking_(computing)
.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
.. _Dask: https://en.wikipedia.org/wiki/Dask_(software)
.. _DataFrame computing: https://realpython.com/pandas-dataframe/
.. _ETL: https://en.wikipedia.org/wiki/Extract,_transform,_load
.. _insert_dask.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_dask.py
.. _insert_pandas.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_pandas.py
.. _leveling up to 200_000: https://acepor.github.io/2017/08/03/using-chunksize/
.. _NumPy: https://en.wikipedia.org/wiki/NumPy
.. _pandas: https://en.wikipedia.org/wiki/Pandas_(software)
.. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html
.. _Python: https://en.wikipedia.org/wiki/Python_(programming_language)
.. _relational databases: https://en.wikipedia.org/wiki/Relational_database
.. _scikit-learn: https://en.wikipedia.org/wiki/Scikit-learn
.. _SNAT port exhaustion: https://learn.microsoft.com/en-us/azure/load-balancer/troubleshoot-outbound-connection
.. _SQL: https://en.wikipedia.org/wiki/SQL
.. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html
.. _the chunksize should not be too small: https://acepor.github.io/2017/08/03/using-chunksize/