From 6ad5b5c3cd8ab676eaaa528465a6b11966bfb6b1 Mon Sep 17 00:00:00 2001 From: mwilsnd Date: Tue, 21 May 2024 15:50:35 -0400 Subject: [PATCH] Sync refactor --- src/mbgl/util/thread_pool.cpp | 61 ++++++++++++++++++----------------- src/mbgl/util/thread_pool.hpp | 4 +-- 2 files changed, 34 insertions(+), 31 deletions(-) diff --git a/src/mbgl/util/thread_pool.cpp b/src/mbgl/util/thread_pool.cpp index 6227b627c49..94e1190693f 100644 --- a/src/mbgl/util/thread_pool.cpp +++ b/src/mbgl/util/thread_pool.cpp @@ -15,7 +15,7 @@ void ThreadedSchedulerBase::terminate() { runRenderJobs(); { - std::unique_lock lock(workerMutex); + std::lock_guard lock(workerMutex); terminated = true; } @@ -36,24 +36,36 @@ std::thread ThreadedSchedulerBase::makeSchedulerThread(size_t index) { owningThreadPool.set(this); - while (!terminated) { + bool didWork = true; + while (true) { + std::unique_lock conditionLock(workerMutex); + if (!terminated && !didWork) { + cvAvailable.wait(conditionLock); + } + + if (terminated) { + platform::detachThread(); + break; + } + + // Let other threads run + conditionLock.unlock(); + + didWork = false; std::vector> pending; - bool didWork = false; - { // 1. Gather buckets for us to visit this iteration - std::shared_lock lock(taggedQueueLock); - for (const auto& [tag, q] : taggedQueue) { - pending.push_back(q); + std::lock_guard lock(taggedQueueLock); + for (const auto& [tag, queue] : taggedQueue) { + pending.push_back(queue); } } // 2. Visit a task from each for (auto& q : pending) { std::function tasklet; - { - std::unique_lock lock(q->lock); + std::lock_guard lock(q->lock); if (q->queue.size()) { tasklet = std::move(q->queue.front()); q->queue.pop(); @@ -70,13 +82,13 @@ std::thread ThreadedSchedulerBase::makeSchedulerThread(size_t index) { // destroy the function and release its captures before unblocking `waitForEmpty` tasklet = {}; if (!--q->runningCount) { - std::unique_lock lock(q->lock); + std::lock_guard lock(q->lock); if (q->queue.empty()) { q->cv.notify_all(); } } } catch (...) { - std::unique_lock lock(q->lock); + std::lock_guard lock(q->lock); if (handler) { handler(std::current_exception()); } @@ -92,16 +104,7 @@ std::thread ThreadedSchedulerBase::makeSchedulerThread(size_t index) { throw; } } - - if (!didWork) { - std::unique_lock conditionLock(workerMutex); - if (!terminated) { - cvAvailable.wait(conditionLock); - } - } } - - platform::detachThread(); }); } @@ -113,12 +116,10 @@ void ThreadedSchedulerBase::schedule(const void* tag, std::function&& fn assert(fn); if (!fn) return; - decltype(taggedQueue)::const_iterator it; std::shared_ptr q; - { - std::unique_lock lock(taggedQueueLock); - it = taggedQueue.find(tag); + std::lock_guard lock(taggedQueueLock); + auto it = taggedQueue.find(tag); if (it == taggedQueue.end()) { q = std::make_shared(); taggedQueue.insert({tag, q}); @@ -128,7 +129,7 @@ void ThreadedSchedulerBase::schedule(const void* tag, std::function&& fn } { - std::unique_lock lock(q->lock); + std::lock_guard lock(q->lock); q->queue.push(std::move(fn)); } @@ -143,11 +144,10 @@ void ThreadedSchedulerBase::waitForEmpty(const void* tag = nullptr) { tag = static_cast(this); } - decltype(taggedQueue)::const_iterator it; std::shared_ptr q; { - std::shared_lock lock(taggedQueueLock); - it = taggedQueue.find(tag); + std::lock_guard lock(taggedQueueLock); + auto it = taggedQueue.find(tag); if (it == taggedQueue.end()) { return; } @@ -160,7 +160,10 @@ void ThreadedSchedulerBase::waitForEmpty(const void* tag = nullptr) { } // After waiting for the queue to empty, go ahead and erase it from the map. - taggedQueue.erase(tag); + { + std::lock_guard lock(taggedQueueLock); + taggedQueue.erase(tag); + } } } diff --git a/src/mbgl/util/thread_pool.hpp b/src/mbgl/util/thread_pool.hpp index b7fb44c6d4e..601dd5a3b71 100644 --- a/src/mbgl/util/thread_pool.hpp +++ b/src/mbgl/util/thread_pool.hpp @@ -36,7 +36,7 @@ class ThreadedSchedulerBase : public Scheduler { // Signal when an item is added to the queue std::condition_variable cvAvailable; std::mutex workerMutex; - std::shared_mutex taggedQueueLock; + std::mutex taggedQueueLock; util::ThreadLocal owningThreadPool; bool terminated{false}; @@ -47,7 +47,7 @@ class ThreadedSchedulerBase : public Scheduler { std::mutex lock; /* lock */ std::queue> queue; /* pending task queue */ }; - std::map> taggedQueue; + std::unordered_map> taggedQueue; }; /**