@@ -207,3 +207,46 @@ def test_bulk_save_pandas(self, mock_cursor):
207
207
208
208
# Verify number of batches.
209
209
self .assertEqual (effective_op_count , OPCOUNT )
210
+
211
@skipIf(sys.version_info < (3, 8), "SQLAlchemy/Dask is not supported on Python <3.8")
@skipIf(SA_VERSION < SA_1_4, "SQLAlchemy 1.3 is not supported by pandas")
@patch('crate.client.connection.Cursor', mock_cursor=FakeCursor)
def test_bulk_save_dask(self, mock_cursor):
    """
    Check the number of batches emitted by a bulk INSERT driven by Dask.

    A partitioned Dask DataFrame is written through ``to_sql`` using the
    ``insert_bulk`` method, and the call count recorded on the patched
    cursor is compared against the expected number of insert operations.
    """
    import dask.dataframe as dd
    from pandas._testing import makeTimeDataFrame
    from crate.client.sqlalchemy.support import insert_bulk

    # Scenario: 42 records spread over 4 partitions gives 10.5 records per
    # partition on average. A chunk size of 8 cannot hold a whole partition,
    # so every partition needs two batches: 4 partitions * 2 batches = 8
    # insert operations in total.
    INSERT_RECORDS = 42
    NPARTITIONS = 4
    CHUNK_SIZE = 8
    batches_per_partition = math.ceil(INSERT_RECORDS / NPARTITIONS / CHUNK_SIZE)
    OPCOUNT = batches_per_partition * NPARTITIONS

    # Build the input: a pandas time-series frame, split into Dask partitions.
    source_frame = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
    partitioned_frame = dd.from_pandas(source_frame, npartitions=NPARTITIONS)

    # Write the partitioned frame to the database using the bulk insert method.
    dburi = "crate://localhost:4200"
    outcome = partitioned_frame.to_sql(
        name="test-testdrive",
        uri=dburi,
        if_exists="replace",
        index=False,
        chunksize=CHUNK_SIZE,
        method=insert_bulk,
    )
    self.assertIsNone(outcome)

    # Besides the actual inserts, the cursor object sees additional calls:
    # two per partition plus two more. Presumably these stem from the DB-API
    # driver connecting and inquiring the database version. The compensation
    # was determined empirically / by educated guessing.
    overhead_calls = 2 * NPARTITIONS + 2
    effective_op_count = mock_cursor.call_count - overhead_calls

    # The remaining calls must match the expected number of batches.
    self.assertEqual(effective_op_count, OPCOUNT)
0 commit comments