Skip to content

Commit 1bcc603

Browse files
authored
Fix out of order row groups in filter query (#1974)
- Add test for ordered filter query - Configure `datafusion` to prefer existing sort This ensures queries with filter clauses return data in order. apache/datafusion#10572 (comment)
1 parent cdedc6c commit 1bcc603

File tree

4 files changed

+22
-4
lines changed

4 files changed

+22
-4
lines changed

nautilus_core/persistence/src/backend/session.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ impl DataBackendSession {
7171
.enable_all()
7272
.build()
7373
.unwrap();
74-
let session_cfg =
75-
SessionConfig::new().set_str("datafusion.optimizer.repartition_file_scans", "false");
74+
let session_cfg = SessionConfig::new()
75+
.set_str("datafusion.optimizer.repartition_file_scans", "false")
76+
.set_str("datafusion.optimizer.prefer_existing_sort", "true");
7677
let session_ctx = SessionContext::new_with_config(session_cfg);
7778
Self {
7879
session_ctx,
@@ -119,7 +120,7 @@ impl DataBackendSession {
119120
file_sort_order: vec![vec![Expr::Sort(Sort {
120121
expr: Box::new(col("ts_init")),
121122
asc: true,
122-
nulls_first: true,
123+
nulls_first: false,
123124
})]],
124125
..Default::default()
125126
};
@@ -129,7 +130,7 @@ impl DataBackendSession {
129130
parquet_options,
130131
))?;
131132

132-
let default_query = format!("SELECT * FROM {}", &table_name);
133+
let default_query = format!("SELECT * FROM {} ORDER BY ts_init", &table_name);
133134
let sql_query = sql_query.unwrap_or(&default_query);
134135
let query = self.runtime.block_on(self.session_ctx.sql(sql_query))?;
135136

nautilus_core/persistence/tests/test_catalog.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,22 @@ fn test_quote_tick_query() {
237237
assert!(is_monotonically_increasing_by_init(&ticks));
238238
}
239239

240+
#[rstest]
241+
fn test_quote_tick_query_with_filter() {
242+
let file_path = "../../tests/test_data/nautilus/quotes-3-groups-filter-query.parquet";
243+
let mut catalog = DataBackendSession::new(10);
244+
catalog
245+
.add_file::<QuoteTick>(
246+
"quote_005",
247+
file_path,
248+
Some("SELECT * FROM quote_005 WHERE ts_init >= 1701388832486000000 ORDER BY ts_init"),
249+
)
250+
.unwrap();
251+
let query_result: QueryResult = catalog.get_query_result();
252+
let ticks: Vec<Data> = query_result.collect();
253+
assert!(is_monotonically_increasing_by_init(&ticks));
254+
}
255+
240256
#[rstest]
241257
fn test_quote_tick_multiple_query() {
242258
let expected_length = 9_600;

nautilus_trader/persistence/catalog/parquet.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,7 @@ def _build_query(
622622
if conditions:
623623
query += f" WHERE {' AND '.join(conditions)}"
624624

625+
query += " ORDER BY ts_init"
625626
return query
626627

627628
@staticmethod
Binary file not shown.

0 commit comments

Comments
 (0)