Skip to content

Commit

Permalink
Refactor OCW processing (#20)
Browse files Browse the repository at this point in the history
* Stub for OCW processing.

* Add error msg for storage read error

* Refactor OCW to deal w/ transient listener issues.
  • Loading branch information
Steve Degosserie authored Jul 20, 2020
1 parent e2b9376 commit 5718b20
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 110 deletions.
167 changes: 120 additions & 47 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#![cfg_attr(not(feature = "std"), no_std)]

use codec::alloc::string::ToString;
use core::convert::TryInto;
use frame_support::{
decl_error, decl_event, decl_module, decl_storage, dispatch, ensure, sp_std::prelude::*,
debug, decl_error, decl_event, decl_module, decl_storage, dispatch, ensure,
sp_runtime::offchain::storage::StorageValueRef, sp_std::prelude::*,
};
use frame_system::{ self as system, ensure_signed, offchain::SendTransactionTypes };
use frame_system::{self as system, ensure_signed, offchain::SendTransactionTypes};

use product_registry::ProductId;

Expand Down Expand Up @@ -65,6 +68,7 @@ decl_error! {
ShipmentHasTooManyProducts,
ShippingEventAlreadyExists,
ShippingEventMaxExceeded,
OffchainWorkerAlreadyBusy
}
}

Expand Down Expand Up @@ -176,25 +180,64 @@ decl_module! {
Ok(())
}

// fn offchain_worker(_block_number: T::BlockNumber) {
// if Self::ocw_tasks().len() == 0 { return; }
// let mut tasks: Vec<OcwTask<T::AccountId, T::Moment>> = <OcwTasks<T>>::get();

// while tasks.len() > 0 {
// let task = tasks.remove(0);
// debug::info!("ocw task: {:?}", task);
// let _ = Self::notify_listener(&task).map_err(|e| {
// debug::error!("Error notifying listener. Err: {:?}", e);
// });
// }

// // Submit a transaction back on-chain to clear the task queue
// let call = Call::clear_ocwtasks();

// let _ = SubmitTransaction::<T, Call<T>>::submit_unsigned_transaction(call.into()).map_err(|e| {
// debug::error!("Failed in submitting tx for clearing ocw taskqueue. Err: {:?}", e);
// });
// }
fn offchain_worker(block_number: T::BlockNumber) {
if block_number == 0.into() {
return;
}

// Acquire lock
let s_lock = match Self::ocw_acquire_lock(StorageValueRef::persistent(b"product_tracking_ocw::lock")) {
Ok(s_lock) => s_lock,
Err(e) => {
debug::info!("[product_tracking_ocw] Aborting: {:?}", e); //debug
return;
}
};

// Check last processed block
let last_processed_block_ref = StorageValueRef::persistent(b"product_tracking_ocw::last_proccessed_block");
let mut last_processed_block: u32 = match last_processed_block_ref.get::<T::BlockNumber>() {
Some(Some(last_proccessed_block)) if last_proccessed_block >= block_number => {
debug::info!("[product_tracking_ocw] Skipping: Block {:?} has already been processed.", block_number);
return;
},
Some(Some(last_proccessed_block)) => last_proccessed_block.try_into().ok().unwrap() as u32,
None => 0u32, //TODO: define a OCW_MAX_BACKTRACK_PERIOD param
_ => {
debug::error!("[product_tracking_ocw] Error reading product_tracking_ocw::last_proccessed_block.");
return;
}
};

let start_block = last_processed_block + 1;
let end_block = block_number.try_into().ok().unwrap() as u32;
for current_block in start_block..end_block {
debug::debug!("[product_tracking_ocw] Processing notifications for block {}", current_block);
let ev_indices = Self::ocw_notifications::<T::BlockNumber>(current_block.into());

let listener_results: Result<Vec<_>, _> = ev_indices.iter()
.map(|idx| match Self::event_by_idx(idx) {
Some(ev) => Self::notify_listener(&ev),
None => Ok(())
})
.collect();

if let Err(err) = listener_results {
debug::warn!("[product_tracking_ocw] notify_listener error: {}", err);
break;
}
last_processed_block = current_block;
}

// Save last processed block
if last_processed_block >= start_block {
last_processed_block_ref.set(&last_processed_block);
debug::info!("[product_tracking_ocw] Notifications successfully processed up to block {}", last_processed_block);
}

// Release lock
Self::ocw_release_lock(s_lock);
}
}
}

