Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(ext/napi): port NAPI to v8 tasks #21406

Merged
merged 5 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions cli/napi/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub struct AsyncWork {
pub complete: napi_async_complete_callback,
}

unsafe impl Send for AsyncWork {}
unsafe impl Sync for AsyncWork {}

#[napi_sym::napi_sym]
fn napi_create_async_work(
_env: *mut Env,
Expand Down Expand Up @@ -61,12 +64,22 @@ fn napi_queue_async_work(
return napi_invalid_arg;
};

let fut = Box::new(move || {
(work.execute)(env_ptr as napi_env, work.data);
// Note: Must be called from the loop thread.
(work.complete)(env_ptr as napi_env, napi_ok, work.data);
});
env.add_async_work(fut);
#[repr(transparent)]
struct SendPtr<T>(*const T);
unsafe impl<T> Send for SendPtr<T> {}
unsafe impl<T> Sync for SendPtr<T> {}
let send_env = SendPtr(env_ptr);

#[inline(always)]
fn do_work(ptr: SendPtr<Env>, work: &AsyncWork) {
// SAFETY: This is a valid async work queue call and it runs on the event loop thread
unsafe {
(work.execute)(ptr.0 as napi_env, work.data);
(work.complete)(ptr.0 as napi_env, napi_ok, work.data);
}
}

env.add_async_work(move || do_work(send_env, work));

napi_ok
}
Expand Down
112 changes: 76 additions & 36 deletions cli/napi/threadsafe_functions.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

use deno_core::futures::channel::mpsc;
use deno_core::V8CrossThreadTaskSpawner;
use deno_runtime::deno_napi::*;
use once_cell::sync::Lazy;
use std::mem::forget;
use std::ptr::NonNull;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
use std::sync::Arc;

static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
Expand All @@ -20,7 +21,7 @@ pub struct TsFn {
pub ref_counter: Arc<AtomicUsize>,
finalizer: Option<napi_finalize>,
finalizer_data: *mut c_void,
sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
sender: V8CrossThreadTaskSpawner,
tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
}

