diff --git a/runtime/test/src/test.rs b/runtime/test/src/test.rs index 7241734ab5a..5610e82a7d0 100644 --- a/runtime/test/src/test.rs +++ b/runtime/test/src/test.rs @@ -102,7 +102,7 @@ async fn test_valid_module_and_store_with_timeout( }; let module = WasmInstance::from_valid_module_with_ctx( - Arc::new(ValidModule::new(&logger, data_source.mapping.runtime.as_ref()).unwrap()), + Arc::new(ValidModule::new(&logger, data_source.mapping.runtime.as_ref(), timeout).unwrap()), mock_context( deployment.clone(), data_source, @@ -110,7 +110,6 @@ async fn test_valid_module_and_store_with_timeout( api_version, ), host_metrics, - timeout, experimental_features, ) .unwrap(); diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index 6b4505cd542..3e17c3a68ef 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -517,7 +517,6 @@ impl HostExports { valid_module.clone(), ctx.derive_with_empty_block_state(), host_metrics.clone(), - wasm_ctx.timeout, wasm_ctx.experimental_features, )?; let result = module.handle_json_callback(&callback, &sv.value, &user_data)?; diff --git a/runtime/wasm/src/mapping.rs b/runtime/wasm/src/mapping.rs index 45cd79d6852..62a7aa166fa 100644 --- a/runtime/wasm/src/mapping.rs +++ b/runtime/wasm/src/mapping.rs @@ -27,7 +27,7 @@ pub fn spawn_module( where ::MappingTrigger: ToAscPtr, { - let valid_module = Arc::new(ValidModule::new(&logger, raw_module)?); + let valid_module = Arc::new(ValidModule::new(&logger, raw_module, timeout)?); // Create channel for event handling requests let (mapping_request_sender, mapping_request_receiver) = mpsc::channel(100); @@ -60,7 +60,6 @@ where valid_module.cheap_clone(), ctx, host_metrics.cheap_clone(), - timeout, experimental_features, ) .map_err(Into::into) @@ -112,7 +111,6 @@ fn instantiate_module( valid_module: Arc, ctx: MappingContext, host_metrics: Arc, - timeout: Option, experimental_features: ExperimentalFeatures, ) -> Result where @@ -124,7 +122,6 @@ where valid_module, ctx, host_metrics.cheap_clone(), - timeout, experimental_features, ) .context("module instantiation failed") @@ -248,11 +245,21 @@ pub struct ValidModule { // AS now has an `@external("module", "name")` decorator which would make things cleaner, but // the ship has sailed. pub import_name_to_modules: BTreeMap>, + + // The timeout for the module. + pub timeout: Option, + + // Used as a guard to terminate this task dependency. + epoch_counter_abort_handle: Option>, } impl ValidModule { /// Pre-process and validate the module. - pub fn new(logger: &Logger, raw_module: &[u8]) -> Result { + pub fn new( + logger: &Logger, + raw_module: &[u8], + timeout: Option, + ) -> Result { // Add the gas calls here. Module name "gas" must match. See also // e3f03e62-40e4-4f8c-b4a1-d0375cca0b76. We do this by round-tripping the module through // parity - injecting gas then serializing again. @@ -318,10 +325,37 @@ impl ValidModule { .push(module.to_string()); } + let mut epoch_counter_abort_handle = None; + if let Some(timeout) = timeout { + let timeout = timeout.clone(); + let engine = engine.clone(); + + // The epoch counter task will perpetually increment the epoch every `timeout` seconds. + // Timeouts on instantiated modules will trigger on epoch deltas. + // Note: The epoch is an u64 so it will never overflow. + // See also: runtime-timeouts + epoch_counter_abort_handle = Some(graph::spawn(async move { + loop { + tokio::time::sleep(timeout).await; + engine.increment_epoch(); + } + })); + } + Ok(ValidModule { module, import_name_to_modules, start_function, + timeout, + epoch_counter_abort_handle, }) } } + +impl Drop for ValidModule { + fn drop(&mut self) { + if let Some(handle) = self.epoch_counter_abort_handle.take() { + handle.abort(); + } + } +} diff --git a/runtime/wasm/src/module/context.rs b/runtime/wasm/src/module/context.rs index 079deef8d88..52efa395464 100644 --- a/runtime/wasm/src/module/context.rs +++ b/runtime/wasm/src/module/context.rs @@ -28,7 +28,6 @@ use graph::runtime::{asc_new, gas::GasCounter, DeterministicHostError, HostExpor use super::asc_get; use super::AscHeapCtx; -use super::TimeoutStopwatch; pub(crate) struct WasmInstanceContext<'a> { inner: StoreContextMut<'a, WasmInstanceData>, @@ -56,6 +55,16 @@ impl WasmInstanceContext<'_> { pub fn asc_heap_mut(&mut self) -> &mut AscHeapCtx { self.as_mut().asc_heap_mut() } + + pub fn suspend_timeout(&mut self) { + // See also: runtime-timeouts + self.inner.set_epoch_deadline(u64::MAX); + } + + pub fn start_timeout(&mut self) { + // See also: runtime-timeouts + self.inner.set_epoch_deadline(2); + } } impl AsContext for WasmInstanceContext<'_> { @@ -76,10 +85,6 @@ pub struct WasmInstanceData { pub ctx: MappingContext, pub valid_module: Arc, pub host_metrics: Arc, - pub(crate) timeout: Option, - - // Used by ipfs.map. - pub(crate) timeout_stopwatch: Arc>, // A trap ocurred due to a possible reorg detection. pub possible_reorg: bool, @@ -99,8 +104,6 @@ impl WasmInstanceData { ctx: MappingContext, valid_module: Arc, host_metrics: Arc, - timeout: Option, - timeout_stopwatch: Arc>, experimental_features: ExperimentalFeatures, ) -> Self { WasmInstanceData { @@ -108,8 +111,6 @@ impl WasmInstanceData { ctx, valid_module, host_metrics, - timeout, - timeout_stopwatch, possible_reorg: false, deterministic_host_trap: false, experimental_features, @@ -583,11 +584,8 @@ impl WasmInstanceContext<'_> { let flags = asc_get(self, flags, gas)?; - // Pause the timeout while running ipfs_map, ensure it will be restarted by using a guard. - self.as_ref().timeout_stopwatch.lock().unwrap().stop(); - let defer_stopwatch = self.as_ref().timeout_stopwatch.clone(); - let _stopwatch_guard = defer::defer(|| defer_stopwatch.lock().unwrap().start()); - + // Pause the timeout while running ipfs_map, and resume it when done. + self.suspend_timeout(); let start_time = Instant::now(); let output_states = HostExports::ipfs_map( &self.as_ref().ctx.host_exports.link_resolver.cheap_clone(), @@ -597,6 +595,7 @@ impl WasmInstanceContext<'_> { user_data, flags, )?; + self.start_timeout(); debug!( &self.as_ref().ctx.logger, diff --git a/runtime/wasm/src/module/instance.rs b/runtime/wasm/src/module/instance.rs index dbc9be86b0b..6d47cccb950 100644 --- a/runtime/wasm/src/module/instance.rs +++ b/runtime/wasm/src/module/instance.rs @@ -24,7 +24,7 @@ use super::{IntoTrap, WasmInstanceContext}; use crate::error::DeterminismLevel; use crate::mapping::MappingContext; use crate::mapping::ValidModule; -use crate::module::{TimeoutStopwatch, WasmInstanceData}; +use crate::module::WasmInstanceData; use crate::ExperimentalFeatures; use super::{is_trap_deterministic, AscHeapCtx, ToAscPtr}; @@ -191,6 +191,7 @@ impl WasmInstance { // Treat timeouts anywhere in the error chain as a special case to have a better error // message. Any `TrapCode::Interrupt` is assumed to be a timeout. + // See also: runtime-timeouts Err(trap) if trap .chain() @@ -200,7 +201,7 @@ impl WasmInstance { return Err(MappingError::Unknown(Error::from(trap).context(format!( "Handler '{}' hit the timeout of '{}' seconds", handler, - self.instance_ctx().as_ref().timeout.unwrap().as_secs() + self.instance_ctx().as_ref().valid_module.timeout.unwrap().as_secs() )))); } Err(trap) => { @@ -263,7 +264,6 @@ impl WasmInstance { valid_module: Arc, ctx: MappingContext, host_metrics: Arc, - timeout: Option, experimental_features: ExperimentalFeatures, ) -> Result { let engine = valid_module.module.engine(); @@ -271,31 +271,23 @@ impl WasmInstance { let host_fns = ctx.host_fns.cheap_clone(); let api_version = ctx.host_exports.data_source.api_version.clone(); - // // Start the timeout watchdog task. - let timeout_stopwatch = Arc::new(std::sync::Mutex::new(TimeoutStopwatch::start_new())); - let wasm_ctx = WasmInstanceData::from_instance( ctx, valid_module.cheap_clone(), host_metrics.cheap_clone(), - timeout, - timeout_stopwatch.clone(), experimental_features, ); let mut store = Store::new(engine, wasm_ctx); + // The epoch on the engine will only ever be incremeted if increment_epoch() is explicitly - // called, we only do so if a timeout has been set, otherwise 1 means it will run forever. - // If a timeout is provided then epoch 1 should happen roughly once the timeout duration - // has elapsed. - store.set_epoch_deadline(1); - if let Some(timeout) = timeout { - let timeout = timeout.clone(); - let engine = engine.clone(); - graph::spawn(async move { - tokio::time::sleep(timeout).await; - engine.increment_epoch(); - }); - } + // called, we only do so if a timeout has been set, it will run forever. When a timeout is + // set, the timeout duration is used as the duration of one epoch. + // + // Therefore, the setting of 2 here means that if a `timeout` is provided, then this + // interrupt will be triggered between a duration of `timeout` and `timeout * 2`. + // + // See also: runtime-timeouts + store.set_epoch_deadline(2); // Because `gas` and `deterministic_host_trap` need to be accessed from the gas // host fn, they need to be separate from the rest of the context. diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index 203fc4717e8..ffe4f7aba8e 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -20,7 +20,6 @@ use graph::runtime::{ IndexForAscTypeId, }; pub use into_wasm_ret::IntoWasmRet; -pub use stopwatch::TimeoutStopwatch; use crate::error::DeterminismLevel; use crate::gas_rules::{GAS_COST_LOAD, GAS_COST_STORE}; @@ -31,7 +30,6 @@ pub use instance::*; mod context; mod instance; mod into_wasm_ret; -pub mod stopwatch; // Convenience for a 'top-level' asc_get, with depth 0. fn asc_get( diff --git a/runtime/wasm/src/module/stopwatch.rs b/runtime/wasm/src/module/stopwatch.rs deleted file mode 100644 index 52d3b716708..00000000000 --- a/runtime/wasm/src/module/stopwatch.rs +++ /dev/null @@ -1,58 +0,0 @@ -// Copied from https://github.com/ellisonch/rust-stopwatch -// Copyright (c) 2014 Chucky Ellison under MIT license - -use std::default::Default; -use std::time::{Duration, Instant}; - -#[derive(Clone, Copy)] -pub struct TimeoutStopwatch { - /// The time the stopwatch was started last, if ever. - start_time: Option, - /// The time elapsed while the stopwatch was running (between start() and stop()). - pub elapsed: Duration, -} - -impl Default for TimeoutStopwatch { - fn default() -> TimeoutStopwatch { - TimeoutStopwatch { - start_time: None, - elapsed: Duration::from_secs(0), - } - } -} - -impl TimeoutStopwatch { - /// Returns a new stopwatch. - pub fn new() -> TimeoutStopwatch { - let sw: TimeoutStopwatch = Default::default(); - sw - } - - /// Returns a new stopwatch which will immediately be started. - pub fn start_new() -> TimeoutStopwatch { - let mut sw = TimeoutStopwatch::new(); - sw.start(); - sw - } - - /// Starts the stopwatch. - pub fn start(&mut self) { - self.start_time = Some(Instant::now()); - } - - /// Stops the stopwatch. - pub fn stop(&mut self) { - self.elapsed = self.elapsed(); - self.start_time = None; - } - - /// Returns the elapsed time since the start of the stopwatch. - pub fn elapsed(&self) -> Duration { - match self.start_time { - // stopwatch is running - Some(t1) => t1.elapsed() + self.elapsed, - // stopwatch is not running - None => self.elapsed, - } - } -}