Skip to content

Commit

Permalink
Revamp JobQueue into JobExecutor and introduce NativeAsyncJob (#…
Browse files Browse the repository at this point in the history
…4118)

* Introduce `NativeAsyncJob`

* Finish implementation

* Docs & Reviews

* Fix realm of async job

* Revamp `JobQueue` into `JobExecutor`

* Fix execution bug in cli executor

* Propagate jobs into `JobExecutor::run_jobs`
  • Loading branch information
jedel1043 authored Jan 14, 2025
1 parent 3008f1d commit f64d937
Show file tree
Hide file tree
Showing 23 changed files with 769 additions and 438 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 40 additions & 22 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ mod helper;
use boa_engine::{
builtins::promise::PromiseState,
context::ContextBuilder,
job::{FutureJob, JobQueue, NativeJob},
job::{Job, JobExecutor, NativeAsyncJob, PromiseJob},
module::{Module, SimpleModuleLoader},
optimizer::OptimizerOptions,
script::Script,
vm::flowgraph::{Direction, Graph},
Context, JsError, Source,
Context, JsError, JsResult, Source,
};
use boa_parser::source::ReadChar;
use clap::{Parser, ValueEnum, ValueHint};
Expand Down Expand Up @@ -292,7 +292,7 @@ fn evaluate_file(
);

let promise = module.load_link_evaluate(context);
context.run_jobs();
context.run_jobs().map_err(|err| err.into_erased(context))?;
let result = promise.state();

return match result {
Expand All @@ -308,9 +308,9 @@ fn evaluate_file(
Ok(v) => println!("{}", v.display()),
Err(v) => eprintln!("Uncaught {v}"),
}
context.run_jobs();

Ok(())
context
.run_jobs()
.map_err(|err| err.into_erased(context).into())
}

fn evaluate_files(args: &Opt, context: &mut Context, loader: &SimpleModuleLoader) {
Expand All @@ -336,10 +336,10 @@ fn main() -> Result<()> {

let args = Opt::parse();

let queue = Rc::new(Jobs::default());
let executor = Rc::new(Executor::default());
let loader = Rc::new(SimpleModuleLoader::new(&args.root).map_err(|e| eyre!(e.to_string()))?);
let mut context = ContextBuilder::new()
.job_queue(queue)
.job_executor(executor)
.module_loader(loader.clone())
.build()
.map_err(|e| eyre!(e.to_string()))?;
Expand Down Expand Up @@ -425,7 +425,9 @@ fn main() -> Result<()> {
eprintln!("{}: {}", "Uncaught".red(), v.to_string().red());
}
}
context.run_jobs();
if let Err(err) = context.run_jobs() {
eprintln!("{err}");
};
}
}

Expand Down Expand Up @@ -453,29 +455,45 @@ fn add_runtime(context: &mut Context) {
}

#[derive(Default)]
struct Jobs(RefCell<VecDeque<NativeJob>>);
struct Executor {
promise_jobs: RefCell<VecDeque<PromiseJob>>,
async_jobs: RefCell<VecDeque<NativeAsyncJob>>,
}