Expand All @@ -208,32 +251,7 @@ impl<T: Trait> Module<T> {
ShippingEventBuilder::<T::Moment>::default()
}

// fn notify_listener(task: &OcwTask<T::AccountId, T::Moment>) -> Result<(), &'static str> {
// let request =
// sp_runtime::offchain::http::Request::post(&LISTENER_ENDPOINT, vec![task.to_string()]);

// let timeout =
// sp_io::offchain::timestamp().add(sp_runtime::offchain::Duration::from_millis(3000));

// let pending = request
// .add_header(&"Content-Type", &"text/plain")
// .deadline(timeout) // Setting the timeout time
// .send() // Sending the request out by the host
// .map_err(|_| "http post request building error")?;

// let response = pending
// .try_wait(timeout)
// .map_err(|_| "http post request sent error")?
// .map_err(|_| "http post request sent error")?;

// if response.code != 200 {
// return Err("http response error");
// }

// Ok(())
// }

pub fn store_event(event: ShippingEvent<T::Moment>) -> Result<ShippingEventIndex, Error<T>> {
fn store_event(event: ShippingEvent<T::Moment>) -> Result<ShippingEventIndex, Error<T>> {
let event_idx = EventCount::get()
.checked_add(1)
.ok_or(Error::<T>::ShippingEventMaxExceeded)?;
Expand All @@ -245,6 +263,7 @@ impl<T: Trait> Module<T> {
Ok(event_idx)
}

// (Public) Validation methods
pub fn validate_identifier(id: &[u8]) -> Result<(), Error<T>> {
// Basic identifier validation
ensure!(!id.is_empty(), Error::<T>::InvalidOrMissingIdentifier);
Expand All @@ -271,4 +290,58 @@ impl<T: Trait> Module<T> {
);
Ok(())
}

// Offchain worker methods
fn notify_listener(ev: &ShippingEvent<T::Moment>) -> Result<(), &'static str> {
debug::info!("notifying listener: {:?}", ev);

let request =
sp_runtime::offchain::http::Request::post(&LISTENER_ENDPOINT, vec![ev.to_string()]);

let timeout =
sp_io::offchain::timestamp().add(sp_runtime::offchain::Duration::from_millis(3000));

let pending = request
.add_header(&"Content-Type", &"text/plain")
.deadline(timeout) // Setting the timeout time
.send() // Sending the request out by the host
.map_err(|_| "http post request building error")?;

let response = pending
.try_wait(timeout)
.map_err(|_| "http post request sent error")?
.map_err(|_| "http post request sent error")?;

if response.code != 200 {
return Err("http response error");
}

Ok(())
}

fn ocw_acquire_lock(s_lock: StorageValueRef) -> Result<StorageValueRef, Error<T>> {
// We are implementing a mutex lock here with `s_lock`
s_lock
.mutate(|s: Option<Option<bool>>| {
match s {
// `s` can be one of the following:
// `None`: the lock has never been set. Treated as the lock is free
// `Some(None)`: unexpected case, treated it as AlreadyFetch
// `Some(Some(false))`: the lock is free
// `Some(Some(true))`: the lock is held

// If the lock has never been set or is free (false), return true to execute `fetch_n_parse`
None | Some(Some(false)) => Ok(true),

// Otherwise, someone already hold the lock (true), we want to skip `fetch_n_parse`.
// Covering cases: `Some(None)` and `Some(Some(true))`
_ => Err(<Error<T>>::OffchainWorkerAlreadyBusy),
}
})
.map(|_| s_lock)
}

fn ocw_release_lock(s_lock: StorageValueRef) {
s_lock.set(&false);
}
}
56 changes: 12 additions & 44 deletions src/types.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use codec::{ Decode, Encode };
use codec::{Decode, Encode};
use core::fmt;
use fixed::types::I16F16;
use frame_support::{ sp_runtime::RuntimeDebug, sp_std::prelude::* };
use frame_support::{sp_runtime::RuntimeDebug, sp_std::prelude::*};
use product_registry::ProductId;

