Skip to content

Commit

Permalink
Add TaggedScheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
mwilsnd committed Jun 5, 2024
1 parent d3d29cb commit 4e71b38
Show file tree
Hide file tree
Showing 67 changed files with 395 additions and 302 deletions.
7 changes: 7 additions & 0 deletions include/mbgl/actor/actor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

namespace mbgl {

class TaggedScheduler;

/**
An `Actor<O>` is an owning reference to an asynchronous object of type `O`:
an "actor". Communication with an actor happens via message passing: you send
Expand Down Expand Up @@ -60,6 +62,11 @@ class Actor {
template <class... Args>
Actor(Scheduler& scheduler, Args&&... args)
: target(scheduler, parent, std::forward<Args>(args)...) {}

template <class... Args>
Actor(const TaggedScheduler& scheduler, Args&&... args)
: retainer(scheduler.get()),
target(scheduler, parent, std::forward<Args>(args)...) {}

template <class... Args>
Actor(std::shared_ptr<Scheduler> scheduler, Args&&... args)
Expand Down
18 changes: 17 additions & 1 deletion include/mbgl/actor/established_actor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,18 @@ class EstablishedActor {
class... Args,
typename std::enable_if_t<std::is_constructible_v<U, Args...> ||
std::is_constructible_v<U, ActorRef<U>, Args...>>* = nullptr>
EstablishedActor(Scheduler& scheduler, AspiringActor<Object>& parent_, Args&&... args)
EstablishedActor(Scheduler& scheduler, AspiringActor<Object>& parent_, Args&&... args)
: parent(parent_) {
emplaceObject(std::forward<Args>(args)...);
parent.mailbox->open(scheduler);
}

// Construct the Object from a parameter pack `args` (i.e. `Object(args...)`)
template <typename U = Object,
class... Args,
typename std::enable_if_t<std::is_constructible_v<U, Args...> ||
std::is_constructible_v<U, ActorRef<U>, Args...>>* = nullptr>
EstablishedActor(const TaggedScheduler& scheduler, AspiringActor<Object>& parent_, Args&&... args)
: parent(parent_) {
emplaceObject(std::forward<Args>(args)...);
parent.mailbox->open(scheduler);
Expand All @@ -48,6 +59,11 @@ class EstablishedActor {
parent.mailbox->open(scheduler);
}

template <class ArgsTuple, std::size_t ArgCount = std::tuple_size<std::decay_t<ArgsTuple>>::value>
EstablishedActor(const TaggedScheduler& scheduler, AspiringActor<Object>& parent_, ArgsTuple&& args) {
EstablishedActor(*scheduler.get(), parent_, std::forward<ArgsTuple>(args));
}

EstablishedActor(const EstablishedActor&) = delete;

~EstablishedActor() {
Expand Down
6 changes: 5 additions & 1 deletion include/mbgl/actor/mailbox.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <queue>

#include <mapbox/std/weak.hpp>
#include <mbgl/actor/scheduler.hpp>

namespace mbgl {

Expand All @@ -21,11 +22,13 @@ class Mailbox : public std::enable_shared_from_this<Mailbox> {
Mailbox();

Mailbox(Scheduler&);
Mailbox(const TaggedScheduler&);

/// Attach the given scheduler to this mailbox and begin processing messages
/// sent to it. The mailbox must be a "holding" mailbox, as created by the
/// default constructor Mailbox().
void open(Scheduler& scheduler_);
void open(const TaggedScheduler& scheduler_);
void open(Scheduler&);
void close();

// Indicate this mailbox will no longer be checked for messages
Expand All @@ -46,6 +49,7 @@ class Mailbox : public std::enable_shared_from_this<Mailbox> {
Abandoned
};

const void* schedulerTag{nullptr};
mapbox::base::WeakPtr<Scheduler> weakScheduler;

std::recursive_mutex receivingMutex;
Expand Down
40 changes: 34 additions & 6 deletions include/mbgl/actor/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ class Scheduler {

/// Enqueues a function for execution.
virtual void schedule(std::function<void()>&&) = 0;
virtual void schedule(const void*, std::function<void()>&&) = 0;

/// Makes a weak pointer to this Scheduler.
virtual mapbox::base::WeakPtr<Scheduler> makeWeakPtr() = 0;
/// Enqueues a function for execution on the render thread.
virtual void runOnRenderThread(std::function<void()>&&) {};
virtual void runRenderJobs() {}

/// Enqueues a function for execution on the render thread owned by the given tag.
virtual void runOnRenderThread(const void*, std::function<void()>&&) {}
/// Run render thread jobs for the given tag address
/// @param closeQueue Runs all render jobs and then removes the internal queue.
virtual void runRenderJobs(const void*, [[maybe_unused]] bool closeQueue = false) {}
/// Returns a closure wrapping the given one.
///
/// When the returned closure is invoked for the first time, it schedules
Expand All @@ -69,8 +72,7 @@ class Scheduler {

/// Wait until there's nothing pending or in process
/// Must not be called from a task provided to this scheduler.
/// @param timeout Time to wait, or zero to wait forever.
virtual std::size_t waitForEmpty(Milliseconds timeout = Milliseconds{0}) = 0;
virtual void waitForEmpty(const void* tag = nullptr) = 0;

/// Set/Get the current Scheduler for this thread
static Scheduler* GetCurrent();
Expand Down Expand Up @@ -116,4 +118,30 @@ class Scheduler {
std::function<void(const std::exception_ptr)> handler;
};

/// @brief A TaggedScheduler pairs a scheduler with a memory address. Tasklets submitted via a TaggedScheduler
/// are bucketed with the tag address to enable queries on tasks related to that tag. This allows multiple map
/// instances to all use the same scheduler and await processing of all their tasks prior to map deletion.
class TaggedScheduler {
public:
TaggedScheduler() = delete;
TaggedScheduler(std::shared_ptr<Scheduler> scheduler_, const void* tagAddr_)
: scheduler(std::move(scheduler_)),
tagAddr(tagAddr_) {}
TaggedScheduler(const TaggedScheduler&) = default;

/// @brief Get the wrapped scheduler
/// @return
const std::shared_ptr<Scheduler>& get() const noexcept { return scheduler; }
const void* tag() const noexcept { return tagAddr; }

void schedule(std::function<void()>&& fn) { scheduler->schedule(tagAddr, std::move(fn)); }
void runOnRenderThread(std::function<void()>&& fn) { scheduler->runOnRenderThread(tagAddr, std::move(fn)); }
void runRenderJobs(bool closeQueue = false) { scheduler->runRenderJobs(tagAddr, closeQueue); }
void waitForEmpty() const noexcept { scheduler->waitForEmpty(tagAddr); }

private:
std::shared_ptr<Scheduler> scheduler;
const void* tagAddr;
};

} // namespace mbgl
5 changes: 1 addition & 4 deletions include/mbgl/gfx/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ using VertexAttributeArrayPtr = std::shared_ptr<VertexAttributeArray>;
class Context {
protected:
Context(uint32_t maximumVertexBindingCount_)
: maximumVertexBindingCount(maximumVertexBindingCount_),
backgroundScheduler(Scheduler::GetBackground()) {}
: maximumVertexBindingCount(maximumVertexBindingCount_) {}

public:
static constexpr const uint32_t minimumRequiredVertexBindingCount = 8;
Expand Down Expand Up @@ -174,8 +173,6 @@ class Context {
virtual std::unique_ptr<DrawScopeResource> createDrawScopeResource() = 0;

gfx::RenderingStats stats;

std::shared_ptr<Scheduler> backgroundScheduler;
};

} // namespace gfx
Expand Down
6 changes: 6 additions & 0 deletions include/mbgl/gfx/renderer_backend.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <mbgl/actor/scheduler.hpp>
#include <mbgl/util/util.hpp>

#include <memory>
Expand All @@ -8,6 +9,7 @@
namespace mbgl {

class ProgramParameters;
class Map;

namespace gfx {

Expand All @@ -31,6 +33,9 @@ class RendererBackend {
RendererBackend(const RendererBackend&) = delete;
RendererBackend& operator=(const RendererBackend&) = delete;

// Return the background thread pool assigned to this backend
TaggedScheduler& getThreadPool() noexcept { return threadPool; }

/// Returns the device's context.
Context& getContext();

Expand Down Expand Up @@ -70,6 +75,7 @@ class RendererBackend {
std::unique_ptr<Context> context;
const ContextMode contextMode;
std::once_flag initialized;
TaggedScheduler threadPool;

friend class BackendScope;
};
Expand Down
2 changes: 1 addition & 1 deletion include/mbgl/mtl/renderer_backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ using FramebufferID = uint32_t;

class RendererBackend : public gfx::RendererBackend {
public:
RendererBackend(gfx::ContextMode);
RendererBackend(gfx::ContextMode, mbgl::Map*);
~RendererBackend() override;

/// Called prior to rendering to update the internally assumed MetalMetal state.
Expand Down
3 changes: 2 additions & 1 deletion include/mbgl/util/run_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ class RunLoop : public Scheduler, private util::noncopyable {
}

void schedule(std::function<void()>&& fn) override { invoke(std::move(fn)); }
void schedule(const void*, std::function<void()>&& fn) override { schedule(std::move(fn)); }
::mapbox::base::WeakPtr<Scheduler> makeWeakPtr() override { return weakFactory.makeWeakPtr(); }

std::size_t waitForEmpty(Milliseconds timeout) override;
void waitForEmpty(const void* tag = nullptr) override;

class Impl;

Expand Down
9 changes: 3 additions & 6 deletions platform/android/MapLibreAndroid/src/cpp/map_renderer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,17 @@ void MapRenderer::schedule(std::function<void()>&& scheduled) {
}
}

std::size_t MapRenderer::waitForEmpty(Milliseconds timeout) {
void MapRenderer::waitForEmpty([[maybe_unused]] const void* tag) {
try {
android::UniqueEnv _env = android::AttachEnv();
static auto& javaClass = jni::Class<MapRenderer>::Singleton(*_env);
static auto waitForEmpty = javaClass.GetMethod<jni::jint(jni::jlong)>(*_env, "waitForEmpty");
static auto waitForEmpty = javaClass.GetMethod<void()>(*_env, "waitForEmpty");
if (auto weakReference = javaPeer.get(*_env)) {
return weakReference.Call(*_env, waitForEmpty, static_cast<int64_t>(timeout.count()));
return weakReference.Call(*_env, waitForEmpty);
}
// If the peer is already cleaned up, there's nothing to wait for
return 0;
} catch (...) {
Log::Error(Event::Android, "MapRenderer::waitForEmpty failed");
jni::ThrowJavaError(*android::AttachEnv(), std::current_exception());
return 0;
}
}

Expand Down
5 changes: 3 additions & 2 deletions platform/android/MapLibreAndroid/src/cpp/map_renderer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ class MapRenderer : public Scheduler {
// From Scheduler. Schedules by using callbacks to the
// JVM to process the mailbox on the right thread.
void schedule(std::function<void()>&& scheduled) override;
void schedule(const void*, std::function<void()>&& fn) override { schedule(std::move(fn)); };

mapbox::base::WeakPtr<Scheduler> makeWeakPtr() override { return weakFactory.makeWeakPtr(); }

// Wait for the queue to be empty
// A timeout of zero results in an unbounded wait
std::size_t waitForEmpty(Milliseconds timeout) override;
void waitForEmpty(const void*) override;

void requestRender();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,6 @@ void queueEvent(MapRendererRunnable runnable) {
this.queueEvent((Runnable) runnable);
}

/// Wait indefinitely for the queue to become empty
public void waitForEmpty() {
waitForEmpty(0);
}

private native void nativeInitialize(MapRenderer self,
float pixelRatio,
String localIdeographFontFamily);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,4 @@ public interface MapRendererScheduler {

@Keep
void waitForEmpty();

@Keep
long waitForEmpty(long timeoutMillis);
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void queueEvent(Runnable runnable) {
* {@inheritDoc}
*/
@Override
public long waitForEmpty(long timeoutMillis) {
return glSurfaceView.waitForEmpty(timeoutMillis);
public void waitForEmpty() {
glSurfaceView.waitForEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ public void queueEvent(Runnable r) {
* @param timeoutMillis Timeout in milliseconds
* @return Number of queue items remaining
*/
public long waitForEmpty(long timeoutMillis) {
return glThread.waitForEmpty(timeoutMillis);
public void waitForEmpty() {
glThread.waitForEmpty();
}


Expand Down Expand Up @@ -1038,31 +1038,16 @@ public void queueEvent(@NonNull Runnable r) {
* @param timeoutMillis Timeout in milliseconds, zero for indefinite wait
* @return Number of queue items remaining
*/
public int waitForEmpty(long timeoutMillis) {
final long startTime = System.nanoTime();
public void waitForEmpty() {
synchronized (glThreadManager) {
// Wait for the queue to be empty
while (!this.eventQueue.isEmpty()) {
if (timeoutMillis > 0) {
final long elapsedMillis = (System.nanoTime() - startTime) / 1000 / 1000;
if (elapsedMillis < timeoutMillis) {
try {
glThreadManager.wait(timeoutMillis - elapsedMillis);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
} else {
break;
}
} else {
try {
glThreadManager.wait();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
try {
glThreadManager.wait();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
return this.eventQueue.size();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public void queueEvent(Runnable runnable) {
* {@inheritDoc}
*/
@Override
public long waitForEmpty(long timeoutMillis) {
return renderThread.waitForEmpty(timeoutMillis);
public void waitForEmpty() {
renderThread.waitForEmpty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,31 +141,16 @@ void queueEvent(@NonNull Runnable runnable) {
* @return The number of items remaining in the queue
*/
@UiThread
int waitForEmpty(long timeoutMillis) {
final long startTime = System.nanoTime();
void waitForEmpty() {
synchronized (lock) {
// Wait for the queue to be empty
while (!this.eventQueue.isEmpty()) {
if (timeoutMillis > 0) {
final long elapsedMillis = (System.nanoTime() - startTime) / 1000 / 1000;
if (elapsedMillis < timeoutMillis) {
try {
lock.wait(timeoutMillis - elapsedMillis);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
} else {
break;
}
} else {
try {
lock.wait(0);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
try {
lock.wait(0);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
return this.eventQueue.size();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,8 @@ class NativeMapViewTest : AppCenter() {
// no-op
}

override fun waitForEmpty(timeoutMillis: Long): Long {
override fun waitForEmpty() {
// no-op
return 0
}
}
}
Loading

0 comments on commit 4e71b38

Please sign in to comment.