Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 41 additions & 29 deletions be/src/exec/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void Dependency::set_ready() {
}
std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
{
std::unique_lock<std::mutex> lc(_task_lock);
LockGuard lc(_task_lock);
if (_ready) {
return;
}
Expand All @@ -77,14 +77,14 @@ void Dependency::set_ready() {
}
for (auto task : local_block_task) {
if (auto t = task.lock()) {
std::unique_lock<std::mutex> lc(_task_lock);
LockGuard lc(_task_lock);
t->wake_up(this, lc);
}
}
}

Dependency* Dependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
std::unique_lock<std::mutex> lc(_task_lock);
LockGuard lc(_task_lock);
auto ready = _ready.load();
if (!ready && task) {
_add_block_task(task);
Expand All @@ -95,19 +95,34 @@ Dependency* Dependency::is_blocked_by(std::shared_ptr<PipelineTask> task) {
}

std::string Dependency::debug_string(int indentation_level) {
    // Snapshot the blocked-task count under _task_lock: _blocked_task is
    // guarded by that mutex, so it must not be read unsynchronized. Copying
    // the size first keeps the fmt call outside the critical section.
    size_t blocked_task_size = 0;
    {
        LockGuard lc(_task_lock);
        blocked_task_size = _blocked_task.size();
    }
    fmt::memory_buffer debug_string_buffer;
    fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {}, ready={}, _always_ready={}",
                   std::string(indentation_level * 2, ' '), _name, _node_id, blocked_task_size,
                   _ready, _always_ready);
    return fmt::to_string(debug_string_buffer);
}

std::string CountedFinishDependency::debug_string(int indentation_level) {
    // Copy each guarded field under its own lock (_blocked_task under
    // _task_lock, _counter under _mtx) before formatting, so the fmt call
    // runs lock-free and the two locks are never held together.
    size_t blocked_task_size = 0;
    uint32_t counter = 0;
    {
        LockGuard lc(_task_lock);
        blocked_task_size = _blocked_task.size();
    }
    {
        LockGuard lc(_mtx);
        counter = _counter;
    }
    fmt::memory_buffer debug_string_buffer;
    fmt::format_to(debug_string_buffer,
                   "{}{}: id={}, block_task={}, ready={}, _always_ready={}, count={}",
                   std::string(indentation_level * 2, ' '), _name, _node_id, blocked_task_size,
                   _ready, _always_ready, counter);
    return fmt::to_string(debug_string_buffer);
}

