Skip to content

Commit

Permalink
perf(ext/napi): port NAPI to v8 tasks (#21406)
Browse files Browse the repository at this point in the history
Part 2 of removing middleware.

This is somewhat awkward because `V8CrossThreadTaskSpawner` requires
tasks to be `Send`, but NAPI makes heavy use of `!Send` pointers. In
addition, Rust causes a closure to be `!Send` if you pull a `!Send`
value out of a struct.

---------

Signed-off-by: Matt Mastracci <[email protected]>
Co-authored-by: Divy Srivastava <[email protected]>
  • Loading branch information
mmastrac and littledivy authored Dec 11, 2023
1 parent a272bc1 commit d13e45f
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 110 deletions.
25 changes: 19 additions & 6 deletions cli/napi/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub struct AsyncWork {
pub complete: napi_async_complete_callback,
}

unsafe impl Send for AsyncWork {}
unsafe impl Sync for AsyncWork {}

#[napi_sym::napi_sym]
fn napi_create_async_work(
_env: *mut Env,
Expand Down Expand Up @@ -61,12 +64,22 @@ fn napi_queue_async_work(
return napi_invalid_arg;
};

let fut = Box::new(move || {
(work.execute)(env_ptr as napi_env, work.data);
// Note: Must be called from the loop thread.
(work.complete)(env_ptr as napi_env, napi_ok, work.data);
});
env.add_async_work(fut);
#[repr(transparent)]
struct SendPtr<T>(*const T);
unsafe impl<T> Send for SendPtr<T> {}
unsafe impl<T> Sync for SendPtr<T> {}
let send_env = SendPtr(env_ptr);

#[inline(always)]
fn do_work(ptr: SendPtr<Env>, work: &AsyncWork) {
// SAFETY: This is a valid async work queue call and it runs on the event loop thread
unsafe {
(work.execute)(ptr.0 as napi_env, work.data);
(work.complete)(ptr.0 as napi_env, napi_ok, work.data);
}
}

env.add_async_work(move || do_work(send_env, work));

napi_ok
}
Expand Down
112 changes: 76 additions & 36 deletions cli/napi/threadsafe_functions.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

use deno_core::futures::channel::mpsc;
use deno_core::V8CrossThreadTaskSpawner;
use deno_runtime::deno_napi::*;
use once_cell::sync::Lazy;
use std::mem::forget;
use std::ptr::NonNull;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
use std::sync::Arc;

static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
Expand All @@ -20,7 +21,7 @@ pub struct TsFn {
pub ref_counter: Arc<AtomicUsize>,
finalizer: Option<napi_finalize>,
finalizer_data: *mut c_void,
sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
sender: V8CrossThreadTaskSpawner,
tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
}

Expand Down Expand Up @@ -84,47 +85,86 @@ impl TsFn {

pub fn call(&self, data: *mut c_void, is_blocking: bool) {
let js_func = self.maybe_func.clone();
let (tx, rx) = channel();

#[repr(transparent)]
struct SendPtr<T>(*const T);
unsafe impl<T> Send for SendPtr<T> {}
unsafe impl<T> Sync for SendPtr<T> {}

let env = SendPtr(self.env);
let context = SendPtr(self.context);
let data = SendPtr(data);

#[inline(always)]
fn spawn(
sender: &V8CrossThreadTaskSpawner,
is_blocking: bool,
f: impl FnOnce(&mut v8::HandleScope) + Send + 'static,
) {
if is_blocking {
sender.spawn_blocking(f);
} else {
sender.spawn(f);
}
}

if let Some(call_js_cb) = self.maybe_call_js_cb {
let context = self.context;
let env = self.env;
let call = Box::new(move || {
let scope = &mut unsafe { (*env).scope() };
match js_func {
Some(func) => {
let func: v8::Local<v8::Value> =
func.open(scope).to_object(scope).unwrap().into();
unsafe {
call_js_cb(env as *mut c_void, func.into(), context, data)
};
if let Some(func) = js_func {
let func = SendPtr(func.into_raw().as_ptr());
#[inline(always)]
fn call(
scope: &mut v8::HandleScope,
call_js_cb: napi_threadsafe_function_call_js,
func: SendPtr<v8::Function>,
env: SendPtr<Env>,
context: SendPtr<c_void>,
data: SendPtr<c_void>,
) {
// SAFETY: This is a valid global from above
let func: v8::Global<v8::Function> = unsafe {
v8::Global::<v8::Function>::from_raw(
scope,
NonNull::new_unchecked(func.0 as _),
)
};
let func: v8::Local<v8::Value> =
func.open(scope).to_object(scope).unwrap().into();
// SAFETY: env is valid for the duration of the callback.
// data lifetime is users responsibility.
unsafe {
call_js_cb(env.0 as _, func.into(), context.0 as _, data.0 as _)
}
None => {
unsafe {
call_js_cb(env as *mut c_void, std::mem::zeroed(), context, data)
};
}
spawn(&self.sender, is_blocking, move |scope| {
call(scope, call_js_cb, func, env, context, data);
});
} else {
#[inline(always)]
fn call(
call_js_cb: napi_threadsafe_function_call_js,
env: SendPtr<Env>,
context: SendPtr<c_void>,
data: SendPtr<c_void>,
) {
// SAFETY: We're calling the provided callback with valid args
unsafe {
call_js_cb(
env.0 as _,
std::mem::zeroed(),
context.0 as _,
data.0 as _,
)
}
}

// Receiver might have been already dropped
let _ = tx.send(());
});
// This call should never fail
self.sender.unbounded_send(call).unwrap();
} else if let Some(_js_func) = js_func {
let call = Box::new(move || {
spawn(&self.sender, is_blocking, move |_| {
call(call_js_cb, env, context, data);
});
}
} else {
spawn(&self.sender, is_blocking, |_| {
// TODO: func.call
// let func = js_func.open(scope);
// Receiver might have been already dropped
let _ = tx.send(());
});
// This call should never fail
self.sender.unbounded_send(call).unwrap();
}

if is_blocking {
let _ = rx.recv();
}
};
}
}

Expand Down
77 changes: 9 additions & 68 deletions ext/napi/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@ use core::ptr::NonNull;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::parking_lot::Mutex;
use deno_core::OpState;
use deno_core::V8CrossThreadTaskSpawner;
use std::cell::RefCell;
use std::ffi::CString;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::task::Poll;
use std::thread_local;

#[cfg(unix)]
Expand Down Expand Up @@ -231,13 +230,11 @@ pub struct napi_node_version {
pub release: *const c_char,
}

pub type PendingNapiAsyncWork = Box<dyn FnOnce()>;
pub trait PendingNapiAsyncWork: FnOnce() + Send + 'static {}
impl<T> PendingNapiAsyncWork for T where T: FnOnce() + Send + 'static {}

pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>;
pub struct NapiState {
// Async tasks.
pub pending_async_work: Vec<PendingNapiAsyncWork>,
pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
pub async_work_receiver: mpsc::UnboundedReceiver<PendingNapiAsyncWork>,
// Thread safe functions.
pub active_threadsafe_functions: usize,
pub threadsafe_function_receiver:
Expand Down Expand Up @@ -318,7 +315,7 @@ pub struct Env {
pub isolate_ptr: *mut v8::OwnedIsolate,
pub open_handle_scopes: usize,
pub shared: *mut EnvShared,
pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
pub async_work_sender: V8CrossThreadTaskSpawner,
pub threadsafe_function_sender:
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
pub cleanup_hooks:
Expand All @@ -336,7 +333,7 @@ impl Env {
isolate_ptr: *mut v8::OwnedIsolate,
context: v8::Global<v8::Context>,
global: v8::Global<v8::Value>,
sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
sender: V8CrossThreadTaskSpawner,
threadsafe_function_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
cleanup_hooks: Rc<
RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>,
Expand Down Expand Up @@ -372,8 +369,8 @@ impl Env {
unsafe { &mut *self.shared }
}

pub fn add_async_work(&mut self, async_work: PendingNapiAsyncWork) {
self.async_work_sender.unbounded_send(async_work).unwrap();
pub fn add_async_work(&mut self, async_work: impl FnOnce() + Send + 'static) {
self.async_work_sender.spawn(|_| async_work());
}

#[inline]
Expand Down Expand Up @@ -418,74 +415,18 @@ deno_core::extension!(deno_napi,
op_napi_open<P>
],
state = |state| {
let (async_work_sender, async_work_receiver) =
mpsc::unbounded::<PendingNapiAsyncWork>();
let (threadsafe_function_sender, threadsafe_function_receiver) =
mpsc::unbounded::<ThreadSafeFunctionStatus>();
state.put(NapiState {
pending_async_work: Vec::new(),
async_work_sender,
async_work_receiver,
threadsafe_function_sender,
threadsafe_function_receiver,
active_threadsafe_functions: 0,
env_cleanup_hooks: Rc::new(RefCell::new(vec![])),
tsfn_ref_counters: Arc::new(Mutex::new(vec![])),
});
},
event_loop_middleware = event_loop_middleware,
);

fn event_loop_middleware(
op_state_rc: Rc<RefCell<OpState>>,
cx: &mut std::task::Context,
) -> bool {
// `work` can call back into the runtime. It can also schedule an async task
// but we don't know that now. We need to make the runtime re-poll to make
// sure no pending NAPI tasks exist.
let mut maybe_scheduling = false;

{
let mut op_state = op_state_rc.borrow_mut();
let napi_state = op_state.borrow_mut::<NapiState>();

while let Poll::Ready(Some(async_work_fut)) =
napi_state.async_work_receiver.poll_next_unpin(cx)
{
napi_state.pending_async_work.push(async_work_fut);
}

if napi_state.active_threadsafe_functions > 0 {
maybe_scheduling = true;
}

let tsfn_ref_counters = napi_state.tsfn_ref_counters.lock().clone();
for (_id, counter) in tsfn_ref_counters.iter() {
if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 {
maybe_scheduling = true;
break;
}
}
}

loop {
let maybe_work = {
let mut op_state = op_state_rc.borrow_mut();
let napi_state = op_state.borrow_mut::<NapiState>();
napi_state.pending_async_work.pop()
};

if let Some(work) = maybe_work {
work();
maybe_scheduling = true;
} else {
break;
}
}

maybe_scheduling
}

pub trait NapiPermissions {
fn check(&mut self, path: Option<&Path>)
-> std::result::Result<(), AnyError>;
Expand Down Expand Up @@ -557,7 +498,7 @@ where
let napi_state = op_state.borrow::<NapiState>();
let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>();
(
napi_state.async_work_sender.clone(),
op_state.borrow::<V8CrossThreadTaskSpawner>().clone(),
napi_state.threadsafe_function_sender.clone(),
*isolate_ptr,
napi_state.env_cleanup_hooks.clone(),
Expand Down

0 comments on commit d13e45f

Please sign in to comment.