Skip to content

Commit

Permalink
feat: pass api endpoint via env [NET-479] (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh authored May 30, 2023
1 parent df9e732 commit 75aea7e
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 89 deletions.
2 changes: 1 addition & 1 deletion example/decider_init_args.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"listener_id": "connector",
"info": {
"net": "$FLUENCE_ENV_CONNECTOR_NET",
"api_endpoint": "$FLUENCE_ENV_CONNECTOR_API_ENDPOINT",
"address": "$FLUENCE_ENV_CONNECTOR_CONTRACT_ADDRESS"
},
"from_block": "0x75f3fbc",
Expand Down
2 changes: 1 addition & 1 deletion scripts/compile.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ echo "create initial data for decider"
# "dat":
# "listener_id": aurora listener service
# "info":
# "net": net from which to poll
# "api_endpoint": net api endpoint from which to poll
# "address": contract address
# "from_block": "latest"
# "worker_script": worker.aqua script
Expand Down
31 changes: 14 additions & 17 deletions src/aqua/decider.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func get_worker_settings(spell_id: string) -> ?WorkerSettings, bool:
func decide(deal: DealCreatedData) -> bool:
<- true

--
func join_deal(spell_id: string, block_number: string, deal: DealCreatedData) -> bool:
log = (msg: []⊤):
spell_log(spell_id, msg)
Expand Down Expand Up @@ -210,8 +209,8 @@ func get_latest_block(listener_id: string, net: string) -> ?string, bool:

<- block, is_ok!

-- Check if our local upper bound (to_block) until which we check new deals is outdated
-- aka less then the latest block of the chain.
-- Check if our local upper bound (to_block) until which we check new deals is outdated
-- aka less then the latest block of the chain.
func is_to_block_outdated(listener_id: string, latest_block: string, to_block: string) -> bool:
FluenceAuroraConnector listener_id
is: *bool
Expand All @@ -225,7 +224,7 @@ func is_to_block_outdated(listener_id: string, latest_block: string, to_block: s
<- is!

-- Check to the to_block is less than latest_block from chain.
-- That would meen that polling this range [from_block, to_block] won't give us new updated anymore
-- That would meen that polling this range [from_block, to_block] won't give us new updated anymore
-- and we need to move the range to be able to find updates.
func need_update_from_block(listener_id: string, net: string, to_block: string) -> bool:
need_update: *bool
Expand Down Expand Up @@ -256,7 +255,7 @@ func poll_deal_changes_batch(spell_id: string, listener_id: string, net: string)
deals_update: *DealUpdate
for joined_deal_str <- list.strings:
joined_deal <- JsonJoinedDeal.parse(joined_deal_str)
deal_str, is_ok <- get_string(spell_id, joined_deal.deal_id)
deal_str, is_ok <- get_string(spell_id, joined_deal.deal_id)
if is_ok == false:
log(["can't find state of the deal", joined_deal.deal_id, "; broken invariant, check poll_new_deals"])
else:
Expand Down Expand Up @@ -295,10 +294,8 @@ func poll_deal_changes_batch(spell_id: string, listener_id: string, net: string)

-- Data we need to poll new deals from aurora
data AuroraInfo:
-- Refers to which network to use:
-- * testnet (refers to Aurora Testnet)
-- * local (refers to the default local port)
net: string
-- Refers to which api endpoint to poll
api_endpoint: string
-- Chain contract address
address: string

Expand All @@ -312,7 +309,7 @@ func get_counter(spell_id: string) -> u32:
result <<- 0
<- result!

func poll_new_deals(spell_id: string, listener_id: string, info: AuroraInfo, from_block: string):
func poll_new_deals(spell_id: string, listener_id: string, info: AuroraInfo, from_block: string):
FluenceAuroraConnector listener_id
Spell spell_id
log = (msg: []⊤):
Expand All @@ -323,16 +320,16 @@ func poll_new_deals(spell_id: string, listener_id: string, info: AuroraInfo, fro
from_block_init: *string
counter <- get_counter(spell_id)
if counter > 1 == false:
bnumber <- FluenceAuroraConnector.latest_block_number(info.net)
bnumber <- FluenceAuroraConnector.latest_block_number(info.api_endpoint)
if bnumber.success:
log(["update from_block to the latest block: [init, new]", from_block, bnumber.result])
bnumber_str <- Json.stringify(bnumber.result)
Spell.set_string("from_block", bnumber_str)
Spell.set_string("from_block", bnumber_str)
from_block_init <<- bnumber.result
from_block_init <<- from_block
real_from_block = from_block_init[0]

result <- FluenceAuroraConnector.poll_deals(info.net, info.address, real_from_block)
result <- FluenceAuroraConnector.poll_deals(info.api_endpoint, info.address, real_from_block)
if result.success == false:
log(["can't receive info about new deals", result.error!])
else:
Expand All @@ -343,17 +340,17 @@ func poll_new_deals(spell_id: string, listener_id: string, info: AuroraInfo, fro
else:
process_deal(spell_id, deal)
new_from_block <- Json.stringify(deal.next_block_number)
Spell.set_string("from_block", new_from_block)
Spell.set_string("from_block", new_from_block)
-- If we found no deals, we check if need to move the from_block forward
-- We need to do it in case if no deals were done in range of 10000 blocks
if result.result.length > 1 == false:
need_update <- need_update_from_block(listener_id, info.net, result.to_block)
need_update <- need_update_from_block(listener_id, info.api_endpoint, result.to_block)
if need_update:
log(["updating outdated from_block: [previous from_block, new_from_block]", from_block, result.to_block])
to_block_str <- Json.stringify(result.to_block)
Spell.set_string("from_block", to_block_str)


func main(spell_id: string, listener_id: string, info: AuroraInfo, from_block: string):
-- Find new deals and create workers
try:
Expand All @@ -362,4 +359,4 @@ func main(spell_id: string, listener_id: string, info: AuroraInfo, from_block: s
-- TODO: I wonder what will happen when the decider join _many_ deals?
-- Will it be able to check them on time?
-- Update existing deals
poll_deal_changes_batch(spell_id, listener_id, info.net)
poll_deal_changes_batch(spell_id, listener_id, info.api_endpoint)
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ log = "0.4.17"
thiserror = "1.0.38"
ethabi = "18.0.0"
hex = "0.4.3"
url = "2.3.1"

[dev-dependencies]
marine-rs-sdk-test = "0.9.0"
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use marine_rs_sdk::marine;
use marine_rs_sdk::module_manifest;
use marine_rs_sdk::WasmLoggerBuilder;

use std::collections::HashMap;
use thiserror::Error;

use deal::changed_cid::*;
Expand All @@ -29,22 +28,12 @@ enum Error {
RequestError(#[from] request::RequestError),
#[error(transparent)]
JsonRpcError(#[from] jsonrpc::JsonRpcError),
#[error("unsupported network type: {0}")]
NetworkTypeError(String),
}

pub fn main() {
WasmLoggerBuilder::new().build().unwrap();
}

#[marine]
pub struct Net {
/// Short name for the net. Can be used in `poll` calls.
name: String,
/// URL of the net
url: String,
}

#[marine]
pub struct SupportedEvent {
/// Name of the event
Expand All @@ -56,22 +45,12 @@ pub struct SupportedEvent {
/// Service configuration
#[marine]
pub struct Env {
/// List of allowed networks
nets: Vec<Net>,
/// List of polled events with
/// List of polled events with topics
events: Vec<SupportedEvent>,
}

// TODO: allow owners to configure the service
#[marine]
pub fn get_env() -> Env {
let nets = nets()
.into_iter()
.map(|(name, url)| Net {
name: name.to_string(),
url: url.to_string(),
})
.collect::<_>();
let events = vec![
SupportedEvent {
name: DealCreated::EVENT_NAME.to_string(),
Expand All @@ -82,18 +61,7 @@ pub fn get_env() -> Env {
topic: DealChangedData::topic(),
},
];
Env { nets, events }
}

// Nets we allow to poll.
fn nets() -> HashMap<&'static str, &'static str> {
HashMap::from([
("testnet", "https://aged-tiniest-scion.matic-testnet.quiknode.pro/08133c1e70a6ec1e7a75545a1254d85640a6251d/"),
("polygon-testnet", "https://endpoints.omniatech.io/v1/matic/mumbai/public"),
("aurora-testnet", "https://testnet.aurora.dev"),
// Note: cool for debugging, but do we want to leave it here?
("local", "http://localhost:8545"),
])
Env { events }
}

#[marine]
Expand Down Expand Up @@ -122,15 +90,12 @@ impl BlockNumberResult {
}

#[marine]
pub fn latest_block_number(net: String) -> BlockNumberResult {
let url = match get_url(&net) {
None => {
return BlockNumberResult::error(Error::NetworkTypeError(net).to_string());
}
Some(url) => url,
};
pub fn latest_block_number(api_endpoint: String) -> BlockNumberResult {
if let Err(err) = check_url(&api_endpoint) {
return BlockNumberResult::error(err.to_string());
}

let result = match get_block_number(url) {
let result = match get_block_number(api_endpoint) {
Err(err) => {
log::debug!(target: "connector", "request error: {:?}", err);
return BlockNumberResult::error(err.to_string());
Expand All @@ -156,9 +121,6 @@ pub fn latest_block_number(net: String) -> BlockNumberResult {
BlockNumberResult::ok(result)
}

fn get_url(net: &str) -> Option<String> {
nets().get(net).map(|x| String::from(*x))
}

fn hex_to_int(block: &str) -> Option<u64> {
let block = block.trim_start_matches("0x");
Expand Down Expand Up @@ -223,14 +185,18 @@ impl DealCreatedResult {
// Don't see this functionallity in eth_getLogs
// TODO: need to restrict who can use this service to its spell.
//
// `net` -- network type to poll (right now it's possible to pass any URL for emergency cases)
// `api_endpoint` -- api endpoint to poll (right now it's possible to pass any URL for emergency cases)
// `address` -- address of the deal contract
// `from_block` -- from which block to poll deals
#[marine]
pub fn poll_deals(net: String, address: String, from_block: String) -> DealCreatedResult {
pub fn poll_deals(api_endpoint: String, address: String, from_block: String) -> DealCreatedResult {
if let Err(err) = check_url(&api_endpoint) {
return DealCreatedResult::error(err.to_string());
}

let to_block = get_to_block(&from_block);
let result = poll(
net,
api_endpoint,
address,
from_block,
to_block.clone(),
Expand Down Expand Up @@ -276,15 +242,19 @@ impl DealChangedResult {
}
}

// `net` -- network type to poll (right now it's possible to pass any URL for emergency cases)
// `api_endpoint` -- api endpoint to poll (right now it's possible to pass any URL for emergency cases)
// `address` -- address of the deal we are modifying
// `from_block` -- from which block to poll deals
#[marine]
pub fn poll_deal_changed(net: String, deal_id: String, from_block: String) -> DealChangedResult {
pub fn poll_deal_changed(api_endpoint: String, deal_id: String, from_block: String) -> DealChangedResult {
if let Err(err) = check_url(&api_endpoint) {
return DealChangedResult::error(deal_id, err.to_string());
}

let address = format!("0x{}", deal_id);
let to_block = get_to_block(&from_block);
let result = poll(
net,
api_endpoint,
address,
from_block,
to_block.clone(),
Expand Down Expand Up @@ -375,15 +345,12 @@ impl DealsUpdatedBatchResult {

#[marine]
pub fn poll_deals_latest_update_batch(
net: String,
api_endpoint: String,
deals: Vec<DealUpdate>,
) -> DealsUpdatedBatchResult {
let url = match get_url(&net) {
None => {
return DealsUpdatedBatchResult::error(Error::NetworkTypeError(net).to_string());
}
Some(url) => url,
};
if let Err(err) = check_url(&api_endpoint) {
return DealsUpdatedBatchResult::error(err.to_string());
}

if deals.is_empty() {
return DealsUpdatedBatchResult::ok(Vec::new());
Expand All @@ -405,7 +372,7 @@ pub fn poll_deals_latest_update_batch(
req.to_jsonrpc(idx as u32)
})
.collect::<Vec<_>>();
let result = get_logs_batch(url, reqs);
let result = get_logs_batch(api_endpoint, reqs);
match result {
Err(err) => {
return DealsUpdatedBatchResult::error(err.to_string());
Expand Down Expand Up @@ -442,21 +409,14 @@ pub fn poll_deals_latest_update_batch(
}

fn poll(
net: String,
api_endpoint: String,
address: String,
from_block: String,
to_block: String,
topic: String,
) -> Result<Vec<GetLogsResp>, Error> {
let url = match get_url(&net) {
None => {
return Err(Error::NetworkTypeError(net));
}
Some(url) => url,
};

log::debug!("sending request to {}", url);
let value = get_logs(url, address, vec![topic], from_block, to_block)?;
log::debug!("sending request to {}", api_endpoint);
let value = get_logs(api_endpoint, address, vec![topic], from_block, to_block)?;
log::debug!("request result: {:?}", value);
let deals = value.get_result()?;
Ok(deals)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use marine_rs_sdk::marine;
use marine_rs_sdk::MountedBinaryResult;
use serde_json::json;
use thiserror::Error;
use url::Url;

#[derive(Debug, Error)]
pub enum RequestError {
Expand All @@ -14,6 +15,12 @@ pub enum RequestError {
ParseError(serde_json::Error, String),
#[error("error occured with `curl`: {0}")]
OtherError(String),
#[error("invalid URL: {0}")]
ParseUrlError(#[source] url::ParseError),
}

pub fn check_url(url: &str) -> Result<(), RequestError> {
Url::parse(url).map_err(RequestError::ParseUrlError).map(|_| ())
}

pub fn get_block_number(url: String) -> Result<JsonRpcResp<String>, RequestError> {
Expand Down Expand Up @@ -108,7 +115,7 @@ pub fn get_logs(
fn curl_params(url: String, data: String) -> Vec<String> {
let params = vec![
url.as_str(),
// To avoid unneccessary data in stderr
// To avoid unnecessary data in stderr
"--no-progress-meter",
"-X", "POST",
"-H", "Content-Type: application/json",
Expand Down

0 comments on commit 75aea7e

Please sign in to comment.