Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
611 changes: 483 additions & 128 deletions src/common/base/src/base/dma.rs

Large diffs are not rendered by default.

8 changes: 1 addition & 7 deletions src/common/base/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ mod uniq_id;
mod watch_notify;

pub use build_info::*;
pub use dma::dma_buffer_to_bytes;
pub use dma::dma_read_file;
pub use dma::dma_read_file_range;
pub use dma::dma_write_file_vectored;
pub use dma::Alignment;
pub use dma::DmaAllocator;
pub use dma::DmaWriteBuf;
pub use dma::*;
pub use drop_callback::DropCallback;
pub use net::get_free_tcp_port;
pub use net::get_free_udp_port;
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ impl<T: AccessType> ArrayColumn<T> {

impl<T: ValueType> ArrayColumn<T> {
pub fn upcast(self, data_type: &DataType) -> ArrayColumn<AnyType> {
let values_type = data_type.as_array().unwrap();
let values_type = data_type.as_array().expect("must array type");
ArrayColumn {
values: T::upcast_column_with_type(self.values, values_type),
offsets: self.offsets,
Expand Down
10 changes: 10 additions & 0 deletions src/query/pipeline/transforms/src/processors/traits/spill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ pub enum Location {
Local(TempPath),
}

impl Location {
pub fn is_local(&self) -> bool {
matches!(self, Location::Local(_))
}

pub fn is_remote(&self) -> bool {
matches!(self, Location::Remote(_))
}
}

#[async_trait::async_trait]
pub trait DataBlockSpill: Clone + Send + Sync + 'static {
async fn spill(&self, data_block: DataBlock) -> Result<Location> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ impl HashJoinSpiller {
.add_data_block(partition_id, data_block);
if let Some(data_blocks) = self
.partition_buffer
.fetch_data_blocks(partition_id, &fetch_option)?
.fetch_data_blocks(partition_id, &fetch_option)
{
self.spiller
.spill_with_partition(partition_id, data_blocks)
Expand Down Expand Up @@ -341,8 +341,9 @@ impl HashJoinSpiller {
PartitionBufferFetchOption::ReadPartition
};

self.partition_buffer
.fetch_data_blocks(partition_id, &option)
Ok(self
.partition_buffer
.fetch_data_blocks(partition_id, &option))
}

fn partition_data_block(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod data_processor_strategy;
mod hilbert_partition_exchange;
mod transform_window_partition_collect;
mod window_partition_buffer;
mod window_partition_buffer_v2;
mod window_partition_exchange;
mod window_partition_meta;
mod window_partition_partial_top_n_exchange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use databend_common_pipeline_transforms::MemorySettings;
use databend_common_settings::Settings;
use databend_common_storage::DataOperator;

use super::window_partition_buffer_v2::WindowPartitionBufferV2;
use super::WindowPartitionBuffer;
use super::WindowPartitionMeta;
use crate::pipelines::processors::transforms::DataProcessorStrategy;
Expand All @@ -40,21 +41,89 @@ use crate::spillers::SpillerConfig;
use crate::spillers::SpillerDiskConfig;
use crate::spillers::SpillerType;

/// Wrapper over the two window partition buffer implementations, so the
/// caller can hold either one behind a single type.
enum WindowBuffer {
/// Backed by `WindowPartitionBuffer`.
V1(WindowPartitionBuffer),
/// Backed by `WindowPartitionBufferV2`.
V2(WindowPartitionBufferV2),
}

impl WindowBuffer {
    /// Builds the buffer variant selected by `is_v2`.
    ///
    /// When `is_v2` is `true` a `WindowPartitionBufferV2` is constructed,
    /// otherwise a `WindowPartitionBuffer`. Both constructors receive the
    /// same arguments, and any error they return is propagated to the caller.
    fn new(
        is_v2: bool,
        spiller: Spiller,
        num_partitions: usize,
        sort_block_size: usize,
        memory_settings: MemorySettings,
    ) -> Result<Self> {
        let buffer = if is_v2 {
            Self::V2(WindowPartitionBufferV2::new(
                spiller,
                num_partitions,
                sort_block_size,
                memory_settings,
            )?)
        } else {
            Self::V1(WindowPartitionBuffer::new(
                spiller,
                num_partitions,
                sort_block_size,
                memory_settings,
            )?)
        };
        Ok(buffer)
    }

    /// Forwards to the wrapped buffer's `need_spill`.
    fn need_spill(&mut self) -> bool {
        match self {
            Self::V1(buf) => buf.need_spill(),
            Self::V2(buf) => buf.need_spill(),
        }
    }

    /// Forwards to the wrapped buffer's `is_empty`.
    fn is_empty(&self) -> bool {
        match self {
            Self::V1(buf) => buf.is_empty(),
            Self::V2(buf) => buf.is_empty(),
        }
    }

    /// Hands `data_block` to the wrapped buffer under `partition_id`.
    fn add_data_block(&mut self, partition_id: usize, data_block: DataBlock) {
        match self {
            Self::V1(buf) => buf.add_data_block(partition_id, data_block),
            Self::V2(buf) => buf.add_data_block(partition_id, data_block),
        }
    }

    /// Awaits the wrapped buffer's `spill` and returns its result.
    async fn spill(&mut self) -> Result<()> {
        match self {
            Self::V1(buf) => buf.spill().await,
            Self::V2(buf) => buf.spill().await,
        }
    }

    /// Awaits the wrapped buffer's `restore` and returns the blocks it yields.
    async fn restore(&mut self) -> Result<Vec<DataBlock>> {
        match self {
            Self::V1(buf) => buf.restore().await,
            Self::V2(buf) => buf.restore().await,
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub enum Step {
enum Step {
Sync(SyncStep),
Async(AsyncStep),
Finish,
}

#[derive(Debug, Clone, Copy)]
pub enum SyncStep {
enum SyncStep {
Collect,
Process,
}

#[derive(Debug, Clone, Copy)]
pub enum AsyncStep {
enum AsyncStep {
Spill,
Restore,
}
Expand All @@ -69,7 +138,7 @@ pub struct TransformWindowPartitionCollect<S: DataProcessorStrategy> {
// The partition id is used to map the partition id to the new partition id.
partition_id: Vec<usize>,
// The buffer is used to control the memory usage of the window operator.
buffer: WindowPartitionBuffer,
buffer: WindowBuffer,

strategy: S,

Expand Down Expand Up @@ -116,7 +185,8 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {

// Create the window partition buffer.
let sort_block_size = settings.get_window_partition_sort_block_size()? as usize;
let buffer = WindowPartitionBuffer::new(
let buffer = WindowBuffer::new(
true,
spiller,
partitions.len(),
sort_block_size,
Expand Down Expand Up @@ -275,7 +345,7 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
fn collect_data_block(
data_block: DataBlock,
partition_ids: &[usize],
buffer: &mut WindowPartitionBuffer,
buffer: &mut WindowBuffer,
) {
if let Some(meta) = data_block
.get_owned_meta()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl WindowPartitionBuffer {
self.can_spill && self.memory_settings.check_spill()
}

pub fn out_of_memory_limit(&mut self) -> bool {
fn out_of_memory_limit(&mut self) -> bool {
self.memory_settings.check_spill()
}

Expand Down Expand Up @@ -93,7 +93,7 @@ impl WindowPartitionBuffer {
{
if let Some(data_blocks) = self
.partition_buffer
.fetch_data_blocks(partition_id, &option)?
.fetch_data_blocks(partition_id, &option)
{
return self
.spiller
Expand All @@ -112,7 +112,7 @@ impl WindowPartitionBuffer {
self.partition_buffer.partition_memory_size(partition_id);
if let Some(data_blocks) = self
.partition_buffer
.fetch_data_blocks(partition_id, &option)?
.fetch_data_blocks(partition_id, &option)
{
partitions_to_spill.push((partition_id, data_blocks));
accumulated_bytes += partition_memory_size;
Expand Down Expand Up @@ -190,9 +190,9 @@ impl WindowPartitionBuffer {
let option = PartitionBufferFetchOption::PickPartitionWithThreshold(0);
if let Some(data_blocks) = self
.partition_buffer
.fetch_data_blocks(partition_id, &option)?
.fetch_data_blocks(partition_id, &option)
{
result.extend(self.concat_data_blocks(data_blocks)?);
result.extend(concat_data_blocks(data_blocks, self.sort_block_size)?);
}
}

Expand All @@ -203,9 +203,9 @@ impl WindowPartitionBuffer {
let option = PartitionBufferFetchOption::PickPartitionWithThreshold(0);
if let Some(data_blocks) = self
.restored_partition_buffer
.fetch_data_blocks(partition_id, &option)?
.fetch_data_blocks(partition_id, &option)
{
result.extend(self.concat_data_blocks(data_blocks)?);
result.extend(concat_data_blocks(data_blocks, self.sort_block_size)?);
}
}

Expand All @@ -215,26 +215,29 @@ impl WindowPartitionBuffer {
}
Ok(vec![])
}
}

fn concat_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
let mut num_rows = 0;
let mut result = Vec::new();
let mut current_blocks = Vec::new();

for data_block in data_blocks.into_iter() {
num_rows += data_block.num_rows();
current_blocks.push(data_block);
if num_rows >= self.sort_block_size {
result.push(DataBlock::concat(&current_blocks)?);
num_rows = 0;
current_blocks.clear();
}
}

if !current_blocks.is_empty() {
pub(super) fn concat_data_blocks(
data_blocks: Vec<DataBlock>,
target_size: usize,
) -> Result<Vec<DataBlock>> {
let mut num_rows = 0;
let mut result = Vec::new();
let mut current_blocks = Vec::new();

for data_block in data_blocks.into_iter() {
num_rows += data_block.num_rows();
current_blocks.push(data_block);
if num_rows >= target_size {
result.push(DataBlock::concat(&current_blocks)?);
num_rows = 0;
current_blocks.clear();
}
}

Ok(result)
if !current_blocks.is_empty() {
result.push(DataBlock::concat(&current_blocks)?);
}

Ok(result)
}
Loading
Loading