Expand Down Expand Up @@ -84,47 +85,86 @@ impl TsFn {

pub fn call(&self, data: *mut c_void, is_blocking: bool) {
let js_func = self.maybe_func.clone();
let (tx, rx) = channel();

#[repr(transparent)]
struct SendPtr<T>(*const T);
unsafe impl<T> Send for SendPtr<T> {}
unsafe impl<T> Sync for SendPtr<T> {}
Comment on lines +89 to +92
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SendPtr from cli/napi/async.rs can be reused here?


let env = SendPtr(self.env);
let context = SendPtr(self.context);
let data = SendPtr(data);

#[inline(always)]
fn spawn(
sender: &V8CrossThreadTaskSpawner,
is_blocking: bool,
f: impl FnOnce(&mut v8::HandleScope) + Send + 'static,
) {
if is_blocking {
sender.spawn_blocking(f);
} else {
sender.spawn(f);
}
}

if let Some(call_js_cb) = self.maybe_call_js_cb {
let context = self.context;
let env = self.env;
let call = Box::new(move || {
let scope = &mut unsafe { (*env).scope() };
match js_func {
Some(func) => {
let func: v8::Local<v8::Value> =
func.open(scope).to_object(scope).unwrap().into();
unsafe {
call_js_cb(env as *mut c_void, func.into(), context, data)
};
if let Some(func) = js_func {
let func = SendPtr(func.into_raw().as_ptr());
#[inline(always)]
fn call(
scope: &mut v8::HandleScope,
call_js_cb: napi_threadsafe_function_call_js,
func: SendPtr<v8::Function>,
env: SendPtr<Env>,
context: SendPtr<c_void>,
data: SendPtr<c_void>,
) {
// SAFETY: This is a valid global from above
let func: v8::Global<v8::Function> = unsafe {
v8::Global::<v8::Function>::from_raw(
scope,
NonNull::new_unchecked(func.0 as _),
)
};
let func: v8::Local<v8::Value> =
func.open(scope).to_object(scope).unwrap().into();
// SAFETY: env is valid for the duration of the callback.
// data lifetime is users responsibility.
unsafe {
call_js_cb(env.0 as _, func.into(), context.0 as _, data.0 as _)
}
None => {
unsafe {
call_js_cb(env as *mut c_void, std::mem::zeroed(), context, data)
};
}
spawn(&self.sender, is_blocking, move |scope| {
call(scope, call_js_cb, func, env, context, data);
});
} else {
#[inline(always)]
fn call(
call_js_cb: napi_threadsafe_function_call_js,
env: SendPtr<Env>,
context: SendPtr<c_void>,
data: SendPtr<c_void>,
) {
// SAFETY: We're calling the provided callback with valid args
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

unsafe {
call_js_cb(
env.0 as _,
std::mem::zeroed(),
context.0 as _,
data.0 as _,
)
}
}

// Receiver might have been already dropped
let _ = tx.send(());
});
// This call should never fail
self.sender.unbounded_send(call).unwrap();
} else if let Some(_js_func) = js_func {
let call = Box::new(move || {
spawn(&self.sender, is_blocking, move |_| {
call(call_js_cb, env, context, data);
});
}
} else {
spawn(&self.sender, is_blocking, |_| {
// TODO: func.call
// let func = js_func.open(scope);
// Receiver might have been already dropped
let _ = tx.send(());
});
// This call should never fail
self.sender.unbounded_send(call).unwrap();
}

if is_blocking {
let _ = rx.recv();
}
};
}
}

Expand Down
77 changes: 9 additions & 68 deletions ext/napi/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@ use core::ptr::NonNull;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::parking_lot::Mutex;
use deno_core::OpState;
use deno_core::V8CrossThreadTaskSpawner;
use std::cell::RefCell;
use std::ffi::CString;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::task::Poll;
use std::thread_local;

#[cfg(unix)]
Expand Down Expand Up @@ -231,13 +230,11 @@ pub struct napi_node_version {
pub release: *const c_char,
}

pub type PendingNapiAsyncWork = Box<dyn FnOnce()>;
pub trait PendingNapiAsyncWork: FnOnce() + Send + 'static {}
impl<T> PendingNapiAsyncWork for T where T: FnOnce() + Send + 'static {}

pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>;
pub struct NapiState {
// Async tasks.
pub pending_async_work: Vec<PendingNapiAsyncWork>,
pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
pub async_work_receiver: mpsc::UnboundedReceiver<PendingNapiAsyncWork>,
// Thread safe functions.
pub active_threadsafe_functions: usize,
pub threadsafe_function_receiver:
Expand Down Expand Up @@ -318,7 +315,7 @@ pub struct Env {
pub isolate_ptr: *mut v8::OwnedIsolate,
pub open_handle_scopes: usize,
pub shared: *mut EnvShared,
pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
pub async_work_sender: V8CrossThreadTaskSpawner,
pub threadsafe_function_sender:
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
pub cleanup_hooks:
Expand All @@ -336,7 +333,7 @@ impl Env {
isolate_ptr: *mut v8::OwnedIsolate,
context: v8::Global<v8::Context>,
global: v8::Global<v8::Value>,
sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
sender: V8CrossThreadTaskSpawner,
threadsafe_function_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
cleanup_hooks: Rc<
RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>,
Expand Down Expand Up @@ -372,8 +369,8 @@ impl Env {
unsafe { &mut *self.shared }
}

pub fn add_async_work(&mut self, async_work: PendingNapiAsyncWork) {
self.async_work_sender.unbounded_send(async_work).unwrap();
pub fn add_async_work(&mut self, async_work: impl FnOnce() + Send + 'static) {
self.async_work_sender.spawn(|_| async_work());
}

#[inline]
Expand Down Expand Up @@ -418,74 +415,18 @@ deno_core::extension!(deno_napi,
op_napi_open<P>
],
state = |state| {
let (async_work_sender, async_work_receiver) =
mpsc::unbounded::<PendingNapiAsyncWork>();
let (threadsafe_function_sender, threadsafe_function_receiver) =
mpsc::unbounded::<ThreadSafeFunctionStatus>();
state.put(NapiState {
pending_async_work: Vec::new(),
async_work_sender,
async_work_receiver,
threadsafe_function_sender,
threadsafe_function_receiver,
active_threadsafe_functions: 0,
env_cleanup_hooks: Rc::new(RefCell::new(vec![])),
tsfn_ref_counters: Arc::new(Mutex::new(vec![])),
});
},
event_loop_middleware = event_loop_middleware,
);

fn event_loop_middleware(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

op_state_rc: Rc<RefCell<OpState>>,
cx: &mut std::task::Context,
) -> bool {
// `work` can call back into the runtime. It can also schedule an async task
// but we don't know that now. We need to make the runtime re-poll to make
// sure no pending NAPI tasks exist.
let mut maybe_scheduling = false;

{
let mut op_state = op_state_rc.borrow_mut();
let napi_state = op_state.borrow_mut::<NapiState>();

while let Poll::Ready(Some(async_work_fut)) =
napi_state.async_work_receiver.poll_next_unpin(cx)
{
napi_state.pending_async_work.push(async_work_fut);
}

if napi_state.active_threadsafe_functions > 0 {
maybe_scheduling = true;
}

let tsfn_ref_counters = napi_state.tsfn_ref_counters.lock().clone();
for (_id, counter) in tsfn_ref_counters.iter() {
if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 {
maybe_scheduling = true;
break;
}
}
}

loop {
let maybe_work = {
let mut op_state = op_state_rc.borrow_mut();
let napi_state = op_state.borrow_mut::<NapiState>();
napi_state.pending_async_work.pop()
};

if let Some(work) = maybe_work {
work();
maybe_scheduling = true;
} else {
break;
}
}

maybe_scheduling
}

pub trait NapiPermissions {
fn check(&mut self, path: Option<&Path>)
-> std::result::Result<(), AnyError>;
Expand Down Expand Up @@ -557,7 +498,7 @@ where
let napi_state = op_state.borrow::<NapiState>();
let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>();
(
napi_state.async_work_sender.clone(),
op_state.borrow::<V8CrossThreadTaskSpawner>().clone(),
napi_state.threadsafe_function_sender.clone(),
*isolate_ptr,
napi_state.env_cleanup_hooks.clone(),
Expand Down