Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,6 @@ Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state,
size_t block_rows = input_block->rows();
local_state._input_total_rows += block_rows;

// record origin columns, maybe be after this, could cast some column but no need to output
auto column_to_keep = input_block->columns();
{
SCOPED_TIMER(local_state._compute_agg_data_timer);
//insert _agg_input_columns, execute calculate for its, and those columns maybe could remove have used data
Expand Down Expand Up @@ -817,7 +815,6 @@ Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state,
local_state._range_result_columns[i].get(), block_rows));
}
}
vectorized::Block::erase_useless_column(input_block, column_to_keep);
COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes());
COUNTER_UPDATE(local_state._blocks_memory_usage, input_block->allocated_bytes());
local_state._input_blocks.emplace_back(std::move(*input_block));
Expand Down Expand Up @@ -894,7 +891,7 @@ size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos
return local_state._reserve_mem_size;
}

Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
Status AnalyticSinkOperatorX::_insert_range_column(const vectorized::Block* block,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里如果是const,应该改成const &

const vectorized::VExprContextSPtr& expr,
vectorized::IColumn* dst_column, size_t length) {
vectorized::ColumnPtr column;
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt

private:
friend class AnalyticSinkLocalState;
Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
Status _insert_range_column(const vectorized::Block* block,
const vectorized::VExprContextSPtr& expr,
vectorized::IColumn* dst_column, size_t length);
Status _add_input_block(doris::RuntimeState* state, vectorized::Block* input_block);

Expand Down
19 changes: 5 additions & 14 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ class NestedLoopJoinProbeLocalState final
NestedLoopJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent);
~NestedLoopJoinProbeLocalState() override = default;

#define CLEAR_BLOCK \
for (size_t i = 0; i < column_to_keep; ++i) { \
block->get_by_position(i).column->assume_mutable()->clear(); \
}
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
Expand Down Expand Up @@ -83,7 +79,6 @@ class NestedLoopJoinProbeLocalState final
void _append_probe_data_with_null(vectorized::Block& block) const;
template <typename Filter, bool SetBuildSideFlag, bool SetProbeSideFlag>
void _do_filtering_and_update_visited_flags_impl(vectorized::Block* block,
uint32_t column_to_keep,
size_t build_block_idx,
size_t processed_blocks_num, bool materialize,
Filter& filter) {
Expand Down Expand Up @@ -127,17 +122,15 @@ class NestedLoopJoinProbeLocalState final

if (materialize) {
SCOPED_TIMER(_filtered_by_join_conjuncts_timer);
vectorized::Block::filter_block_internal(block, filter, column_to_keep);
vectorized::Block::filter_block_internal(block, filter);
} else {
CLEAR_BLOCK
block->clear_column_data();
}
}

// need exception safety
template <bool SetBuildSideFlag, bool SetProbeSideFlag, bool IgnoreNull>
Status _do_filtering_and_update_visited_flags(vectorized::Block* block, bool materialize) {
// The number of columns will not exceed the range of u32.
uint32_t column_to_keep = cast_set<uint32_t>(block->columns());
// If we need to set visited flags for build side,
// 1. Execute conjuncts and get a column with bool type to do filtering.
// 2. Use bool column to update build-side visited flags.
Expand All @@ -155,7 +148,7 @@ class NestedLoopJoinProbeLocalState final
}

if (can_filter_all) {
CLEAR_BLOCK
block->clear_column_data();
std::stack<uint16_t> empty1;
_probe_offset_stack.swap(empty1);

Expand All @@ -164,8 +157,7 @@ class NestedLoopJoinProbeLocalState final
} else {
_do_filtering_and_update_visited_flags_impl<decltype(filter), SetBuildSideFlag,
SetProbeSideFlag>(
block, column_to_keep, build_block_idx, processed_blocks_num, materialize,
filter);
block, build_block_idx, processed_blocks_num, materialize, filter);
}
} else if (block->rows() > 0) {
if constexpr (SetBuildSideFlag) {
Expand All @@ -189,10 +181,9 @@ class NestedLoopJoinProbeLocalState final
1);
}
if (!materialize) {
CLEAR_BLOCK
block->clear_column_data();
}
}
vectorized::Block::erase_useless_column(block, column_to_keep);
return Status::OK();
}

Expand Down
16 changes: 7 additions & 9 deletions be/src/vec/runtime/partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state, Bl
size_t rows = block->rows();

if (rows > 0) {
auto column_to_keep = block->columns();

int result_size = cast_set<int>(_partition_expr_ctxs.size());
std::vector<int> result(result_size);
Columns columns(_partition_expr_ctxs.size());
for (size_t i = 0; i < _partition_expr_ctxs.size(); ++i) {
RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(block, columns[i]));
}

_initialize_hash_vals(rows);
auto* __restrict hashes = _hash_vals.data();
RETURN_IF_ERROR(_get_partition_column_result(block, result));
for (int j = 0; j < result_size; ++j) {
const auto& [col, is_const] = unpack_if_const(block->get_by_position(result[j]).column);

for (int j = 0; j < _partition_expr_ctxs.size(); ++j) {
const auto& [col, is_const] = unpack_if_const(columns[j]);
if (is_const) {
continue;
}
Expand All @@ -51,8 +51,6 @@ Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state, Bl
for (size_t i = 0; i < rows; i++) {
hashes[i] = ChannelIds()(hashes[i], _partition_count);
}

Block::erase_useless_column(block, column_to_keep);
}
return Status::OK();
}
Expand Down
7 changes: 0 additions & 7 deletions be/src/vec/runtime/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,6 @@ class Crc32HashPartitioner : public PartitionerBase {
Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;

protected:
Status _get_partition_column_result(Block* block, std::vector<int>& result) const {
int counter = 0;
for (auto ctx : _partition_expr_ctxs) {
RETURN_IF_ERROR(ctx->execute(block, &result[counter++]));
}
return Status::OK();
}

Status _clone_expr_ctxs(RuntimeState* state, VExprContextSPtrs& new_partition_expr_ctxs) const {
new_partition_expr_ctxs.resize(_partition_expr_ctxs.size());
Expand Down