Skip to content

Commit

Permalink
Add TaggedScheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
mwilsnd committed Jun 3, 2024
1 parent 096ea7f commit 6d7dd8f
Show file tree
Hide file tree
Showing 47 changed files with 250 additions and 252 deletions.
27 changes: 25 additions & 2 deletions include/mbgl/actor/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class Scheduler {

/// Enqueues a function for execution.
virtual void schedule(std::function<void()>&&) = 0;
virtual void schedule(const void*, std::function<void()>&&) = 0;

/// Makes a weak pointer to this Scheduler.
virtual mapbox::base::WeakPtr<Scheduler> makeWeakPtr() = 0;
/// Enqueues a function for execution on the render thread.
Expand Down Expand Up @@ -69,8 +71,7 @@ class Scheduler {

/// Wait until there's nothing pending or in process
/// Must not be called from a task provided to this scheduler.
/// @param timeout Time to wait, or zero to wait forever.
virtual std::size_t waitForEmpty(Milliseconds timeout = Milliseconds{0}) = 0;
virtual void waitForEmpty(const void* tag = nullptr) = 0;

/// Set/Get the current Scheduler for this thread
static Scheduler* GetCurrent();
Expand Down Expand Up @@ -116,4 +117,26 @@ class Scheduler {
std::function<void(const std::exception_ptr)> handler;
};

/// @brief A TaggedScheduler pairs a scheduler with a memory address. Tasklets submitted via a TaggedScheduler
/// are bucketed with the tag address to enable queries on tasks related to that tag. This allows multiple map
/// instances to all use the same scheduler and await processing of all their tasks prior to map deletion.
class TaggedScheduler {
public:
TaggedScheduler() = delete;
TaggedScheduler(std::shared_ptr<Scheduler> scheduler_, const void* tagAddr_)
: scheduler(std::move(scheduler_)),
tagAddr(tagAddr_) {}

/// @brief Get the wrapped scheduler
/// @return
const std::shared_ptr<Scheduler>& get() const noexcept { return scheduler; }

void schedule(std::function<void()>&& fn) { scheduler->schedule(tagAddr, std::move(fn)); }
void waitForEmpty() const noexcept { scheduler->waitForEmpty(tagAddr); }

private:
std::shared_ptr<Scheduler> scheduler;
const void* tagAddr;
};

} // namespace mbgl
3 changes: 2 additions & 1 deletion include/mbgl/util/run_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ class RunLoop : public Scheduler, private util::noncopyable {
}

void schedule(std::function<void()>&& fn) override { invoke(std::move(fn)); }
void schedule(const void*, std::function<void()>&& fn) override { schedule(std::move(fn)); }
::mapbox::base::WeakPtr<Scheduler> makeWeakPtr() override { return weakFactory.makeWeakPtr(); }

std::size_t waitForEmpty(Milliseconds timeout) override;
void waitForEmpty(const void* tag = nullptr) override;

class Impl;

Expand Down
9 changes: 3 additions & 6 deletions platform/android/MapLibreAndroid/src/cpp/map_renderer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,17 @@ void MapRenderer::schedule(std::function<void()>&& scheduled) {
}
}

