-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf(ext/napi): port NAPI to v8 tasks #21406
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,12 @@ | ||
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. | ||
|
||
use deno_core::futures::channel::mpsc; | ||
use deno_core::V8CrossThreadTaskSpawner; | ||
use deno_runtime::deno_napi::*; | ||
use once_cell::sync::Lazy; | ||
use std::mem::forget; | ||
use std::ptr::NonNull; | ||
use std::sync::atomic::AtomicUsize; | ||
use std::sync::mpsc::channel; | ||
use std::sync::Arc; | ||
|
||
static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0)); | ||
|
@@ -20,7 +21,7 @@ pub struct TsFn { | |
pub ref_counter: Arc<AtomicUsize>, | ||
finalizer: Option<napi_finalize>, | ||
finalizer_data: *mut c_void, | ||
sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, | ||
sender: V8CrossThreadTaskSpawner, | ||
tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>, | ||
} | ||
|
||
|
@@ -84,47 +85,86 @@ impl TsFn { | |
|
||
pub fn call(&self, data: *mut c_void, is_blocking: bool) { | ||
let js_func = self.maybe_func.clone(); | ||
let (tx, rx) = channel(); | ||
|
||
#[repr(transparent)] | ||
struct SendPtr<T>(*const T); | ||
unsafe impl<T> Send for SendPtr<T> {} | ||
unsafe impl<T> Sync for SendPtr<T> {} | ||
|
||
let env = SendPtr(self.env); | ||
let context = SendPtr(self.context); | ||
let data = SendPtr(data); | ||
|
||
#[inline(always)] | ||
fn spawn( | ||
sender: &V8CrossThreadTaskSpawner, | ||
is_blocking: bool, | ||
f: impl FnOnce(&mut v8::HandleScope) + Send + 'static, | ||
) { | ||
if is_blocking { | ||
sender.spawn_blocking(f); | ||
} else { | ||
sender.spawn(f); | ||
} | ||
} | ||
|
||
if let Some(call_js_cb) = self.maybe_call_js_cb { | ||
let context = self.context; | ||
let env = self.env; | ||
let call = Box::new(move || { | ||
let scope = &mut unsafe { (*env).scope() }; | ||
match js_func { | ||
Some(func) => { | ||
let func: v8::Local<v8::Value> = | ||
func.open(scope).to_object(scope).unwrap().into(); | ||
unsafe { | ||
call_js_cb(env as *mut c_void, func.into(), context, data) | ||
}; | ||
if let Some(func) = js_func { | ||
let func = SendPtr(func.into_raw().as_ptr()); | ||
#[inline(always)] | ||
fn call( | ||
scope: &mut v8::HandleScope, | ||
call_js_cb: napi_threadsafe_function_call_js, | ||
func: SendPtr<v8::Function>, | ||
env: SendPtr<Env>, | ||
context: SendPtr<c_void>, | ||
data: SendPtr<c_void>, | ||
) { | ||
// SAFETY: This is a valid global from above | ||
let func: v8::Global<v8::Function> = unsafe { | ||
v8::Global::<v8::Function>::from_raw( | ||
scope, | ||
NonNull::new_unchecked(func.0 as _), | ||
) | ||
}; | ||
let func: v8::Local<v8::Value> = | ||
func.open(scope).to_object(scope).unwrap().into(); | ||
// SAFETY: env is valid for the duration of the callback. | ||
// data lifetime is users responsibility. | ||
unsafe { | ||
call_js_cb(env.0 as _, func.into(), context.0 as _, data.0 as _) | ||
} | ||
None => { | ||
unsafe { | ||
call_js_cb(env as *mut c_void, std::mem::zeroed(), context, data) | ||
}; | ||
} | ||
spawn(&self.sender, is_blocking, move |scope| { | ||
call(scope, call_js_cb, func, env, context, data); | ||
}); | ||
} else { | ||
#[inline(always)] | ||
fn call( | ||
call_js_cb: napi_threadsafe_function_call_js, | ||
env: SendPtr<Env>, | ||
context: SendPtr<c_void>, | ||
data: SendPtr<c_void>, | ||
) { | ||
// SAFETY: We're calling the provided callback with valid args | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto |
||
unsafe { | ||
call_js_cb( | ||
env.0 as _, | ||
std::mem::zeroed(), | ||
context.0 as _, | ||
data.0 as _, | ||
) | ||
} | ||
} | ||
|
||
// Receiver might have been already dropped | ||
let _ = tx.send(()); | ||
}); | ||
// This call should never fail | ||
self.sender.unbounded_send(call).unwrap(); | ||
} else if let Some(_js_func) = js_func { | ||
let call = Box::new(move || { | ||
spawn(&self.sender, is_blocking, move |_| { | ||
call(call_js_cb, env, context, data); | ||
}); | ||
} | ||
} else { | ||
spawn(&self.sender, is_blocking, |_| { | ||
// TODO: func.call | ||
// let func = js_func.open(scope); | ||
// Receiver might have been already dropped | ||
let _ = tx.send(()); | ||
}); | ||
// This call should never fail | ||
self.sender.unbounded_send(call).unwrap(); | ||
} | ||
|
||
if is_blocking { | ||
let _ = rx.recv(); | ||
} | ||
}; | ||
} | ||
} | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,18 +9,17 @@ use core::ptr::NonNull; | |
use deno_core::error::type_error; | ||
use deno_core::error::AnyError; | ||
use deno_core::futures::channel::mpsc; | ||
use deno_core::futures::StreamExt; | ||
use deno_core::op2; | ||
use deno_core::parking_lot::Mutex; | ||
use deno_core::OpState; | ||
use deno_core::V8CrossThreadTaskSpawner; | ||
use std::cell::RefCell; | ||
use std::ffi::CString; | ||
use std::path::Path; | ||
use std::path::PathBuf; | ||
use std::rc::Rc; | ||
use std::sync::atomic::AtomicUsize; | ||
use std::sync::Arc; | ||
use std::task::Poll; | ||
use std::thread_local; | ||
|
||
#[cfg(unix)] | ||
|
@@ -231,13 +230,11 @@ pub struct napi_node_version { | |
pub release: *const c_char, | ||
} | ||
|
||
pub type PendingNapiAsyncWork = Box<dyn FnOnce()>; | ||
pub trait PendingNapiAsyncWork: FnOnce() + Send + 'static {} | ||
impl<T> PendingNapiAsyncWork for T where T: FnOnce() + Send + 'static {} | ||
|
||
pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>; | ||
pub struct NapiState { | ||
// Async tasks. | ||
pub pending_async_work: Vec<PendingNapiAsyncWork>, | ||
pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, | ||
pub async_work_receiver: mpsc::UnboundedReceiver<PendingNapiAsyncWork>, | ||
// Thread safe functions. | ||
pub active_threadsafe_functions: usize, | ||
pub threadsafe_function_receiver: | ||
|
@@ -318,7 +315,7 @@ pub struct Env { | |
pub isolate_ptr: *mut v8::OwnedIsolate, | ||
pub open_handle_scopes: usize, | ||
pub shared: *mut EnvShared, | ||
pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, | ||
pub async_work_sender: V8CrossThreadTaskSpawner, | ||
pub threadsafe_function_sender: | ||
mpsc::UnboundedSender<ThreadSafeFunctionStatus>, | ||
pub cleanup_hooks: | ||
|
@@ -336,7 +333,7 @@ impl Env { | |
isolate_ptr: *mut v8::OwnedIsolate, | ||
context: v8::Global<v8::Context>, | ||
global: v8::Global<v8::Value>, | ||
sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, | ||
sender: V8CrossThreadTaskSpawner, | ||
threadsafe_function_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>, | ||
cleanup_hooks: Rc< | ||
RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>, | ||
|
@@ -372,8 +369,8 @@ impl Env { | |
unsafe { &mut *self.shared } | ||
} | ||
|
||
pub fn add_async_work(&mut self, async_work: PendingNapiAsyncWork) { | ||
self.async_work_sender.unbounded_send(async_work).unwrap(); | ||
pub fn add_async_work(&mut self, async_work: impl FnOnce() + Send + 'static) { | ||
self.async_work_sender.spawn(|_| async_work()); | ||
} | ||
|
||
#[inline] | ||
|
@@ -418,74 +415,18 @@ deno_core::extension!(deno_napi, | |
op_napi_open<P> | ||
], | ||
state = |state| { | ||
let (async_work_sender, async_work_receiver) = | ||
mpsc::unbounded::<PendingNapiAsyncWork>(); | ||
let (threadsafe_function_sender, threadsafe_function_receiver) = | ||
mpsc::unbounded::<ThreadSafeFunctionStatus>(); | ||
state.put(NapiState { | ||
pending_async_work: Vec::new(), | ||
async_work_sender, | ||
async_work_receiver, | ||
threadsafe_function_sender, | ||
threadsafe_function_receiver, | ||
active_threadsafe_functions: 0, | ||
env_cleanup_hooks: Rc::new(RefCell::new(vec![])), | ||
tsfn_ref_counters: Arc::new(Mutex::new(vec![])), | ||
}); | ||
}, | ||
event_loop_middleware = event_loop_middleware, | ||
); | ||
|
||
fn event_loop_middleware( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice |
||
op_state_rc: Rc<RefCell<OpState>>, | ||
cx: &mut std::task::Context, | ||
) -> bool { | ||
// `work` can call back into the runtime. It can also schedule an async task | ||
// but we don't know that now. We need to make the runtime re-poll to make | ||
// sure no pending NAPI tasks exist. | ||
let mut maybe_scheduling = false; | ||
|
||
{ | ||
let mut op_state = op_state_rc.borrow_mut(); | ||
let napi_state = op_state.borrow_mut::<NapiState>(); | ||
|
||
while let Poll::Ready(Some(async_work_fut)) = | ||
napi_state.async_work_receiver.poll_next_unpin(cx) | ||
{ | ||
napi_state.pending_async_work.push(async_work_fut); | ||
} | ||
|
||
if napi_state.active_threadsafe_functions > 0 { | ||
maybe_scheduling = true; | ||
} | ||
|
||
let tsfn_ref_counters = napi_state.tsfn_ref_counters.lock().clone(); | ||
for (_id, counter) in tsfn_ref_counters.iter() { | ||
if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 { | ||
maybe_scheduling = true; | ||
break; | ||
} | ||
} | ||
} | ||
|
||
loop { | ||
let maybe_work = { | ||
let mut op_state = op_state_rc.borrow_mut(); | ||
let napi_state = op_state.borrow_mut::<NapiState>(); | ||
napi_state.pending_async_work.pop() | ||
}; | ||
|
||
if let Some(work) = maybe_work { | ||
work(); | ||
maybe_scheduling = true; | ||
} else { | ||
break; | ||
} | ||
} | ||
|
||
maybe_scheduling | ||
} | ||
|
||
pub trait NapiPermissions { | ||
fn check(&mut self, path: Option<&Path>) | ||
-> std::result::Result<(), AnyError>; | ||
|
@@ -557,7 +498,7 @@ where | |
let napi_state = op_state.borrow::<NapiState>(); | ||
let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>(); | ||
( | ||
napi_state.async_work_sender.clone(), | ||
op_state.borrow::<V8CrossThreadTaskSpawner>().clone(), | ||
napi_state.threadsafe_function_sender.clone(), | ||
*isolate_ptr, | ||
napi_state.env_cleanup_hooks.clone(), | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SendPtr from cli/napi/async.rs can be reused here?