// Custom types
Expand Down Expand Up @@ -74,6 +75,15 @@ pub struct ShippingEvent<Moment> {
pub timestamp: Moment,
}

impl<Moment> fmt::Display for ShippingEvent<Moment>
where
Moment: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}

#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)]
pub struct ReadPoint {
pub latitude: Decimal,
Expand All @@ -97,45 +107,3 @@ pub struct Reading<Moment> {
pub timestamp: Moment,
pub value: Decimal,
}

// #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)]
// pub enum OcwTaskType {
// ShipmentRegistration,
// ShipmentPickup,
// ShipmentDelivery,
// }

// impl OcwTaskType {
// pub fn from_shipping_event_type(
// shipping_event_type: &ShippingEventType,
// ) -> Result<OcwTaskType, &'static str> {
// match shipping_event_type {
// ShippingEventType::ShipmentRegistered => Ok(OcwTaskType::ShipmentRegistration),
// ShippingEventType::ShipmentPickup => Ok(OcwTaskType::ShipmentPickup),
// ShippingEventType::ShipmentDelivery => Ok(OcwTaskType::ShipmentDelivery),
// ShippingEventType::SensorReading => Err("Unsupported shipping event type conversion"),
// }
// }
// }

// #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)]
// pub enum OcwTaskPayload<AccountId, Moment> {
// Shipment(Shipment<AccountId, Moment>),
// ShippingEvent(ShippingEvent<Moment>),
// }

// #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)]
// pub struct OcwTask<AccountId, Moment> {
// r#type: OcwTaskType,
// payload: OcwTaskPayload<AccountId, Moment>,
// }

// impl<A, M> fmt::Display for OcwTask<A, M>
// where
// A: fmt::Debug,
// M: fmt::Debug,
// {
// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// write!(f, "{:?}", self)
// }
// }
40 changes: 21 additions & 19 deletions types.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
{
"Identifier": "Vec<u8>",
"Decimal": "u32",
"ProductId": "Identifier",
"Identifier": "Vec<u8>",
"Decimal": "i32",
"ShipmentId": "Identifier",
"ShippingEventIndex": "u128",
"DeviceId": "Identifier",

"ShipmentStatus": {
"_enum": [
"Pending",
Expand All @@ -18,22 +21,32 @@
"registered": "Moment",
"delivered": "Option<Moment>"
},
"ShippingEventIndex": "u64",
"DeviceId": "Identifier",
"ShippingOperation": {
"_enum": [
"ShipmentPickup",
"ShipmentDelivery"
"Pickup",
"Scan",
"Deliver"
]
},
"ShippingEventType": {
"_enum": [
"ShipmentRegistration",
"ShipmentPickup",
"SensorReading",
"ShipmentDelivery"
"ShipmentScan",
"ShipmentDeliver"
]
},
"ShippingEvent": {
"event_type": "ShippingEventType",
"shipment_id": "ShipmentId",
"location": "Option<ReadPoint>",
"readings": "Vec<Reading<Moment>>",
"timestamp": "Moment"
},
"ReadPoint": {
"latitude": "Decimal",
"longitude": "Decimal"
},
"ReadingType": {
"_enum": [
"Humidity",
Expand All @@ -44,21 +57,10 @@
"Vibration"
]
},
"ReadPoint": {
"latitude": "Decimal",
"longitude": "Decimal"
},
"Reading": {
"device_id": "DeviceId",
"reading_type": "ReadingType",
"timestamp": "Moment",
"value": "Decimal"
},
"ShippingEvent": {
"event_type": "ShippingEventType",
"shipment_id": "ShipmentId",
"location": "Option<ReadPoint>",
"readings": "Vec<Reading<Moment>>",
"timestamp": "Moment"
}
}

0 comments on commit 5718b20

Please sign in to comment.