impl JobQueue for Jobs {
fn enqueue_promise_job(&self, job: NativeJob, _: &mut Context) {
self.0.borrow_mut().push_back(job);
impl JobExecutor for Executor {
fn enqueue_job(&self, job: Job, _: &mut Context) {
match job {
Job::PromiseJob(job) => self.promise_jobs.borrow_mut().push_back(job),
Job::AsyncJob(job) => self.async_jobs.borrow_mut().push_back(job),
job => eprintln!("unsupported job type {job:?}"),
}
}

fn run_jobs(&self, context: &mut Context) {
fn run_jobs(&self, context: &mut Context) -> JsResult<()> {
loop {
let jobs = std::mem::take(&mut *self.0.borrow_mut());
if jobs.is_empty() {
return;
if self.promise_jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
return Ok(());
}

let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
for job in jobs {
if let Err(e) = job.call(context) {
eprintln!("Uncaught {e}");
}
}
}
}

fn enqueue_future_job(&self, future: FutureJob, _: &mut Context) {
let job = pollster::block_on(future);
self.0.borrow_mut().push_back(job);
let async_jobs = std::mem::take(&mut *self.async_jobs.borrow_mut());
for async_job in async_jobs {
if let Err(err) = pollster::block_on(async_job.call(&RefCell::new(context))) {
eprintln!("Uncaught {err}");
}
let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
for job in jobs {
if let Err(e) = job.call(context) {
eprintln!("Uncaught {e}");
}
}
}
}
}
}
22 changes: 12 additions & 10 deletions core/engine/src/builtins/promise/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
builtins::{Array, BuiltInObject},
context::intrinsics::{Intrinsics, StandardConstructor, StandardConstructors},
error::JsNativeError,
job::{JobCallback, NativeJob},
job::{JobCallback, PromiseJob},
js_string,
native_function::NativeFunction,
object::{
Expand Down Expand Up @@ -1888,8 +1888,8 @@ impl Promise {

// c. Perform HostEnqueuePromiseJob(fulfillJob.[[Job]], fulfillJob.[[Realm]]).
context
.job_queue()
.enqueue_promise_job(fulfill_job, context);
.job_executor()
.enqueue_job(fulfill_job.into(), context);
}

// 11. Else,
Expand All @@ -1909,7 +1909,9 @@ impl Promise {
let reject_job = new_promise_reaction_job(reject_reaction, reason.clone(), context);

// e. Perform HostEnqueuePromiseJob(rejectJob.[[Job]], rejectJob.[[Realm]]).
context.job_queue().enqueue_promise_job(reject_job, context);
context
.job_executor()
.enqueue_job(reject_job.into(), context);

// 12. Set promise.[[PromiseIsHandled]] to true.
promise
Expand Down Expand Up @@ -1985,7 +1987,7 @@ impl Promise {
let job = new_promise_reaction_job(reaction, argument.clone(), context);

// b. Perform HostEnqueuePromiseJob(job.[[Job]], job.[[Realm]]).
context.job_queue().enqueue_promise_job(job, context);
context.job_executor().enqueue_job(job.into(), context);
}
// 2. Return unused.
}
Expand Down Expand Up @@ -2178,7 +2180,7 @@ impl Promise {
);

// 15. Perform HostEnqueuePromiseJob(job.[[Job]], job.[[Realm]]).
context.job_queue().enqueue_promise_job(job, context);
context.job_executor().enqueue_job(job.into(), context);

// 16. Return undefined.
Ok(JsValue::undefined())
Expand Down Expand Up @@ -2239,7 +2241,7 @@ fn new_promise_reaction_job(
mut reaction: ReactionRecord,
argument: JsValue,
context: &mut Context,
) -> NativeJob {
) -> PromiseJob {
// Inverting order since `job` captures `reaction` by value.

// 2. Let handlerRealm be null.
Expand Down Expand Up @@ -2320,7 +2322,7 @@ fn new_promise_reaction_job(
};

// 4. Return the Record { [[Job]]: job, [[Realm]]: handlerRealm }.
NativeJob::with_realm(job, realm, context)
PromiseJob::with_realm(job, realm, context)
}

/// More information:
Expand All @@ -2332,7 +2334,7 @@ fn new_promise_resolve_thenable_job(
thenable: JsValue,
then: JobCallback,
context: &mut Context,
) -> NativeJob {
) -> PromiseJob {
// Inverting order since `job` captures variables by value.

// 2. Let getThenRealmResult be Completion(GetFunctionRealm(then.[[Callback]])).
Expand Down Expand Up @@ -2374,5 +2376,5 @@ fn new_promise_resolve_thenable_job(
};

// 6. Return the Record { [[Job]]: job, [[Realm]]: thenRealm }.
NativeJob::with_realm(job, realm, context)
PromiseJob::with_realm(job, realm, context)
}
3 changes: 1 addition & 2 deletions core/engine/src/builtins/promise/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ fn promise() {
count += 1;
"#}),
TestAction::assert_eq("count", 2),
#[allow(clippy::redundant_closure_for_method_calls)]
TestAction::inspect_context(|ctx| ctx.run_jobs()),
TestAction::inspect_context(|ctx| ctx.run_jobs().unwrap()),
TestAction::assert_eq("count", 3),
]);
}
66 changes: 38 additions & 28 deletions core/engine/src/context/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! The ECMAScript context.
use std::cell::RefCell;
use std::{cell::Cell, path::Path, rc::Rc};

use boa_ast::StatementList;
Expand All @@ -13,11 +14,12 @@ use intrinsics::Intrinsics;
#[cfg(feature = "temporal")]
use temporal_rs::tzdb::FsTzdbProvider;

use crate::job::Job;
use crate::vm::RuntimeLimits;
use crate::{
builtins,
class::{Class, ClassBuilder},
job::{JobQueue, NativeJob, SimpleJobQueue},
job::{JobExecutor, SimpleJobExecutor},
js_string,
module::{IdleModuleLoader, ModuleLoader, SimpleModuleLoader},
native_function::NativeFunction,
Expand Down Expand Up @@ -111,7 +113,7 @@ pub struct Context {

host_hooks: &'static dyn HostHooks,

job_queue: Rc<dyn JobQueue>,
job_executor: Rc<dyn JobExecutor>,

module_loader: Rc<dyn ModuleLoader>,

Expand All @@ -133,7 +135,7 @@ impl std::fmt::Debug for Context {
.field("interner", &self.interner)
.field("vm", &self.vm)
.field("strict", &self.strict)
.field("promise_job_queue", &"JobQueue")
.field("job_executor", &"JobExecutor")
.field("hooks", &"HostHooks")
.field("module_loader", &"ModuleLoader")
.field("optimizer_options", &self.optimizer_options);
Expand Down Expand Up @@ -186,7 +188,7 @@ impl Context {
/// ```
///
/// Note that this won't run any scheduled promise jobs; you need to call [`Context::run_jobs`]
/// on the context or [`JobQueue::run_jobs`] on the provided queue to run them.
/// on the context or [`JobExecutor::run_jobs`] on the provided queue to run them.
#[allow(clippy::unit_arg, dropping_copy_types)]
pub fn eval<R: ReadChar>(&mut self, src: Source<'_, R>) -> JsResult<JsValue> {
let main_timer = Profiler::global().start_event("Script evaluation", "Main");
Expand Down Expand Up @@ -467,30 +469,35 @@ impl Context {
self.strict = strict;
}

/// Enqueues a [`NativeJob`] on the [`JobQueue`].
/// Enqueues a [`Job`] on the [`JobExecutor`].
#[inline]
pub fn enqueue_job(&mut self, job: NativeJob) {
self.job_queue().enqueue_promise_job(job, self);
pub fn enqueue_job(&mut self, job: Job) {
self.job_executor().enqueue_job(job, self);
}

/// Runs all the jobs in the job queue.
/// Runs all the jobs with the provided job executor.
#[inline]
pub fn run_jobs(&mut self) {
self.job_queue().run_jobs(self);
pub fn run_jobs(&mut self) -> JsResult<()> {
let result = self.job_executor().run_jobs(self);
self.clear_kept_objects();
result
}

/// Asynchronously runs all the jobs in the job queue.
/// Asynchronously runs all the jobs with the provided job executor.
///
/// # Note
///
/// Concurrent job execution cannot be guaranteed by the engine, since this depends on the
/// specific handling of each [`JobQueue`]. If you want to execute jobs concurrently, you must
/// provide a custom implementor of `JobQueue` to the context.
/// specific handling of each [`JobExecutor`]. If you want to execute jobs concurrently, you must
/// provide a custom implementatin of `JobExecutor` to the context.
#[allow(clippy::future_not_send)]
pub async fn run_jobs_async(&mut self) {
self.job_queue().run_jobs_async(self).await;
pub async fn run_jobs_async(&mut self) -> JsResult<()> {
let result = self
.job_executor()
.run_jobs_async(&RefCell::new(self))
.await;
self.clear_kept_objects();
result
}

/// Abstract operation [`ClearKeptObjects`][clear].
Expand Down Expand Up @@ -546,11 +553,11 @@ impl Context {
self.host_hooks
}

/// Gets the job queue.
/// Gets the job executor.
#[inline]
#[must_use]
pub fn job_queue(&self) -> Rc<dyn JobQueue> {
self.job_queue.clone()
pub fn job_executor(&self) -> Rc<dyn JobExecutor> {
self.job_executor.clone()
}

/// Gets the module loader.
Expand Down Expand Up @@ -881,7 +888,7 @@ impl Context {
pub struct ContextBuilder {
interner: Option<Interner>,
host_hooks: Option<&'static dyn HostHooks>,
job_queue: Option<Rc<dyn JobQueue>>,
job_executor: Option<Rc<dyn JobExecutor>>,
module_loader: Option<Rc<dyn ModuleLoader>>,
can_block: bool,
#[cfg(feature = "intl")]
Expand All @@ -893,7 +900,7 @@ pub struct ContextBuilder {
impl std::fmt::Debug for ContextBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
#[derive(Clone, Copy, Debug)]
struct JobQueue;
struct JobExecutor;
#[derive(Clone, Copy, Debug)]
struct HostHooks;
#[derive(Clone, Copy, Debug)]
Expand All @@ -903,7 +910,10 @@ impl std::fmt::Debug for ContextBuilder {

out.field("interner", &self.interner)
.field("host_hooks", &self.host_hooks.as_ref().map(|_| HostHooks))
.field("job_queue", &self.job_queue.as_ref().map(|_| JobQueue))
.field(
"job_executor",
&self.job_executor.as_ref().map(|_| JobExecutor),
)
.field(
"module_loader",
&self.module_loader.as_ref().map(|_| ModuleLoader),
Expand Down Expand Up @@ -1016,10 +1026,10 @@ impl ContextBuilder {
self
}

/// Initializes the [`JobQueue`] for the context.
/// Initializes the [`JobExecutor`] for the context.
#[must_use]
pub fn job_queue<Q: JobQueue + 'static>(mut self, job_queue: Rc<Q>) -> Self {
self.job_queue = Some(job_queue);
pub fn job_executor<Q: JobExecutor + 'static>(mut self, job_executor: Rc<Q>) -> Self {
self.job_executor = Some(job_executor);
self
}

Expand Down Expand Up @@ -1090,9 +1100,9 @@ impl ContextBuilder {
Rc::new(IdleModuleLoader)
};

let job_queue = self
.job_queue
.unwrap_or_else(|| Rc::new(SimpleJobQueue::new()));
let job_executor = self
.job_executor
.unwrap_or_else(|| Rc::new(SimpleJobExecutor::new()));

let mut context = Context {
interner: self.interner.unwrap_or_default(),
Expand All @@ -1119,7 +1129,7 @@ impl ContextBuilder {
instructions_remaining: self.instructions_remaining,
kept_alive: Vec::new(),
host_hooks,
job_queue,
job_executor,
module_loader,
optimizer_options: OptimizerOptions::OPTIMIZE_ALL,
root_shape,
Expand Down
Loading

0 comments on commit f64d937

Please sign in to comment.