Skip to content

Commit

Permalink
Sync refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mwilsnd committed May 21, 2024
1 parent 95b79cd commit 6ad5b5c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 31 deletions.
61 changes: 32 additions & 29 deletions src/mbgl/util/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void ThreadedSchedulerBase::terminate() {
runRenderJobs();

{
std::unique_lock<std::mutex> lock(workerMutex);
std::lock_guard<std::mutex> lock(workerMutex);
terminated = true;
}

Expand All @@ -36,24 +36,36 @@ std::thread ThreadedSchedulerBase::makeSchedulerThread(size_t index) {

owningThreadPool.set(this);

while (!terminated) {
bool didWork = true;
while (true) {
std::unique_lock<std::mutex> conditionLock(workerMutex);
if (!terminated && !didWork) {
cvAvailable.wait(conditionLock);
}

if (terminated) {
platform::detachThread();
break;
}

// Let other threads run
conditionLock.unlock();

didWork = false;
std::vector<std::shared_ptr<Queue>> pending;
bool didWork = false;

{
// 1. Gather buckets for us to visit this iteration
std::shared_lock<std::shared_mutex> lock(taggedQueueLock);
for (const auto& [tag, q] : taggedQueue) {
pending.push_back(q);
std::lock_guard<std::mutex> lock(taggedQueueLock);
for (const auto& [tag, queue] : taggedQueue) {
pending.push_back(queue);
}
}

// 2. Visit a task from each
for (auto& q : pending) {
std::function<void()> tasklet;

{
std::unique_lock<std::mutex> lock(q->lock);
std::lock_guard<std::mutex> lock(q->lock);
if (q->queue.size()) {
tasklet = std::move(q->queue.front());
q->queue.pop();
Expand All @@ -70,13 +82,13 @@ std::thread ThreadedSchedulerBase::makeSchedulerThread(size_t index) {
// destroy the function and release its captures before unblocking `waitForEmpty`
tasklet = {};
if (!--q->runningCount) {
std::unique_lock<std::mutex> lock(q->lock);
std::lock_guard<std::mutex> lock(q->lock);
if (q->queue.empty()) {
q->cv.notify_all();
}
}
} catch (...) {
std::unique_lock<std::mutex> lock(q->lock);
std::lock_guard<std::mutex> lock(q->lock);
if (handler) {
handler(std::current_exception());
}
Expand All @@ -92,16 +104,7 @@ std::thread ThreadedSchedulerBase::makeSchedulerThread(size_t index) {
throw;
}
}

if (!didWork) {
std::unique_lock<std::mutex> conditionLock(workerMutex);
if (!terminated) {
cvAvailable.wait(conditionLock);
}
}
}

platform::detachThread();
});
}

Expand All @@ -113,12 +116,10 @@ void ThreadedSchedulerBase::schedule(const void* tag, std::function<void()>&& fn
assert(fn);
if (!fn) return;

decltype(taggedQueue)::const_iterator it;
std::shared_ptr<Queue> q;

{
std::unique_lock<std::shared_mutex> lock(taggedQueueLock);
it = taggedQueue.find(tag);
std::lock_guard<std::mutex> lock(taggedQueueLock);
auto it = taggedQueue.find(tag);
if (it == taggedQueue.end()) {
q = std::make_shared<Queue>();
taggedQueue.insert({tag, q});
Expand All @@ -128,7 +129,7 @@ void ThreadedSchedulerBase::schedule(const void* tag, std::function<void()>&& fn
}

{
std::unique_lock<std::mutex> lock(q->lock);
std::lock_guard<std::mutex> lock(q->lock);
q->queue.push(std::move(fn));
}

Expand All @@ -143,11 +144,10 @@ void ThreadedSchedulerBase::waitForEmpty(const void* tag = nullptr) {
tag = static_cast<const void*>(this);
}

decltype(taggedQueue)::const_iterator it;
std::shared_ptr<Queue> q;
{
std::shared_lock<std::shared_mutex> lock(taggedQueueLock);
it = taggedQueue.find(tag);
std::lock_guard<std::mutex> lock(taggedQueueLock);
auto it = taggedQueue.find(tag);
if (it == taggedQueue.end()) {
return;
}
Expand All @@ -160,7 +160,10 @@ void ThreadedSchedulerBase::waitForEmpty(const void* tag = nullptr) {
}

// After waiting for the queue to empty, go ahead and erase it from the map.
taggedQueue.erase(tag);
{
std::lock_guard<std::mutex> lock(taggedQueueLock);
taggedQueue.erase(tag);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/mbgl/util/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ThreadedSchedulerBase : public Scheduler {
// Signal when an item is added to the queue
std::condition_variable cvAvailable;
std::mutex workerMutex;
std::shared_mutex taggedQueueLock;
std::mutex taggedQueueLock;
util::ThreadLocal<ThreadedSchedulerBase> owningThreadPool;
bool terminated{false};

Expand All @@ -47,7 +47,7 @@ class ThreadedSchedulerBase : public Scheduler {
std::mutex lock; /* lock */
std::queue<std::function<void()>> queue; /* pending task queue */
};
std::map<const void*, std::shared_ptr<Queue>> taggedQueue;
std::unordered_map<const void*, std::shared_ptr<Queue>> taggedQueue;
};

/**
Expand Down

0 comments on commit 6ad5b5c

Please sign in to comment.