
Commit

Merge branch 'main' into feature/net-739
folex authored Feb 19, 2024
2 parents b9461b4 + 980348f commit bd3f1c4
Showing 8 changed files with 34 additions and 30 deletions.
4 changes: 2 additions & 2 deletions .fluence/aqua/services.aqua
@@ -63,7 +63,7 @@ data DealCreatedResult:
data Match:
compute_peer: string
deal_id: string
pat_id: []u8
unit_id: []u8
deal_creation_block: U256
app_cid: string

@@ -177,5 +177,5 @@ service ChainConnector("chain_connector"):
poll_deal_created(api_endpoint: string, address: string, left_boundary: string) -> DealCreatedResult
poll_deal_matches(chain: ChainInfo, left_boundary: string) -> MatchedResult
poll_deal_peer_removed_batch(api_endpoint: string, deals: []DealPeerRemovedReq) -> DealPeerRemovedBatchResult
register_worker(pat_id: []u8, worker_id: string, chain: ChainInfo, deal_addr: string) -> RegisterWorkerResult
register_worker(unit_id: []u8, worker_id: string, chain: ChainInfo, deal_addr: string) -> RegisterWorkerResult
resolve_subnet(deal_id: string, api_endpoint: string) -> Subnet
1 change: 1 addition & 0 deletions .github/workflows/e2e.yml
@@ -40,6 +40,7 @@ jobs:

uses: fluencelabs/nox/.github/workflows/build.yml@master
with:
ref: update-subnet-resolve
cargo-dependencies: |
[
{
5 changes: 4 additions & 1 deletion .github/workflows/tests.yml
@@ -87,7 +87,10 @@ jobs:
- name: Setup nextest
uses: taiki-e/install-action@nextest

- name: Run cargo nextest
- name: Run cargo Decider Unit Tests
run: cargo nextest run --release --all-features --profile ci

- name: Run cargo Decider Distro Tests
working-directory: src/tests/decider-distro-tests-rs
run: cargo nextest run --release --all-features --profile ci

2 changes: 1 addition & 1 deletion src/aqua/chain/new_deals.aqua
@@ -64,7 +64,7 @@ func join_deals(chain: ChainInfo, spell_id: SpellId, logs: []DealMatched, left:
else:
-- At this point, if worker registration returns is_ok=false, it did all it could do
-- so here we move forward anyway
is_ok <- register_worker(chain, spell_id, deal_id, worker_id!, log.info.pat_id)
is_ok <- register_worker(chain, spell_id, deal_id, worker_id!, log.info.unit_id)
if is_ok:
store_deal(spell_id, deal_id, worker_id!, log.block_number)
deal_ids <<- deal_id
6 changes: 3 additions & 3 deletions src/aqua/decider/register_worker.aqua
@@ -41,7 +41,7 @@ func remove_tx_status(spell_id: SpellId, status: WorkerTxStatus):
-- TODO:
-- Store joined worker in KV, status = unregistered
-- Immediately go and register
-- Update worker status in KV with status = registered and pat_id = xxx
-- Update worker status in KV with status = registered and unit_id = xxx
-- On start, first go through unregistered workers

-- How to store workers?
@@ -56,8 +56,8 @@ func remove_tx_status(spell_id: SpellId, status: WorkerTxStatus):
-- If TTL happens during registration, last_seen_block won't be incremented,
-- so we'll load same Chain Events and repeat the whole deployment and registration once again.
-- Remember to track registration failures though, and retry them
func register_worker(chain: ChainInfo, spell_id: SpellId, deal_id: DealId, worker_id: WorkerId, pat_id: []u8) -> bool:
result <- ChainConnector.register_worker(pat_id, worker_id, chain, deal_id)
func register_worker(chain: ChainInfo, spell_id: SpellId, deal_id: DealId, worker_id: WorkerId, unit_id: []u8) -> bool:
result <- ChainConnector.register_worker(unit_id, worker_id, chain, deal_id)
is_ok = result.success
if result.success:
deal_log(spell_id, deal_id, ["registered worker tx_hash", result.tx_hash])
@@ -17,11 +17,11 @@ use crate::peer_id::parse_peer_id;
/// bytes32 hash;
/// }
///
/// event ComputePeerMatched(
/// bytes32 indexed peerId
/// event ComputeUnitMatched(
/// bytes32 indexed peerId,
/// address deal
/// bytes32 patId
/// uint dealCreationBlock
/// bytes32 unitId,
/// uint256 dealCreationBlock,
/// CIDV1 appCID
/// );
/// ```
@@ -31,7 +31,7 @@ use crate::peer_id::parse_peer_id;
pub struct Match {
compute_peer: String,
deal_id: String,
pat_id: Vec<u8>,
unit_id: Vec<u8>,
deal_creation_block: U256,
app_cid: String,
}
@@ -44,7 +44,7 @@ pub struct DealMatched {
}

impl DealMatched {
pub const EVENT_NAME: &'static str = "ComputePeerMatched";
pub const EVENT_NAME: &'static str = "ComputeUnitMatched";
}

impl ChainData for Match {
@@ -58,7 +58,7 @@ impl ChainData for Match {
Indexed(ParamType::FixedBytes(32)),
// deal
NotIndexed(ParamType::Address),
// pat_id
// unit_id
NotIndexed(ParamType::FixedBytes(32)),
// deal_creation_block
NotIndexed(ParamType::Uint(256)),
@@ -80,7 +80,7 @@ impl ChainData for Match {
let compute_peer = parse_peer_id(compute_peer)?.to_string();

let deal = next_opt(tokens, "deal", Token::into_address)?;
let pat_id = next_opt(tokens, "pat_id", Token::into_fixed_bytes)?;
let unit_id = next_opt(tokens, "unit_id", Token::into_fixed_bytes)?;
let deal_creation_block = next_opt(tokens, "deal_creation_block", U256::from_token)?;

let app_cid = &mut next_opt(tokens, "app_cid", Token::into_tuple)?.into_iter();
@@ -92,7 +92,7 @@ impl ChainData for Match {
Ok(Match {
compute_peer,
deal_id: format!("{deal:#x}"),
pat_id,
unit_id,
deal_creation_block,
app_cid,
})
@@ -200,7 +200,7 @@ mod tests {
m.deal_id.to_lowercase(),
"0xFfA0611a099AB68AD7C3C67B4cA5bbBEE7a58B99".to_lowercase()
);
//assert_eq!(m.pat_id.len(), 3);
//assert_eq!(m.unit_id.len(), 3);
assert_eq!(m.deal_creation_block.to_eth().as_u32(), 80);
assert_eq!(
m.app_cid.to_string(),
@@ -219,7 +219,7 @@ mod tests {
m.deal_id.to_lowercase(),
"0x67b2AD3866429282e16e55B715d12A77F85B7CE8".to_lowercase()
);
//assert_eq!(m.pat_id.len(), 3);
//assert_eq!(m.unit_id.len(), 3);
assert_eq!(m.deal_creation_block.to_eth().as_u32(), 86);
assert_eq!(
m.app_cid.to_string(),
@@ -65,7 +65,7 @@ impl RegisterWorkerResult {

#[marine]
pub fn register_worker(
pat_id: Vec<u8>,
unit_id: Vec<u8>,
worker_id: &str,
chain: ChainInfo,
deal_addr: &str,
@@ -76,7 +76,7 @@ pub fn register_worker(

let r: Result<_, RegisterWorkerError> = try {
let key = parse_wallet_key(&chain.wallet_key)?;
let input = encode_call(pat_id, worker_id)?;
let input = encode_call(unit_id, worker_id)?;
let nonce = load_nonce(key.to_address(), endpoint)?;
let gas_price = get_gas_price(endpoint)?;
let tx = make_tx(input, key, gas, nonce, gas_price, deal_addr, network_id)?;
@@ -143,15 +143,15 @@ fn function() -> Function {
}

/// Encode `setWorker` call to bytes
fn encode_call(pat_id: Vec<u8>, worker_id: &str) -> Result<Vec<u8>, RegisterWorkerError> {
// let pat_id = decode_hex(pat_id).map_err(|e| EncodeArgument(e, "pat_id"))?;
let pat_id = Token::FixedBytes(pat_id);
fn encode_call(unit_id: Vec<u8>, worker_id: &str) -> Result<Vec<u8>, RegisterWorkerError> {
// let unit_id = decode_hex(unit_id).map_err(|e| EncodeArgument(e, "unit_id"))?;
let unit_id = Token::FixedBytes(unit_id);

let worker_id = PeerId::from_str(worker_id).map_err(|e| InvalidWorkerId(e, "worker_id"))?;
let worker_id = serialize_peer_id(worker_id);
let worker_id = Token::FixedBytes(worker_id);

let input = function().encode_input(&[pat_id, worker_id])?;
let input = function().encode_input(&[unit_id, worker_id])?;
Ok(input)
}

@@ -230,7 +230,7 @@ mod tests {
use crate::hex::decode_hex;
use crate::jsonrpc::register_worker::{encode_call, function};

fn pat_id() -> Vec<u8> {
fn unit_id() -> Vec<u8> {
decode_hex("0xe532c726aa9c2f223fb21b5a488f874583e809257685ac3c40c9e0f7c89c082e")
.expect("decode pat id from hex")
}
@@ -246,7 +246,7 @@ mod tests {
let signature_hex = hex::encode(f.short_signature());
assert_eq!(signature_hex, "d5053ab0");

let input = encode_call(pat_id(), WORKER_ID).expect("encode call");
let input = encode_call(unit_id(), WORKER_ID).expect("encode call");
let input = hex::encode(input);

assert_eq!(input, "d5053ab0e532c726aa9c2f223fb21b5a488f874583e809257685ac3c40c9e0f7c89c082e529d4dabfa72abfd83c48adca7a2d49a921fa7351689d12e2a6c68375052f0b5");
@@ -322,7 +322,7 @@ mod tests {
tetraplets: vec![],
};
let result = connector.register_worker_cp(
pat_id().into(),
unit_id().into(),
WORKER_ID.into(),
chain,
"0x6328bb918a01603adc91eae689b848a9ecaef26d".into(),
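For illustration only, not part of the commit: a minimal standalone sketch (plain Rust, no external crates) of the calldata layout that the encode_call test above asserts — the 4-byte setWorker(bytes32,bytes32) selector, followed by two 32-byte arguments, first the unit_id and then the serialized worker peer id. The hex values are the ones already used in that test.

fn main() {
    // Calldata produced by encode_call in the test above:
    // 4-byte selector, then unit_id (bytes32), then worker peer id (bytes32).
    let calldata = concat!(
        "d5053ab0",                                                         // selector of setWorker(bytes32,bytes32)
        "e532c726aa9c2f223fb21b5a488f874583e809257685ac3c40c9e0f7c89c082e", // unit_id
        "529d4dabfa72abfd83c48adca7a2d49a921fa7351689d12e2a6c68375052f0b5", // worker_id
    );
    // Split the hex string into its three fields (2 hex chars per byte).
    let (selector, args) = calldata.split_at(8);
    let (unit_id, worker_id) = args.split_at(64);
    assert_eq!(selector, "d5053ab0");
    assert_eq!(unit_id.len(), 64);
    assert_eq!(worker_id.len(), 64);
    println!("selector={selector}\nunit_id=0x{unit_id}\nworker_id=0x{worker_id}");
}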
@@ -86,8 +86,8 @@ fn decode_compute_units(data: String) -> Result<Vec<JoinedWorker>, ResolveSubnet
let tuple = token.into_tuple().ok_or(InvalidParsedToken("tuple"))?;
let mut tuple = tuple.into_iter();

let pat_id = next_opt(&mut tuple, "pat_id", Token::into_fixed_bytes)?;
let pat_id = hex::encode(pat_id);
let unit_id = next_opt(&mut tuple, "unit_id", Token::into_fixed_bytes)?;
let unit_id = hex::encode(unit_id);

let peer_id = next_opt(&mut tuple, "compute_peer_id", Token::into_fixed_bytes)?;
let peer_id = parse_peer_id(peer_id).map_err(|e| InvalidPeerId(e, "compute_peer_id"))?;
@@ -102,7 +102,7 @@ fn decode_compute_units(data: String) -> Result<Vec<JoinedWorker>, ResolveSubnet
};

let pat = JoinedWorker {
cu_id: format!("0x{}", pat_id),
cu_id: format!("0x{}", unit_id),
host_id: peer_id.to_string(),
worker_id,
};
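Also for illustration, not part of the commit: a minimal sketch, assuming the hex crate already used in decode_compute_units, of how the cu_id string relates to the raw 32-byte unit id.

fn main() {
    // A 32-byte unit id, as Token::into_fixed_bytes would return it (value is made up).
    let unit_id: Vec<u8> = vec![0xe5; 32];
    // decode_compute_units hex-encodes the bytes and prefixes "0x" to build cu_id.
    let cu_id = format!("0x{}", hex::encode(&unit_id));
    assert_eq!(cu_id.len(), 2 + 64);
    println!("{cu_id}");
}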