Expand Down Expand Up @@ -142,50 +157,47 @@ bool RuntimeFilterTimer::should_be_check_timeout() {

void RuntimeFilterTimerQueue::start() {
while (!_stop) {
std::unique_lock<std::mutex> lk(cv_m);

UniqueLock queue_lock {_que_lock};
while (_que.empty() && !_stop) {
cv.wait_for(lk, std::chrono::seconds(3), [this] { return !_que.empty() || _stop; });
cv.wait_for(queue_lock, std::chrono::seconds(3));
}
if (_stop) {
break;
}
{
std::unique_lock<std::mutex> lc(_que_lock);
std::list<std::shared_ptr<RuntimeFilterTimer>> new_que;
for (auto& it : _que) {
if (it.use_count() == 1) {
// `use_count == 1` means this runtime filter has been released
} else if (it->should_be_check_timeout()) {
if (it->force_wait_timeout() || it->_parent->is_blocked_by()) {
// This means runtime filter is not ready, so we call timeout or continue to poll this timer.
int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
it->call_timeout();
} else {
new_que.push_back(std::move(it));
}
std::list<std::shared_ptr<RuntimeFilterTimer>> new_que;
for (auto& it : _que) {
if (it.use_count() == 1) {
// `use_count == 1` means this runtime filter has been released
} else if (it->should_be_check_timeout()) {
if (it->force_wait_timeout() || it->_parent->is_blocked_by()) {
// This means runtime filter is not ready, so we call timeout or continue to poll this timer.
int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
it->call_timeout();
} else {
new_que.push_back(std::move(it));
}
} else {
new_que.push_back(std::move(it));
}
} else {
new_que.push_back(std::move(it));
}
new_que.swap(_que);
}
new_que.swap(_que);
queue_lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}
_shutdown = true;
}

void LocalExchangeSharedState::sub_running_sink_operators() {
    // le_lock serializes this with the source-side decrement; the fetch_sub
    // returning 1 means this was the last running sink operator.
    LockGuard lc(le_lock);
    if (exchanger->_running_sink_operators.fetch_sub(1) == 1) {
        _set_always_ready();
    }
}

void LocalExchangeSharedState::sub_running_source_operators() {
std::unique_lock<std::mutex> lc(le_lock);
LockGuard lc(le_lock);
if (exchanger->_running_source_operators.fetch_sub(1) == 1) {
_set_always_ready();
exchanger->finalize();
Expand Down
34 changes: 17 additions & 17 deletions be/src/exec/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <sqltypes.h>

#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -137,7 +138,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
if (_always_ready) {
return;
}
std::unique_lock<std::mutex> lc(_always_ready_lock);
LockGuard lc(_always_ready_lock);
if (_always_ready) {
return;
}
Expand All @@ -148,7 +149,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
if (_always_ready) {
return;
}
std::unique_lock<std::mutex> lc(_always_ready_lock);
LockGuard lc(_always_ready_lock);
if (_always_ready) {
return;
}
Expand All @@ -157,7 +158,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
}

protected:
void _add_block_task(std::shared_ptr<PipelineTask> task);
void _add_block_task(std::shared_ptr<PipelineTask> task) REQUIRES(_task_lock);

const int _id;
const int _node_id;
Expand All @@ -167,12 +168,12 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
BasicSharedState* _shared_state = nullptr;
MonotonicStopWatch _watcher;

std::mutex _task_lock;
std::vector<std::weak_ptr<PipelineTask>> _blocked_task;
AnnotatedMutex _task_lock;
std::vector<std::weak_ptr<PipelineTask>> _blocked_task GUARDED_BY(_task_lock);

// If `_always_ready` is true, `block()` will never block tasks.
std::atomic<bool> _always_ready = false;
std::mutex _always_ready_lock;
AnnotatedMutex _always_ready_lock;
};

struct FakeSharedState final : public BasicSharedState {
Expand All @@ -186,15 +187,15 @@ class CountedFinishDependency final : public Dependency {
: Dependency(id, node_id, std::move(name), true) {}

// Increase the outstanding-finish counter; transitioning from 0 blocks the
// dependency until a matching number of sub() calls brings it back to 0.
void add(uint32_t count = 1) {
    LockGuard l(_mtx);
    if (!_counter) {
        block();
    }
    _counter += count;
}

void sub() {
std::unique_lock<std::mutex> l(_mtx);
LockGuard l(_mtx);
// _counter is unsigned: a stray sub() when counter is already 0 would
// underflow to UINT32_MAX and the dependency would never become ready,
// hanging the query forever. Fail loudly instead.
Expand All @@ -211,8 +212,8 @@ class CountedFinishDependency final : public Dependency {
std::string debug_string(int indentation_level = 0) override;

private:
std::mutex _mtx;
uint32_t _counter = 0;
AnnotatedMutex _mtx;
uint32_t _counter GUARDED_BY(_mtx) = 0;
};

struct RuntimeFilterTimerQueue;
Expand Down Expand Up @@ -247,7 +248,7 @@ class RuntimeFilterTimer {
friend struct RuntimeFilterTimerQueue;
std::shared_ptr<Dependency> _parent = nullptr;
std::vector<std::shared_ptr<Dependency>> _local_runtime_filter_dependencies;
std::mutex _lock;
AnnotatedMutex _lock;
int64_t _registration_time;
const int32_t _wait_time_ms;
// true only for group_commit_scan_operator
Expand All @@ -274,18 +275,17 @@ struct RuntimeFilterTimerQueue {
~RuntimeFilterTimerQueue() = default;
RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); }
// Append a batch of timers under _que_lock and wake the polling thread,
// which may be parked in cv.wait_for() on an empty queue.
void push_filter_timer(std::vector<std::shared_ptr<RuntimeFilterTimer>>&& filter) {
    LockGuard queue_lock(_que_lock);
    _que.insert(_que.end(), filter.begin(), filter.end());
    cv.notify_all();
}

std::thread _thread;
std::condition_variable cv;
std::mutex cv_m;
std::mutex _que_lock;
std::condition_variable_any cv;
AnnotatedMutex _que_lock;
std::atomic_bool _stop = false;
std::atomic_bool _shutdown = false;
std::list<std::shared_ptr<RuntimeFilterTimer>> _que;
std::list<std::shared_ptr<RuntimeFilterTimer>> _que GUARDED_BY(_que_lock);
};

struct AggSharedState : public BasicSharedState {
Expand Down Expand Up @@ -887,7 +887,7 @@ struct LocalExchangeSharedState : public BasicSharedState {
std::atomic<int64_t> mem_usage = 0;
std::atomic<size_t> _buffer_mem_limit = config::local_exchange_buffer_mem_limit;
// We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
AnnotatedMutex le_lock;
void sub_running_sink_operators();
void sub_running_source_operators();
void _set_always_ready() {
Expand Down
46 changes: 29 additions & 17 deletions be/src/exec/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ Status PipelineTask::prepare(const std::vector<TScanRangeParams>& scan_range, co
{
const auto& deps =
_state->get_local_state(_source->operator_id())->execution_dependencies();
std::unique_lock<std::mutex> lc(_dependency_lifecycle_lock);
LockGuard lc(_dependency_lifecycle_lock);
std::copy(deps.begin(), deps.end(),
std::inserter(_execution_dependencies, _execution_dependencies.end()));
}
Expand Down Expand Up @@ -200,7 +200,7 @@ Status PipelineTask::_extract_dependencies() {
}
}
{
std::unique_lock<std::mutex> lc(_dependency_lifecycle_lock);
LockGuard lc(_dependency_lifecycle_lock);
read_dependencies.swap(_read_dependencies);
write_dependencies.swap(_write_dependencies);
finish_dependencies.swap(_finish_dependencies);
Expand Down Expand Up @@ -347,7 +347,7 @@ bool PipelineTask::_is_blocked() {
void PipelineTask::unblock_all_dependencies() {
// Keep dependency pointers and task-owned operator/shared state stable because set_ready() may
// synchronously call wake_up() and submit this task.
std::unique_lock<std::mutex> lock(_dependency_lifecycle_lock);
LockGuard lock(_dependency_lifecycle_lock);
auto fragment = _fragment_context.lock();
if (!is_finalized() && fragment) {
try {
Expand Down Expand Up @@ -889,7 +889,7 @@ Status PipelineTask::finalize() {
}
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker());
// Synchronize with unblock_all_dependencies() before clearing state used by wake_up()->submit().
std::unique_lock<std::mutex> lock(_dependency_lifecycle_lock);
LockGuard lock(_dependency_lifecycle_lock);
RETURN_IF_ERROR(_state_transition(State::FINALIZED));
_sink_shared_state.reset();
_op_shared_states.clear();
Expand Down Expand Up @@ -928,14 +928,28 @@ Status PipelineTask::close(Status exec_status, bool close_sink) {

if (close_sink) {
// Synchronize FINISHED with forced unblocking so delayed wake_up() sees a stable state.
std::unique_lock<std::mutex> lock(_dependency_lifecycle_lock);
LockGuard lock(_dependency_lifecycle_lock);
RETURN_IF_ERROR(_state_transition(State::FINISHED));
}
return s;
}

std::string PipelineTask::debug_string() {
fmt::memory_buffer debug_string_buffer;
Dependency* cur_blocked_dep = nullptr;
std::vector<std::vector<Dependency*>> read_dependencies;
std::vector<Dependency*> write_dependencies;
std::vector<Dependency*> execution_dependencies;
std::vector<Dependency*> finish_dependencies;

{
LockGuard lc(_dependency_lifecycle_lock);
cur_blocked_dep = _blocked_dep;
read_dependencies = _read_dependencies;
write_dependencies = _write_dependencies;
execution_dependencies = _execution_dependencies;
finish_dependencies = _finish_dependencies;
}

fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(_query_id));
fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
Expand All @@ -948,8 +962,6 @@ std::string PipelineTask::debug_string() {
_index, _opened, _eos, _to_string(_exec_state), _dry_run, _wake_up_early.load(),
_wake_by, _state_change_watcher.elapsed_time() / NANOS_PER_SEC, _spilling,
is_running());
std::unique_lock<std::mutex> lc(_dependency_lifecycle_lock);
auto* cur_blocked_dep = _blocked_dep;
auto fragment = _fragment_context.lock();
if (is_finalized() || !fragment) {
fmt::format_to(debug_string_buffer, " pipeline name = {}", _pipeline_name);
Expand Down Expand Up @@ -979,32 +991,32 @@ std::string PipelineTask::debug_string() {
fmt::format_to(debug_string_buffer, "\nRead Dependency Information: \n");

size_t i = 0;
for (; i < _read_dependencies.size(); i++) {
for (size_t j = 0; j < _read_dependencies[i].size(); j++) {
for (; i < read_dependencies.size(); i++) {
for (size_t j = 0; j < read_dependencies[i].size(); j++) {
fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_read_dependencies[i][j]->debug_string(cast_set<int>(i) + 1));
read_dependencies[i][j]->debug_string(cast_set<int>(i) + 1));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now dereferences raw Dependency* values after _dependency_lifecycle_lock has been released. The lock comment says it protects the dependency containers and the raw pointers they contain, and finalize() takes the same lock before clearing _sink_shared_state, _op_shared_states, and _shared_state_map, which own these dependencies. A concrete race is: debug_string() copies read_dependencies and releases the lock, observes the task as not finalized, then another thread runs finalize() and clears the shared states; the subsequent read_dependencies[i][j]->debug_string() can use a freed Dependency. Please keep the lifetime protection until all raw dependency pointers are no longer dereferenced, or snapshot owning shared_ptr state under the lock instead of raw pointers.

}

fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_memory_sufficient_dependency->debug_string(cast_set<int>(i++)));

fmt::format_to(debug_string_buffer, "\nWrite Dependency Information: \n");
for (size_t j = 0; j < _write_dependencies.size(); j++, i++) {
for (size_t j = 0; j < write_dependencies.size(); j++, i++) {
fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_write_dependencies[j]->debug_string(cast_set<int>(j) + 1));
write_dependencies[j]->debug_string(cast_set<int>(j) + 1));
}

fmt::format_to(debug_string_buffer, "\nExecution Dependency Information: \n");
for (size_t j = 0; j < _execution_dependencies.size(); j++, i++) {
for (size_t j = 0; j < execution_dependencies.size(); j++, i++) {
fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_execution_dependencies[j]->debug_string(cast_set<int>(i) + 1));
execution_dependencies[j]->debug_string(cast_set<int>(i) + 1));
}

fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {
for (size_t j = 0; j < finish_dependencies.size(); j++, i++) {
fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_finish_dependencies[j]->debug_string(cast_set<int>(i) + 1));
finish_dependencies[j]->debug_string(cast_set<int>(i) + 1));
}
return fmt::to_string(debug_string_buffer);
}
Expand Down Expand Up @@ -1057,7 +1069,7 @@ Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_co
return Status::OK();
}

void PipelineTask::wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* dep_lock */) {
void PipelineTask::wake_up(Dependency* dep, LockGuard<AnnotatedMutex>& /* dep_lock */) {
auto cancel_if_error = [&](const Status& st) {
if (!st.ok()) {
if (auto frag = fragment_context().lock()) {
Expand Down
Loading
Loading