diff --git a/src/lib.rs b/src/lib.rs index 9219982..aa3475c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,12 @@ #![cfg_attr(not(feature = "std"), no_std)] +use codec::alloc::string::ToString; +use core::convert::TryInto; use frame_support::{ - decl_error, decl_event, decl_module, decl_storage, dispatch, ensure, sp_std::prelude::*, + debug, decl_error, decl_event, decl_module, decl_storage, dispatch, ensure, + sp_runtime::offchain::storage::StorageValueRef, sp_std::prelude::*, }; -use frame_system::{ self as system, ensure_signed, offchain::SendTransactionTypes }; +use frame_system::{self as system, ensure_signed, offchain::SendTransactionTypes}; use product_registry::ProductId; @@ -65,6 +68,7 @@ decl_error! { ShipmentHasTooManyProducts, ShippingEventAlreadyExists, ShippingEventMaxExceeded, + OffchainWorkerAlreadyBusy } } @@ -176,25 +180,64 @@ decl_module! { Ok(()) } - // fn offchain_worker(_block_number: T::BlockNumber) { - // if Self::ocw_tasks().len() == 0 { return; } - // let mut tasks: Vec> = >::get(); - - // while tasks.len() > 0 { - // let task = tasks.remove(0); - // debug::info!("ocw task: {:?}", task); - // let _ = Self::notify_listener(&task).map_err(|e| { - // debug::error!("Error notifying listener. Err: {:?}", e); - // }); - // } - - // // Submit a transaction back on-chain to clear the task queue - // let call = Call::clear_ocwtasks(); - - // let _ = SubmitTransaction::>::submit_unsigned_transaction(call.into()).map_err(|e| { - // debug::error!("Failed in submitting tx for clearing ocw taskqueue. Err: {:?}", e); - // }); - // } + fn offchain_worker(block_number: T::BlockNumber) { + if block_number == 0.into() { + return; + } + + // Acquire lock + let s_lock = match Self::ocw_acquire_lock(StorageValueRef::persistent(b"product_tracking_ocw::lock")) { + Ok(s_lock) => s_lock, + Err(e) => { + debug::info!("[product_tracking_ocw] Aborting: {:?}", e); //debug + return; + } + }; + + // Check last processed block + let last_processed_block_ref = StorageValueRef::persistent(b"product_tracking_ocw::last_proccessed_block"); + let mut last_processed_block: u32 = match last_processed_block_ref.get::() { + Some(Some(last_proccessed_block)) if last_proccessed_block >= block_number => { + debug::info!("[product_tracking_ocw] Skipping: Block {:?} has already been processed.", block_number); + return; + }, + Some(Some(last_proccessed_block)) => last_proccessed_block.try_into().ok().unwrap() as u32, + None => 0u32, //TODO: define a OCW_MAX_BACKTRACK_PERIOD param + _ => { + debug::error!("[product_tracking_ocw] Error reading product_tracking_ocw::last_proccessed_block."); + return; + } + }; + + let start_block = last_processed_block + 1; + let end_block = block_number.try_into().ok().unwrap() as u32; + for current_block in start_block..end_block { + debug::debug!("[product_tracking_ocw] Processing notifications for block {}", current_block); + let ev_indices = Self::ocw_notifications::(current_block.into()); + + let listener_results: Result, _> = ev_indices.iter() + .map(|idx| match Self::event_by_idx(idx) { + Some(ev) => Self::notify_listener(&ev), + None => Ok(()) + }) + .collect(); + + if let Err(err) = listener_results { + debug::warn!("[product_tracking_ocw] notify_listener error: {}", err); + break; + } + last_processed_block = current_block; + } + + // Save last processed block + if last_processed_block >= start_block { + last_processed_block_ref.set(&last_processed_block); + debug::info!("[product_tracking_ocw] Notifications successfully processed up to block {}", last_processed_block); + } + + // Release lock + Self::ocw_release_lock(s_lock); + } } } @@ -208,32 +251,7 @@ impl Module { ShippingEventBuilder::::default() } - // fn notify_listener(task: &OcwTask) -> Result<(), &'static str> { - // let request = - // sp_runtime::offchain::http::Request::post(&LISTENER_ENDPOINT, vec![task.to_string()]); - - // let timeout = - // sp_io::offchain::timestamp().add(sp_runtime::offchain::Duration::from_millis(3000)); - - // let pending = request - // .add_header(&"Content-Type", &"text/plain") - // .deadline(timeout) // Setting the timeout time - // .send() // Sending the request out by the host - // .map_err(|_| "http post request building error")?; - - // let response = pending - // .try_wait(timeout) - // .map_err(|_| "http post request sent error")? - // .map_err(|_| "http post request sent error")?; - - // if response.code != 200 { - // return Err("http response error"); - // } - - // Ok(()) - // } - - pub fn store_event(event: ShippingEvent) -> Result> { + fn store_event(event: ShippingEvent) -> Result> { let event_idx = EventCount::get() .checked_add(1) .ok_or(Error::::ShippingEventMaxExceeded)?; @@ -245,6 +263,7 @@ impl Module { Ok(event_idx) } + // (Public) Validation methods pub fn validate_identifier(id: &[u8]) -> Result<(), Error> { // Basic identifier validation ensure!(!id.is_empty(), Error::::InvalidOrMissingIdentifier); @@ -271,4 +290,58 @@ impl Module { ); Ok(()) } + + // Offchain worker methods + fn notify_listener(ev: &ShippingEvent) -> Result<(), &'static str> { + debug::info!("notifying listener: {:?}", ev); + + let request = + sp_runtime::offchain::http::Request::post(&LISTENER_ENDPOINT, vec![ev.to_string()]); + + let timeout = + sp_io::offchain::timestamp().add(sp_runtime::offchain::Duration::from_millis(3000)); + + let pending = request + .add_header(&"Content-Type", &"text/plain") + .deadline(timeout) // Setting the timeout time + .send() // Sending the request out by the host + .map_err(|_| "http post request building error")?; + + let response = pending + .try_wait(timeout) + .map_err(|_| "http post request sent error")? + .map_err(|_| "http post request sent error")?; + + if response.code != 200 { + return Err("http response error"); + } + + Ok(()) + } + + fn ocw_acquire_lock(s_lock: StorageValueRef) -> Result> { + // We are implementing a mutex lock here with `s_lock` + s_lock + .mutate(|s: Option>| { + match s { + // `s` can be one of the following: + // `None`: the lock has never been set. Treated as the lock is free + // `Some(None)`: unexpected case, treated it as AlreadyFetch + // `Some(Some(false))`: the lock is free + // `Some(Some(true))`: the lock is held + + // If the lock has never been set or is free (false), return true to execute `fetch_n_parse` + None | Some(Some(false)) => Ok(true), + + // Otherwise, someone already hold the lock (true), we want to skip `fetch_n_parse`. + // Covering cases: `Some(None)` and `Some(Some(true))` + _ => Err(>::OffchainWorkerAlreadyBusy), + } + }) + .map(|_| s_lock) + } + + fn ocw_release_lock(s_lock: StorageValueRef) { + s_lock.set(&false); + } } diff --git a/src/types.rs b/src/types.rs index 74b58e3..5461fd9 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,6 +1,7 @@ -use codec::{ Decode, Encode }; +use codec::{Decode, Encode}; +use core::fmt; use fixed::types::I16F16; -use frame_support::{ sp_runtime::RuntimeDebug, sp_std::prelude::* }; +use frame_support::{sp_runtime::RuntimeDebug, sp_std::prelude::*}; use product_registry::ProductId; // Custom types @@ -74,6 +75,15 @@ pub struct ShippingEvent { pub timestamp: Moment, } +impl fmt::Display for ShippingEvent +where + Moment: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)] pub struct ReadPoint { pub latitude: Decimal, @@ -97,45 +107,3 @@ pub struct Reading { pub timestamp: Moment, pub value: Decimal, } - -// #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)] -// pub enum OcwTaskType { -// ShipmentRegistration, -// ShipmentPickup, -// ShipmentDelivery, -// } - -// impl OcwTaskType { -// pub fn from_shipping_event_type( -// shipping_event_type: &ShippingEventType, -// ) -> Result { -// match shipping_event_type { -// ShippingEventType::ShipmentRegistered => Ok(OcwTaskType::ShipmentRegistration), -// ShippingEventType::ShipmentPickup => Ok(OcwTaskType::ShipmentPickup), -// ShippingEventType::ShipmentDelivery => Ok(OcwTaskType::ShipmentDelivery), -// ShippingEventType::SensorReading => Err("Unsupported shipping event type conversion"), -// } -// } -// } - -// #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)] -// pub enum OcwTaskPayload { -// Shipment(Shipment), -// ShippingEvent(ShippingEvent), -// } - -// #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)] -// pub struct OcwTask { -// r#type: OcwTaskType, -// payload: OcwTaskPayload, -// } - -// impl fmt::Display for OcwTask -// where -// A: fmt::Debug, -// M: fmt::Debug, -// { -// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { -// write!(f, "{:?}", self) -// } -// } diff --git a/types.json b/types.json index 3e09344..79607c6 100644 --- a/types.json +++ b/types.json @@ -1,8 +1,11 @@ { - "Identifier": "Vec", - "Decimal": "u32", "ProductId": "Identifier", + "Identifier": "Vec", + "Decimal": "i32", "ShipmentId": "Identifier", + "ShippingEventIndex": "u128", + "DeviceId": "Identifier", + "ShipmentStatus": { "_enum": [ "Pending", @@ -18,22 +21,32 @@ "registered": "Moment", "delivered": "Option" }, - "ShippingEventIndex": "u64", - "DeviceId": "Identifier", "ShippingOperation": { "_enum": [ - "ShipmentPickup", - "ShipmentDelivery" + "Pickup", + "Scan", + "Deliver" ] }, "ShippingEventType": { "_enum": [ "ShipmentRegistration", "ShipmentPickup", - "SensorReading", - "ShipmentDelivery" + "ShipmentScan", + "ShipmentDeliver" ] }, + "ShippingEvent": { + "event_type": "ShippingEventType", + "shipment_id": "ShipmentId", + "location": "Option", + "readings": "Vec>", + "timestamp": "Moment" + }, + "ReadPoint": { + "latitude": "Decimal", + "longitude": "Decimal" + }, "ReadingType": { "_enum": [ "Humidity", @@ -44,21 +57,10 @@ "Vibration" ] }, - "ReadPoint": { - "latitude": "Decimal", - "longitude": "Decimal" - }, "Reading": { "device_id": "DeviceId", "reading_type": "ReadingType", "timestamp": "Moment", "value": "Decimal" - }, - "ShippingEvent": { - "event_type": "ShippingEventType", - "shipment_id": "ShipmentId", - "location": "Option", - "readings": "Vec>", - "timestamp": "Moment" } } \ No newline at end of file