Skip to content

Commit

Permalink
feat(decider, connector): poll new-app-cid events [fixes NET-384] (#8)
Browse files Browse the repository at this point in the history
* Add deal change event polling in the connector in batch
* Check updates from last seen block + 1
  • Loading branch information
kmd-fl authored Mar 22, 2023
1 parent a161b97 commit bc0d221
Show file tree
Hide file tree
Showing 8 changed files with 457 additions and 198 deletions.
242 changes: 131 additions & 111 deletions src/aqua/decider.aqua

Large diffs are not rendered by default.

31 changes: 27 additions & 4 deletions src/aqua/fluence_aurora_connector.aqua
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
module FluenceAuroraConnector declares *

data BlockNumberResult:
success: bool
result: string
error: []string

data DealChangedData:
app_cid: string

data DealChanged:
next_block_number: string
block_number: string
info: DealChangedData

Expand All @@ -16,6 +16,7 @@ data DealChangedResult:
success: bool
result: []DealChanged
to_block: string
deal_id: string

data U256:
bytes: []u8
Expand All @@ -34,6 +35,7 @@ data DealCreatedData:

data DealCreated:
block_number: string
next_block_number: string
info: DealCreatedData

data DealCreatedResult:
Expand All @@ -42,6 +44,26 @@ data DealCreatedResult:
result: []DealCreated
to_block: string

data DealInfo:
worker_id: string
deal_id: string

data DealUpdate:
deal_info: DealInfo
from_block: string

data DealUpdatedBatchResult:
success: bool
error: []string
result: []DealChanged
to_block: string
deal_info: DealInfo

data DealsUpdatedBatchResult:
result: []DealUpdatedBatchResult
success: bool
error: []string

data Net:
name: string
url: string
Expand All @@ -54,9 +76,10 @@ data Env:
nets: []Net
events: []SupportedEvent

service FluenceAuroraConnector:
service FluenceAuroraConnector("connector"):
blocks_diff(from: string, to: string) -> u64
get_env() -> Env
latest_block_number(net: string) -> BlockNumberResult
poll_deal_change(net: string, address: string, from_block: string) -> DealChangedResult
poll_deal_changed(net: string, deal_id: string, from_block: string) -> DealChangedResult
poll_deals(net: string, address: string, from_block: string) -> DealCreatedResult
poll_deals_latest_update_batch(net: string, deals: []DealUpdate) -> DealsUpdatedBatchResult
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ pub struct DealChangedData {
app_cid: String,
}

#[derive(Debug)]
#[marine]
pub struct DealChanged {
next_block_number: String,
block_number: String,
info: DealChangedData,
}
Expand Down Expand Up @@ -50,7 +52,11 @@ impl ChainData for DealChangedData {
}

impl ChainEvent<DealChangedData> for DealChanged {
fn new(block_number: String, info: DealChangedData) -> Self {
Self { block_number, info }
fn new(next_block_number: String, block_number: String, info: DealChangedData) -> Self {
Self {
next_block_number,
block_number,
info,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ pub struct DealCreatedData {
epoch: u64,
}

#[derive(Debug)]
#[marine]
pub struct DealCreated {
block_number: String,
/// The number of the block next to the one of the deal
next_block_number: String,
info: DealCreatedData,
}

Expand Down Expand Up @@ -117,14 +120,18 @@ impl ChainData for DealCreatedData {
}

impl ChainEvent<DealCreatedData> for DealCreated {
fn new(block_number: String, info: DealCreatedData) -> Self {
Self { block_number, info }
fn new(next_block_number: String, block_number: String, info: DealCreatedData) -> Self {
Self {
next_block_number,
block_number,
info,
}
}
}

#[cfg(test)]
mod test {
use crate::{parse_chain_deal_created_data, DealParseError};
use crate::*;
use std::assert_matches::assert_matches;

// Cannot now provide an example of encoded data with effectors
Expand All @@ -149,7 +156,7 @@ mod test {
fn test_chain_parsing_ok_empty_effectors() {
let data = "0x00000000000000000000000094952482aa36dc9ec113bbba0df49284ecc071e20000000000000000000000005f7a3a2dab601ee4a1970b53088bebca176e13f40000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000000000000000000000000000000de0b6b3a7640000000000000000000000000000000000000000000000000000000000000000000300000000000000000000000000000000000000000000000000000000009896800000000000000000000000000000000000000000000000000000000000000005000000000000000000000000000000000000000000000000000000000000014000000000000000000000000000000000000000000000000000000000000001a00000000000000000000000000000000000000000000000000000000000554509000000000000000000000000000000000000000000000000000000000000002e516d5758616131534b41445274774e7472773278714a5556447864734472536d4a635542614a7946324c353476500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000";

let result = parse_chain_deal_data(data);
let result = DealCreatedData::parse(data);
assert!(result.is_ok(), "can't parse data: {:?}", result);
let result = result.unwrap();
assert_eq!(result.deal_id, "94952482aa36dc9ec113bbba0df49284ecc071e2");
Expand Down Expand Up @@ -180,19 +187,19 @@ mod test {
#[test]
fn test_chain_parsing_fail_empty() {
let data = "";
let result = parse_chain_deal_data(data);
let result = DealCreatedData::parse(data);
assert!(result.is_err());
assert_matches!(result, Err(DealParseError::Empty));
assert_matches!(result, Err(deal::DealParseError::Empty));
}

#[test]
fn test_chain_parsing_fail_something() {
let data = "0x1234567890";
let result = parse_chain_deal_data(data);
let result = DealCreatedData::parse(data);
assert!(result.is_err());
assert_matches!(
result,
Err(DealParseError::EthError(ethabi::Error::InvalidData))
Err(deal::DealParseError::EthError(ethabi::Error::InvalidData))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use marine_rs_sdk::marine;
use thiserror::Error;

pub trait ChainEvent<ChainData> {
fn new(block_number: String, data: ChainData) -> Self;
fn new(next_block_number: String, block_number: String, data: ChainData) -> Self;
}

pub trait ChainData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,10 @@ use thiserror::Error;
//
const JSON_RPC_VERSION: &str = "2.0";

// We don't id `id` field, but we need to verify the response.
const JSON_RPC_ID: u32 = 0;

#[derive(Debug, Error)]
pub enum JsonRpcError {
#[error("wrong JSON RPC version in the response: expected {JSON_RPC_VERSION}, got {0}")]
WrongVersion(String),
#[error("wrong JSON RPC id in the response: expected {JSON_RPC_ID}, got {0}")]
WrongId(u32),
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -35,10 +30,6 @@ impl<T> JsonRpcResp<T> {
if self.jsonrpc != JSON_RPC_VERSION {
return Err(JsonRpcError::WrongVersion(self.jsonrpc));
}
if self.id != JSON_RPC_ID {
return Err(JsonRpcError::WrongId(self.id));
}

Ok(self.result)
}
}
Expand Down Expand Up @@ -66,10 +57,10 @@ pub struct GetLogsReq {
}

impl GetLogsReq {
pub fn to_jsonrpc(self) -> JsonRpcReq<Vec<Self>> {
pub fn to_jsonrpc(self, id: u32) -> JsonRpcReq<Vec<Self>> {
JsonRpcReq {
jsonrpc: JSON_RPC_VERSION.to_string(),
id: 0,
id,
method: "eth_getLogs".to_string(),
params: vec![self],
}
Expand Down Expand Up @@ -122,7 +113,7 @@ mod tests {
"0x04157dc3f231c23b7cbecbadb1af08b865aa2e8d6624fe39a72a17279da72278".to_string(),
],
};
let jsonrpc_req = req.to_jsonrpc();
let jsonrpc_req = req.to_jsonrpc(0);

assert_eq!(
serde_json::to_string(&result).unwrap(),
Expand Down
Loading

0 comments on commit bc0d221

Please sign in to comment.