Skip to content

Commit

Permalink
fix(runtime): Fix runtime timeouts
Browse files Browse the repository at this point in the history
The handling for epoch-based timeouts wasn't quite right.
Using `set_epoch_deadline(1)` meant the timeout would be hit
on the next epoch, which could be anywhere between `0` and `timeout`
seconds away. `set_epoch_deadline(2)` will make that between `timeout`
and `2*timeout` seconds away.
  • Loading branch information
leoyvens committed Feb 23, 2024
1 parent 01be1d1 commit 656da4f
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 102 deletions.
3 changes: 1 addition & 2 deletions runtime/test/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,14 @@ async fn test_valid_module_and_store_with_timeout(
};

let module = WasmInstance::from_valid_module_with_ctx(
Arc::new(ValidModule::new(&logger, data_source.mapping.runtime.as_ref()).unwrap()),
Arc::new(ValidModule::new(&logger, data_source.mapping.runtime.as_ref(), timeout).unwrap()),
mock_context(
deployment.clone(),
data_source,
store.subgraph_store(),
api_version,
),
host_metrics,
timeout,
experimental_features,
)
.unwrap();
Expand Down
1 change: 0 additions & 1 deletion runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,6 @@ impl HostExports {
valid_module.clone(),
ctx.derive_with_empty_block_state(),
host_metrics.clone(),
wasm_ctx.timeout,
wasm_ctx.experimental_features,
)?;
let result = module.handle_json_callback(&callback, &sv.value, &user_data)?;
Expand Down
44 changes: 39 additions & 5 deletions runtime/wasm/src/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub fn spawn_module<C: Blockchain>(
where
<C as Blockchain>::MappingTrigger: ToAscPtr,
{
let valid_module = Arc::new(ValidModule::new(&logger, raw_module)?);
let valid_module = Arc::new(ValidModule::new(&logger, raw_module, timeout)?);

// Create channel for event handling requests
let (mapping_request_sender, mapping_request_receiver) = mpsc::channel(100);
Expand Down Expand Up @@ -60,7 +60,6 @@ where
valid_module.cheap_clone(),
ctx,
host_metrics.cheap_clone(),
timeout,
experimental_features,
)
.map_err(Into::into)
Expand Down Expand Up @@ -112,7 +111,6 @@ fn instantiate_module<C: Blockchain>(
valid_module: Arc<ValidModule>,
ctx: MappingContext,
host_metrics: Arc<HostMetrics>,
timeout: Option<Duration>,
experimental_features: ExperimentalFeatures,
) -> Result<WasmInstance, anyhow::Error>
where
Expand All @@ -124,7 +122,6 @@ where
valid_module,
ctx,
host_metrics.cheap_clone(),
timeout,
experimental_features,
)
.context("module instantiation failed")
Expand Down Expand Up @@ -248,11 +245,21 @@ pub struct ValidModule {
// AS now has an `@external("module", "name")` decorator which would make things cleaner, but
// the ship has sailed.
pub import_name_to_modules: BTreeMap<String, Vec<String>>,

// The timeout for the module.
pub timeout: Option<Duration>,

// Used as a guard to terminate this task dependency.
epoch_counter_abort_handle: Option<tokio::task::JoinHandle<()>>,
}

