Skip to content

Commit 0780263

Browse files
committed
Merge remote-tracking branch 'upstream/main' into varchar_default_ut8view
2 parents 0bb1d96 + cb45f1f commit 0780263

File tree

6 files changed

+133
-3
lines changed

6 files changed

+133
-3
lines changed

datafusion-cli/src/main.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717

1818
use std::collections::HashMap;
1919
use std::env;
20+
use std::num::NonZeroUsize;
2021
use std::path::Path;
2122
use std::process::ExitCode;
2223
use std::sync::{Arc, LazyLock};
2324

2425
use datafusion::error::{DataFusionError, Result};
2526
use datafusion::execution::context::SessionConfig;
26-
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
27+
use datafusion::execution::memory_pool::{
28+
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
29+
};
2730
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
2831
use datafusion::execution::DiskManager;
2932
use datafusion::prelude::SessionContext;
@@ -118,6 +121,13 @@ struct Args {
118121
)]
119122
mem_pool_type: PoolType,
120123

124+
#[clap(
125+
long,
126+
help = "The number of top memory consumers to display when query fails due to memory exhaustion. To disable memory consumer tracking, set this value to 0",
127+
default_value = "3"
128+
)]
129+
top_memory_consumers: usize,
130+
121131
#[clap(
122132
long,
123133
help = "The max number of rows to display for 'Table' format\n[possible values: numbers(0/10/...), inf(no limit)]",
@@ -169,9 +179,22 @@ async fn main_inner() -> Result<()> {
169179
if let Some(memory_limit) = args.memory_limit {
170180
// set memory pool type
171181
let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
172-
PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)),
173-
PoolType::Greedy => Arc::new(GreedyMemoryPool::new(memory_limit)),
182+
PoolType::Fair if args.top_memory_consumers == 0 => {
183+
Arc::new(FairSpillPool::new(memory_limit))
184+
}
185+
PoolType::Fair => Arc::new(TrackConsumersPool::new(
186+
FairSpillPool::new(memory_limit),
187+
NonZeroUsize::new(args.top_memory_consumers).unwrap(),
188+
)),
189+
PoolType::Greedy if args.top_memory_consumers == 0 => {
190+
Arc::new(GreedyMemoryPool::new(memory_limit))
191+
}
192+
PoolType::Greedy => Arc::new(TrackConsumersPool::new(
193+
GreedyMemoryPool::new(memory_limit),
194+
NonZeroUsize::new(args.top_memory_consumers).unwrap(),
195+
)),
174196
};
197+
175198
rt_builder = rt_builder.with_memory_pool(pool)
176199
}
177200

datafusion-cli/tests/cli_integration.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,42 @@ fn test_cli_format<'a>(#[case] format: &'a str) {
122122
assert_cmd_snapshot!(cmd);
123123
}
124124

125+
#[rstest]
126+
#[case("no_track", ["--top-memory-consumers", "0"])]
127+
#[case("top2", ["--top-memory-consumers", "2"])]
128+
#[case("top3_default", [])]
129+
#[test]
130+
fn test_cli_top_memory_consumers<'a>(
131+
#[case] snapshot_name: &str,
132+
#[case] top_memory_consumers: impl IntoIterator<Item = &'a str>,
133+
) {
134+
let mut settings = make_settings();
135+
136+
settings.set_snapshot_suffix(snapshot_name);
137+
138+
settings.add_filter(
139+
r"[^\s]+\#\d+\(can spill: (true|false)\) consumed .*?B",
140+
"Consumer(can spill: bool) consumed XB",
141+
);
142+
settings.add_filter(
143+
r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
144+
"Error: Failed to allocate ",
145+
);
146+
settings.add_filter(
147+
r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
148+
"Resources exhausted: Failed to allocate",
149+
);
150+
151+
let _bound = settings.bind_to_scope();
152+
153+
let mut cmd = cli();
154+
let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;";
155+
cmd.args(["--memory-limit", "10M", "--command", sql]);
156+
cmd.args(top_memory_consumers);
157+
158+
assert_cmd_snapshot!(cmd);
159+
}
160+
125161
#[tokio::test]
126162
async fn test_cli() {
127163
if env::var("TEST_STORAGE_INTEGRATION").is_err() {
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
---
2+
source: datafusion-cli/tests/cli_integration.rs
3+
info:
4+
program: datafusion-cli
5+
args:
6+
- "--memory-limit"
7+
- 10M
8+
- "--command"
9+
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
10+
- "--top-memory-consumers"
11+
- "0"
12+
---
13+
success: false
14+
exit_code: 1
15+
----- stdout -----
16+
[CLI_VERSION]
17+
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
18+
caused by
19+
Resources exhausted: Failed to allocate
20+
21+
----- stderr -----
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
---
2+
source: datafusion-cli/tests/cli_integration.rs
3+
info:
4+
program: datafusion-cli
5+
args:
6+
- "--memory-limit"
7+
- 10M
8+
- "--command"
9+
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
10+
- "--top-memory-consumers"
11+
- "2"
12+
---
13+
success: false
14+
exit_code: 1
15+
----- stdout -----
16+
[CLI_VERSION]
17+
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
18+
caused by
19+
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
20+
Consumer(can spill: bool) consumed XB,
21+
Consumer(can spill: bool) consumed XB.
22+
Error: Failed to allocate
23+
24+
----- stderr -----
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
---
2+
source: datafusion-cli/tests/cli_integration.rs
3+
info:
4+
program: datafusion-cli
5+
args:
6+
- "--memory-limit"
7+
- 10M
8+
- "--command"
9+
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
10+
---
11+
success: false
12+
exit_code: 1
13+
----- stdout -----
14+
[CLI_VERSION]
15+
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
16+
caused by
17+
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
18+
Consumer(can spill: bool) consumed XB,
19+
Consumer(can spill: bool) consumed XB,
20+
Consumer(can spill: bool) consumed XB.
21+
Error: Failed to allocate
22+
23+
----- stderr -----

docs/source/user-guide/cli/usage.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ OPTIONS:
5757
--mem-pool-type <MEM_POOL_TYPE>
5858
Specify the memory pool type 'greedy' or 'fair', default to 'greedy'
5959

60+
--top-memory-consumers <TOP_MEMORY_CONSUMERS>
61+
The number of top memory consumers to display when query fails due to memory exhaustion. To disable memory consumer tracking, set this value to 0 [default: 3]
62+
6063
-d, --disk-limit <DISK_LIMIT>
6164
Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g')
6265

0 commit comments

Comments
 (0)