std::size_t MapRenderer::waitForEmpty(Milliseconds timeout) {
void MapRenderer::waitForEmpty([[maybe_unused]] const void* tag) {
try {
android::UniqueEnv _env = android::AttachEnv();
static auto& javaClass = jni::Class<MapRenderer>::Singleton(*_env);
static auto waitForEmpty = javaClass.GetMethod<jni::jint(jni::jlong)>(*_env, "waitForEmpty");
static auto waitForEmpty = javaClass.GetMethod<void()>(*_env, "waitForEmpty");
if (auto weakReference = javaPeer.get(*_env)) {
return weakReference.Call(*_env, waitForEmpty, static_cast<int64_t>(timeout.count()));
return weakReference.Call(*_env, waitForEmpty);
}
// If the peer is already cleaned up, there's nothing to wait for
return 0;
} catch (...) {
Log::Error(Event::Android, "MapRenderer::waitForEmpty failed");
jni::ThrowJavaError(*android::AttachEnv(), std::current_exception());
return 0;
}
}

Expand Down
5 changes: 3 additions & 2 deletions platform/android/MapLibreAndroid/src/cpp/map_renderer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ class MapRenderer : public Scheduler {
// From Scheduler. Schedules by using callbacks to the
// JVM to process the mailbox on the right thread.
void schedule(std::function<void()>&& scheduled) override;
void schedule(const void*, std::function<void()>&& fn) override { schedule(std::move(fn)); };

mapbox::base::WeakPtr<Scheduler> makeWeakPtr() override { return weakFactory.makeWeakPtr(); }

// Wait for the queue to be empty
// A timeout of zero results in an unbounded wait
std::size_t waitForEmpty(Milliseconds timeout) override;
void waitForEmpty(const void*) override;

void requestRender();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,6 @@ void queueEvent(MapRendererRunnable runnable) {
this.queueEvent((Runnable) runnable);
}

/// Wait indefinitely for the queue to become empty
public void waitForEmpty() {
waitForEmpty(0);
}

private native void nativeInitialize(MapRenderer self,
float pixelRatio,
String localIdeographFontFamily);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,4 @@ public interface MapRendererScheduler {

@Keep
void waitForEmpty();

@Keep
long waitForEmpty(long timeoutMillis);
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void queueEvent(Runnable runnable) {
* {@inheritDoc}
*/
@Override
public long waitForEmpty(long timeoutMillis) {
return glSurfaceView.waitForEmpty(timeoutMillis);
public void waitForEmpty() {
glSurfaceView.waitForEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ public void queueEvent(Runnable r) {
* @param timeoutMillis Timeout in milliseconds
* @return Number of queue items remaining
*/
public long waitForEmpty(long timeoutMillis) {
return glThread.waitForEmpty(timeoutMillis);
public void waitForEmpty() {
glThread.waitForEmpty();
}


Expand Down Expand Up @@ -1038,31 +1038,16 @@ public void queueEvent(@NonNull Runnable r) {
* @param timeoutMillis Timeout in milliseconds, zero for indefinite wait
* @return Number of queue items remaining
*/
public int waitForEmpty(long timeoutMillis) {
final long startTime = System.nanoTime();
public void waitForEmpty() {
synchronized (glThreadManager) {
// Wait for the queue to be empty
while (!this.eventQueue.isEmpty()) {
if (timeoutMillis > 0) {
final long elapsedMillis = (System.nanoTime() - startTime) / 1000 / 1000;
if (elapsedMillis < timeoutMillis) {
try {
glThreadManager.wait(timeoutMillis - elapsedMillis);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
} else {
break;
}
} else {
try {
glThreadManager.wait();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
try {
glThreadManager.wait();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
return this.eventQueue.size();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public void queueEvent(Runnable runnable) {
* {@inheritDoc}
*/
@Override
public long waitForEmpty(long timeoutMillis) {
return renderThread.waitForEmpty(timeoutMillis);
public void waitForEmpty() {
renderThread.waitForEmpty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,31 +141,16 @@ void queueEvent(@NonNull Runnable runnable) {
* @return The number of items remaining in the queue
*/
@UiThread
int waitForEmpty(long timeoutMillis) {
final long startTime = System.nanoTime();
void waitForEmpty() {
synchronized (lock) {
// Wait for the queue to be empty
while (!this.eventQueue.isEmpty()) {
if (timeoutMillis > 0) {
final long elapsedMillis = (System.nanoTime() - startTime) / 1000 / 1000;
if (elapsedMillis < timeoutMillis) {
try {
lock.wait(timeoutMillis - elapsedMillis);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
} else {
break;
}
} else {
try {
lock.wait(0);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
try {
lock.wait(0);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
return this.eventQueue.size();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,8 @@ class NativeMapViewTest : AppCenter() {
// no-op
}

override fun waitForEmpty(timeoutMillis: Long): Long {
override fun waitForEmpty() {
// no-op
return 0
}
}
}
13 changes: 5 additions & 8 deletions platform/android/src/run_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,19 +216,16 @@ Milliseconds RunLoop::Impl::processRunnables() {
return timeout;
}

std::size_t RunLoop::Impl::waitForEmpty(Milliseconds timeout) {
const auto startTime = mbgl::util::MonotonicTimer::now();
void RunLoop::Impl::waitForEmpty() {
while (true) {
std::size_t remaining;
{
std::lock_guard<std::mutex> lock(mutex);
remaining = runnables.size();
}

const auto elapsed = mbgl::util::MonotonicTimer::now() - startTime;
const auto elapsedMillis = std::chrono::duration_cast<Milliseconds>(elapsed);
if (remaining == 0 || (Milliseconds::zero() < timeout && timeout <= elapsedMillis)) {
return remaining;
if (remaining == 0) {
return;
}

runLoop->runOnce();
Expand Down Expand Up @@ -257,8 +254,8 @@ void RunLoop::wake() {
impl->wake();
}

std::size_t RunLoop::waitForEmpty(std::chrono::milliseconds timeout) {
return impl->waitForEmpty(timeout);
void RunLoop::waitForEmpty([[maybe_unused]] const void* tag) {
impl->waitForEmpty();
}

void RunLoop::run() {
Expand Down
2 changes: 1 addition & 1 deletion platform/android/src/run_loop_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class RunLoop::Impl {

Milliseconds processRunnables();

std::size_t waitForEmpty(Milliseconds timeout);
void waitForEmpty();

ALooper* loop = nullptr;
RunLoop* runLoop = nullptr;
Expand Down
9 changes: 3 additions & 6 deletions platform/darwin/src/run_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,16 @@ void RunLoop::stop() {
invoke([&] { CFRunLoopStop(CFRunLoopGetCurrent()); });
}

std::size_t RunLoop::waitForEmpty(Milliseconds timeout) {
const auto startTime = mbgl::util::MonotonicTimer::now();
void RunLoop::waitForEmpty([[maybe_unused]] const void* tag) {
while (true) {
std::size_t remaining;
{
std::lock_guard<std::mutex> lock(mutex);
remaining = defaultQueue.size() + highPriorityQueue.size();
}

const auto elapsed = mbgl::util::MonotonicTimer::now() - startTime;
const auto elapsedMillis = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
if (remaining == 0 || (Milliseconds::zero() < timeout && timeout <= elapsedMillis)) {
return remaining;
if (remaining == 0) {
return;
}

runOnce();
Expand Down
9 changes: 3 additions & 6 deletions platform/default/src/mbgl/util/run_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,16 @@ void RunLoop::stop() {
invoke([&] { uv_unref(impl->holderHandle()); });
}

std::size_t RunLoop::waitForEmpty(Milliseconds timeout) {
const auto startTime = mbgl::util::MonotonicTimer::now();
void RunLoop::waitForEmpty([[maybe_unused]] const void* tag) {
while (true) {
std::size_t remaining;
{
std::lock_guard<std::mutex> lock(mutex);
remaining = defaultQueue.size() + highPriorityQueue.size();
}

const auto elapsed = mbgl::util::MonotonicTimer::now() - startTime;
const auto elapsedMillis = std::chrono::duration_cast<Milliseconds>(elapsed);
if (remaining == 0 || (Milliseconds::zero() < timeout && timeout <= elapsedMillis)) {
return remaining;
if (remaining == 0) {
return;
}

runOnce();
Expand Down
9 changes: 3 additions & 6 deletions platform/qt/src/mbgl/run_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,16 @@ void RunLoop::runOnce() {
}
}

std::size_t RunLoop::waitForEmpty(std::chrono::milliseconds timeout) {
const auto startTime = mbgl::util::MonotonicTimer::now();
void RunLoop::waitForEmpty([[maybe_unused]] const void* tag) {
while (true) {
std::size_t remaining;
{
std::lock_guard<std::mutex> lock(mutex);
remaining = defaultQueue.size() + highPriorityQueue.size();
}

const auto elapsed = mbgl::util::MonotonicTimer::now() - startTime;
const auto elapsedMillis = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
if (remaining == 0 || timeout <= elapsedMillis) {
return remaining;
if (remaining == 0) {
return;
}

runOnce();
Expand Down
12 changes: 4 additions & 8 deletions platform/qt/src/utils/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,19 @@ void Scheduler::processEvents() {
cvEmpty.notify_all();
}

std::size_t Scheduler::waitForEmpty(std::chrono::milliseconds timeout) {
void Scheduler::waitForEmpty([[maybe_unused]] const void* tag) {
MBGL_VERIFY_THREAD(tid);

const auto startTime = mbgl::util::MonotonicTimer::now();
std::unique_lock<std::mutex> lock(m_taskQueueMutex);
const auto isDone = [&] {
return m_taskQueue.empty() && pendingItems == 0;
};

while (!isDone()) {
const auto elapsed = mbgl::util::MonotonicTimer::now() - startTime;
if (timeout <= elapsed || !cvEmpty.wait_for(lock, timeout - elapsed, isDone)) {
assert(isDone());
break;
}
cvEmpty.wait(lock);
}

return m_taskQueue.size() + pendingItems;
assert(m_taskQueue.size() + pendingItems == 0);
}

} // namespace QMapLibre
2 changes: 1 addition & 1 deletion platform/qt/src/utils/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Scheduler : public QObject, public mbgl::Scheduler {
// mbgl::Scheduler implementation.
void schedule(std::function<void()>&& function) final;

std::size_t waitForEmpty(std::chrono::milliseconds timeout) override;
void waitForEmpty(const void* tag = nullptr) override;

mapbox::base::WeakPtr<mbgl::Scheduler> makeWeakPtr() override { return weakFactory.makeWeakPtr(); }

Expand Down
4 changes: 2 additions & 2 deletions src/mbgl/annotation/render_annotation_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace mbgl {
using namespace style;

RenderAnnotationSource::RenderAnnotationSource(Immutable<AnnotationSource::Impl> impl_,
std::shared_ptr<Scheduler> threadPool_)
: RenderTileSource(std::move(impl_), std::move(threadPool_)) {
const TaggedScheduler& threadPool_)
: RenderTileSource(std::move(impl_), threadPool_) {
assert(LayerManager::annotationsEnabled);
tilePyramid.setObserver(this);
}
Expand Down
Loading

0 comments on commit 6d7dd8f

Please sign in to comment.