impl ValidModule {
/// Pre-process and validate the module.
pub fn new(logger: &Logger, raw_module: &[u8]) -> Result<Self, anyhow::Error> {
pub fn new(
logger: &Logger,
raw_module: &[u8],
timeout: Option<Duration>,
) -> Result<Self, anyhow::Error> {
// Add the gas calls here. Module name "gas" must match. See also
// e3f03e62-40e4-4f8c-b4a1-d0375cca0b76. We do this by round-tripping the module through
// parity - injecting gas then serializing again.
Expand Down Expand Up @@ -318,10 +325,37 @@ impl ValidModule {
.push(module.to_string());
}

let mut epoch_counter_abort_handle = None;
if let Some(timeout) = timeout {
let timeout = timeout.clone();
let engine = engine.clone();

// The epoch counter task will perpetually increment the epoch every `timeout` seconds.
// Timeouts on instantiated modules will trigger on epoch deltas.
// Note: The epoch is a u64, so in practice it will never overflow.
// See also: runtime-timeouts
epoch_counter_abort_handle = Some(graph::spawn(async move {
loop {
tokio::time::sleep(timeout).await;
engine.increment_epoch();
}
}));
}

Ok(ValidModule {
module,
import_name_to_modules,
start_function,
timeout,
epoch_counter_abort_handle,
})
}
}

impl Drop for ValidModule {
    /// Aborts the background epoch-counter task (if one was spawned, i.e. if a
    /// timeout was configured) so it does not keep incrementing the engine's
    /// epoch after the module is gone.
    fn drop(&mut self) {
        // `take()` leaves `None` behind, so a double drop path cannot abort twice.
        if let Some(handle) = self.epoch_counter_abort_handle.take() {
            handle.abort();
        }
    }
}
27 changes: 13 additions & 14 deletions runtime/wasm/src/module/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use graph::runtime::{asc_new, gas::GasCounter, DeterministicHostError, HostExpor

use super::asc_get;
use super::AscHeapCtx;
use super::TimeoutStopwatch;

pub(crate) struct WasmInstanceContext<'a> {
inner: StoreContextMut<'a, WasmInstanceData>,
Expand Down Expand Up @@ -56,6 +55,16 @@ impl WasmInstanceContext<'_> {
pub fn asc_heap_mut(&mut self) -> &mut AscHeapCtx {
self.as_mut().asc_heap_mut()
}

    /// Effectively disables the epoch-based timeout for this store by pushing
    /// the deadline as far into the future as possible. Used to pause the
    /// timeout around long-running host calls such as `ipfs.map`; pair with
    /// `start_timeout` to re-arm it afterwards.
    pub fn suspend_timeout(&mut self) {
        // See also: runtime-timeouts
        self.inner.set_epoch_deadline(u64::MAX);
    }

    /// (Re)arms the epoch-based timeout for this store. A deadline of 2 epochs
    /// means the interrupt triggers between `timeout` and `2 * timeout` seconds
    /// from now, since the epoch counter ticks once per `timeout` interval and
    /// the current epoch may already be partially elapsed.
    pub fn start_timeout(&mut self) {
        // See also: runtime-timeouts
        self.inner.set_epoch_deadline(2);
    }
}

impl AsContext for WasmInstanceContext<'_> {
Expand All @@ -76,10 +85,6 @@ pub struct WasmInstanceData {
pub ctx: MappingContext,
pub valid_module: Arc<ValidModule>,
pub host_metrics: Arc<HostMetrics>,
pub(crate) timeout: Option<Duration>,

// Used by ipfs.map.
pub(crate) timeout_stopwatch: Arc<std::sync::Mutex<TimeoutStopwatch>>,

// A trap occurred due to a possible reorg detection.
pub possible_reorg: bool,
Expand All @@ -99,17 +104,13 @@ impl WasmInstanceData {
ctx: MappingContext,
valid_module: Arc<ValidModule>,
host_metrics: Arc<HostMetrics>,
timeout: Option<Duration>,
timeout_stopwatch: Arc<std::sync::Mutex<TimeoutStopwatch>>,
experimental_features: ExperimentalFeatures,
) -> Self {
WasmInstanceData {
asc_heap: None,
ctx,
valid_module,
host_metrics,
timeout,
timeout_stopwatch,
possible_reorg: false,
deterministic_host_trap: false,
experimental_features,
Expand Down Expand Up @@ -583,11 +584,8 @@ impl WasmInstanceContext<'_> {

let flags = asc_get(self, flags, gas)?;

// Pause the timeout while running ipfs_map, ensure it will be restarted by using a guard.
self.as_ref().timeout_stopwatch.lock().unwrap().stop();
let defer_stopwatch = self.as_ref().timeout_stopwatch.clone();
let _stopwatch_guard = defer::defer(|| defer_stopwatch.lock().unwrap().start());

// Pause the timeout while running ipfs_map, and resume it when done.
self.suspend_timeout();
let start_time = Instant::now();
let output_states = HostExports::ipfs_map(
&self.as_ref().ctx.host_exports.link_resolver.cheap_clone(),
Expand All @@ -597,6 +595,7 @@ impl WasmInstanceContext<'_> {
user_data,
flags,
)?;
self.start_timeout();

debug!(
&self.as_ref().ctx.logger,
Expand Down
32 changes: 12 additions & 20 deletions runtime/wasm/src/module/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::{IntoTrap, WasmInstanceContext};
use crate::error::DeterminismLevel;
use crate::mapping::MappingContext;
use crate::mapping::ValidModule;
use crate::module::{TimeoutStopwatch, WasmInstanceData};
use crate::module::WasmInstanceData;
use crate::ExperimentalFeatures;

use super::{is_trap_deterministic, AscHeapCtx, ToAscPtr};
Expand Down Expand Up @@ -191,6 +191,7 @@ impl WasmInstance {

// Treat timeouts anywhere in the error chain as a special case to have a better error
// message. Any `TrapCode::Interrupt` is assumed to be a timeout.
// See also: runtime-timeouts
Err(trap)
if trap
.chain()
Expand All @@ -200,7 +201,7 @@ impl WasmInstance {
return Err(MappingError::Unknown(Error::from(trap).context(format!(
"Handler '{}' hit the timeout of '{}' seconds",
handler,
self.instance_ctx().as_ref().timeout.unwrap().as_secs()
self.instance_ctx().as_ref().valid_module.timeout.unwrap().as_secs()
))));
}
Err(trap) => {
Expand Down Expand Up @@ -263,39 +264,30 @@ impl WasmInstance {
valid_module: Arc<ValidModule>,
ctx: MappingContext,
host_metrics: Arc<HostMetrics>,
timeout: Option<Duration>,
experimental_features: ExperimentalFeatures,
) -> Result<WasmInstance, anyhow::Error> {
let engine = valid_module.module.engine();
let mut linker: Linker<WasmInstanceData> = wasmtime::Linker::new(engine);
let host_fns = ctx.host_fns.cheap_clone();
let api_version = ctx.host_exports.data_source.api_version.clone();

// // Start the timeout watchdog task.
let timeout_stopwatch = Arc::new(std::sync::Mutex::new(TimeoutStopwatch::start_new()));

let wasm_ctx = WasmInstanceData::from_instance(
ctx,
valid_module.cheap_clone(),
host_metrics.cheap_clone(),
timeout,
timeout_stopwatch.clone(),
experimental_features,
);
let mut store = Store::new(engine, wasm_ctx);

// The epoch on the engine will only ever be incremented if increment_epoch() is explicitly
// called, we only do so if a timeout has been set, otherwise 1 means it will run forever.
// If a timeout is provided then epoch 1 should happen roughly once the timeout duration
// has elapsed.
store.set_epoch_deadline(1);
if let Some(timeout) = timeout {
let timeout = timeout.clone();
let engine = engine.clone();
graph::spawn(async move {
tokio::time::sleep(timeout).await;
engine.increment_epoch();
});
}
// called, and we only do so if a timeout has been set; otherwise it will run forever. When a timeout is
// set, the timeout duration is used as the duration of one epoch.
//
// Therefore, the setting of 2 here means that if a `timeout` is provided, then this
// interrupt will be triggered between a duration of `timeout` and `timeout * 2`.
//
// See also: runtime-timeouts
store.set_epoch_deadline(2);

// Because `gas` and `deterministic_host_trap` need to be accessed from the gas
// host fn, they need to be separate from the rest of the context.
Expand Down
2 changes: 0 additions & 2 deletions runtime/wasm/src/module/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use graph::runtime::{
IndexForAscTypeId,
};
pub use into_wasm_ret::IntoWasmRet;
pub use stopwatch::TimeoutStopwatch;

use crate::error::DeterminismLevel;
use crate::gas_rules::{GAS_COST_LOAD, GAS_COST_STORE};
Expand All @@ -31,7 +30,6 @@ pub use instance::*;
mod context;
mod instance;
mod into_wasm_ret;
pub mod stopwatch;

// Convenience for a 'top-level' asc_get, with depth 0.
fn asc_get<T, C: AscType, H: AscHeap + ?Sized>(
Expand Down
58 changes: 0 additions & 58 deletions runtime/wasm/src/module/stopwatch.rs

This file was deleted.

0 comments on commit 656da4f

Please sign in to comment.