-
Notifications
You must be signed in to change notification settings - Fork 66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use modern v8::Platform worker threads APIs. #69
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,48 +16,50 @@ using v8::Platform; | |
using v8::Task; | ||
using v8::TracingController; | ||
|
||
static void BackgroundRunner(void* data) { | ||
TaskQueue<Task>* background_tasks = static_cast<TaskQueue<Task>*>(data); | ||
while (std::unique_ptr<Task> task = background_tasks->BlockingPop()) { | ||
namespace { | ||
|
||
static void WorkerThreadMain(void* data) { | ||
TaskQueue<Task>* pending_worker_tasks = static_cast<TaskQueue<Task>*>(data); | ||
while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) { | ||
task->Run(); | ||
background_tasks->NotifyOfCompletion(); | ||
pending_worker_tasks->NotifyOfCompletion(); | ||
} | ||
} | ||
|
||
BackgroundTaskRunner::BackgroundTaskRunner(int thread_pool_size) { | ||
} // namespace | ||
|
||
WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) { | ||
for (int i = 0; i < thread_pool_size; i++) { | ||
std::unique_ptr<uv_thread_t> t { new uv_thread_t() }; | ||
if (uv_thread_create(t.get(), BackgroundRunner, &background_tasks_) != 0) | ||
if (uv_thread_create(t.get(), WorkerThreadMain, | ||
&pending_worker_tasks_) != 0) { | ||
break; | ||
} | ||
threads_.push_back(std::move(t)); | ||
} | ||
} | ||
|
||
void BackgroundTaskRunner::PostTask(std::unique_ptr<Task> task) { | ||
background_tasks_.Push(std::move(task)); | ||
} | ||
|
||
void BackgroundTaskRunner::PostIdleTask(std::unique_ptr<v8::IdleTask> task) { | ||
UNREACHABLE(); | ||
void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) { | ||
pending_worker_tasks_.Push(std::move(task)); | ||
} | ||
|
||
void BackgroundTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task, | ||
double delay_in_seconds) { | ||
void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task, | ||
double delay_in_seconds) { | ||
UNREACHABLE(); | ||
} | ||
|
||
void BackgroundTaskRunner::BlockingDrain() { | ||
background_tasks_.BlockingDrain(); | ||
void WorkerThreadsTaskRunner::BlockingDrain() { | ||
pending_worker_tasks_.BlockingDrain(); | ||
} | ||
|
||
void BackgroundTaskRunner::Shutdown() { | ||
background_tasks_.Stop(); | ||
void WorkerThreadsTaskRunner::Shutdown() { | ||
pending_worker_tasks_.Stop(); | ||
for (size_t i = 0; i < threads_.size(); i++) { | ||
CHECK_EQ(0, uv_thread_join(threads_[i].get())); | ||
} | ||
} | ||
|
||
size_t BackgroundTaskRunner::NumberOfAvailableBackgroundThreads() const { | ||
int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const { | ||
return threads_.size(); | ||
} | ||
|
||
|
@@ -120,8 +122,8 @@ NodePlatform::NodePlatform(int thread_pool_size, | |
TracingController* controller = new TracingController(); | ||
tracing_controller_.reset(controller); | ||
} | ||
background_task_runner_ = | ||
std::make_shared<BackgroundTaskRunner>(thread_pool_size); | ||
worker_thread_task_runner_ = | ||
std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size); | ||
} | ||
|
||
void NodePlatform::RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) { | ||
|
@@ -147,16 +149,16 @@ void NodePlatform::UnregisterIsolate(IsolateData* isolate_data) { | |
} | ||
|
||
void NodePlatform::Shutdown() { | ||
background_task_runner_->Shutdown(); | ||
worker_thread_task_runner_->Shutdown(); | ||
|
||
{ | ||
Mutex::ScopedLock lock(per_isolate_mutex_); | ||
per_isolate_.clear(); | ||
} | ||
} | ||
|
||
size_t NodePlatform::NumberOfAvailableBackgroundThreads() { | ||
return background_task_runner_->NumberOfAvailableBackgroundThreads(); | ||
int NodePlatform::NumberOfWorkerThreads() { | ||
return worker_thread_task_runner_->NumberOfWorkerThreads(); | ||
} | ||
|
||
void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) { | ||
|
@@ -188,15 +190,12 @@ void PerIsolatePlatformData::CancelPendingDelayedTasks() { | |
scheduled_delayed_tasks_.clear(); | ||
} | ||
|
||
void NodePlatform::DrainBackgroundTasks(Isolate* isolate) { | ||
void NodePlatform::DrainTasks(Isolate* isolate) { | ||
std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate); | ||
|
||
do { | ||
// Right now, there is no way to drain only background tasks associated | ||
// with a specific isolate, so this sometimes does more work than | ||
// necessary. In the long run, that functionality is probably going to | ||
// be available anyway, though. | ||
background_task_runner_->BlockingDrain(); | ||
// Worker tasks aren't associated with an Isolate. | ||
worker_thread_task_runner_->BlockingDrain(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any reason for updating the comment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wanted to remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can feel free to remove it, but that sentence was more or less a reminder to myself that we might want this, so I was curious if there’s anything inherently wrong with the idea? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should be worker threads for each core regardless of the number of isolates. Isolates are bound to a single task runner (usually mapped to a single thread), but worker threads are shared so we removed the Isolate* parameter (both Chrome and node didn't use it). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just for context, the original idea was that when an Isolate is being removed (e.g. one for a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tasks posted to workers should either not use state bound to the Isolate or be cancellable. Isolate::DeInit() makes sure to wait for cancelled tasks to be flagged as cancelled or be done running via CancelAndWait() @ https://cs.chromium.org/chromium/src/v8/src/isolate.cc?type=cs&q=file:isolate.cc+CancelAndWait&sq=package:chromium As such, there's no need to drain tasks before getting rid of an Isolate, I think. It would be a good idea to mention this specifically in v8::Platform, make sure it's true for existing callers, and remove the draining logic in node IMO (but, to be clear, I will not do this -- just looking to clean up the old APIs before transitioning). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, this was helpful!
I can take care of at least documenting it. :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed, at the moment every task posted by V8 should be cancelable, so V8 can kind of clean up the task queue by itself when the isolate is tearing down. The reason is that tasks should not own the memory they operate on. This invariant avoids synchronization issues between isolate shutdown and task destruction. Therefore the isolate (directly or indirectly) owns the memory of the task, which is why the task has to be cancelled before the isolate goes away. I will add some documentation for this in V8 as well. |
||
} while (per_isolate->FlushForegroundTasksInternal()); | ||
} | ||
|
||
|
@@ -230,11 +229,17 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() { | |
return did_work; | ||
} | ||
|
||
void NodePlatform::CallOnBackgroundThread(Task* task, | ||
ExpectedRuntime expected_runtime) { | ||
background_task_runner_->PostTask(std::unique_ptr<Task>(task)); | ||
void NodePlatform::CallOnWorkerThread(std::unique_ptr<v8::Task> task) { | ||
worker_thread_task_runner_->PostTask(std::move(task)); | ||
} | ||
|
||
void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<v8::Task> task, | ||
double delay_in_seconds) { | ||
worker_thread_task_runner_->PostDelayedTask(std::move(task), | ||
delay_in_seconds); | ||
} | ||
|
||
|
||
std::shared_ptr<PerIsolatePlatformData> | ||
NodePlatform::ForIsolate(Isolate* isolate) { | ||
Mutex::ScopedLock lock(per_isolate_mutex_); | ||
|
@@ -264,11 +269,6 @@ void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) { | |
|
||
bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; } | ||
|
||
std::shared_ptr<v8::TaskRunner> | ||
NodePlatform::GetBackgroundTaskRunner(Isolate* isolate) { | ||
return background_task_runner_; | ||
} | ||
|
||
std::shared_ptr<v8::TaskRunner> | ||
NodePlatform::GetForegroundTaskRunner(Isolate* isolate) { | ||
return ForIsolate(isolate); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,23 +89,22 @@ class PerIsolatePlatformData : | |
std::vector<DelayedTaskPointer> scheduled_delayed_tasks_; | ||
}; | ||
|
||
// This acts as the single background task runner for all Isolates. | ||
class BackgroundTaskRunner : public v8::TaskRunner { | ||
// This acts as the single worker thread task runner for all Isolates. | ||
class WorkerThreadsTaskRunner { | ||
public: | ||
explicit BackgroundTaskRunner(int thread_pool_size); | ||
explicit WorkerThreadsTaskRunner(int thread_pool_size); | ||
|
||
void PostTask(std::unique_ptr<v8::Task> task) override; | ||
void PostIdleTask(std::unique_ptr<v8::IdleTask> task) override; | ||
void PostTask(std::unique_ptr<v8::Task> task); | ||
void PostDelayedTask(std::unique_ptr<v8::Task> task, | ||
double delay_in_seconds) override; | ||
bool IdleTasksEnabled() override { return false; }; | ||
double delay_in_seconds); | ||
|
||
void BlockingDrain(); | ||
void Shutdown(); | ||
|
||
size_t NumberOfAvailableBackgroundThreads() const; | ||
int NumberOfWorkerThreads() const; | ||
|
||
private: | ||
TaskQueue<v8::Task> background_tasks_; | ||
TaskQueue<v8::Task> pending_worker_tasks_; | ||
std::vector<std::unique_ptr<uv_thread_t>> threads_; | ||
}; | ||
|
||
|
@@ -114,14 +113,15 @@ class NodePlatform : public MultiIsolatePlatform { | |
NodePlatform(int thread_pool_size, v8::TracingController* tracing_controller); | ||
virtual ~NodePlatform() {} | ||
|
||
void DrainBackgroundTasks(v8::Isolate* isolate) override; | ||
void DrainTasks(v8::Isolate* isolate) override; | ||
void CancelPendingDelayedTasks(v8::Isolate* isolate) override; | ||
void Shutdown(); | ||
|
||
// v8::Platform implementation. | ||
size_t NumberOfAvailableBackgroundThreads() override; | ||
void CallOnBackgroundThread(v8::Task* task, | ||
ExpectedRuntime expected_runtime) override; | ||
int NumberOfWorkerThreads() override; | ||
void CallOnWorkerThread(std::unique_ptr<v8::Task> task) override; | ||
void CallDelayedOnWorkerThread(std::unique_ptr<v8::Task> task, | ||
double delay_in_seconds) override; | ||
void CallOnForegroundThread(v8::Isolate* isolate, v8::Task* task) override; | ||
void CallDelayedOnForegroundThread(v8::Isolate* isolate, v8::Task* task, | ||
double delay_in_seconds) override; | ||
|
@@ -135,8 +135,6 @@ class NodePlatform : public MultiIsolatePlatform { | |
void RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) override; | ||
void UnregisterIsolate(IsolateData* isolate_data) override; | ||
|
||
std::shared_ptr<v8::TaskRunner> GetBackgroundTaskRunner( | ||
v8::Isolate* isolate) override; | ||
std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner( | ||
v8::Isolate* isolate) override; | ||
|
||
|
@@ -148,7 +146,7 @@ class NodePlatform : public MultiIsolatePlatform { | |
std::shared_ptr<PerIsolatePlatformData>> per_isolate_; | ||
|
||
std::unique_ptr<v8::TracingController> tracing_controller_; | ||
std::shared_ptr<BackgroundTaskRunner> background_task_runner_; | ||
std::shared_ptr<WorkerThreadsTaskRunner> worker_thread_task_runner_; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this can be a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you mean? It is an std::shared_ptr..? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, sorry. I meant, can it be a |
||
}; | ||
|
||
} // namespace node | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to verify: Even though
CallDelayedOnWorkerThread()
is being added to the API, this is still unreachable code atm?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. This doesn't change the previous logic. Having this be UNREACHABLE() was likely incorrect before and still is (but I'm not going to change that).
In practice the only v8 caller is devtools which I assume doesn't trigger in node so node has been getting away with this UNREACHABLE for now...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unfortunate that
CallDelayedOnWorkerThread()
was used in V8 for the first time just before we were about to remove the API. I guess we can implement it the same as for foreground tasks if we ever have to.