Skip to content

Commit

Permalink
Merge pull request #23 from near/daniyar/no-actors
Browse files Browse the repository at this point in the history
feat: move away from ractor
  • Loading branch information
volovyks authored Apr 5, 2023
2 parents e0fb307 + 99cb4ba commit 1acafab
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 864 deletions.
386 changes: 2 additions & 384 deletions Cargo.lock

Large diffs are not rendered by default.

162 changes: 86 additions & 76 deletions integration-tests/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use bollard::network::CreateNetworkOptions;
use bollard::service::{HostConfig, Ipam, PortBinding};
use bollard::Docker;
use futures::StreamExt;
use hyper::{body::HttpBody, Body, Client, Method, Request};
use hyper::{Body, Client, Method, Request};
use mpc_recovery::msg::LeaderResponse;
use rand::{distributions::Alphanumeric, Rng};
use serde_json::json;
use std::collections::HashMap;
use std::convert::TryInto;
use std::time::Duration;
use threshold_crypto::{serde_impl::SerdeSecret, PublicKeySet, SecretKeyShare, Signature};
use threshold_crypto::{serde_impl::SerdeSecret, PublicKeySet, SecretKeyShare};
use tokio::io::AsyncWriteExt;
use tokio::spawn;

Expand Down Expand Up @@ -45,44 +45,22 @@ async fn continuously_print_docker_output(docker: &Docker, id: &str) -> anyhow::

async fn start_mpc_node(
docker: &Docker,
node_id: u64,
pk_set: &PublicKeySet,
sk_share: &SecretKeyShare,
actor_address: Option<String>,
) -> anyhow::Result<(String, String, String)> {
let actor_port = portpicker::pick_unused_port().expect("no free ports");
let web_port = portpicker::pick_unused_port().expect("no free ports");

let empty = HashMap::<(), ()>::new();
cmd: Vec<String>,
web_port: u16,
expose_web_port: bool,
) -> anyhow::Result<(String, String)> {
let mut exposed_ports = HashMap::new();
exposed_ports.insert(format!("{actor_port}/tcp"), empty.clone());
exposed_ports.insert(format!("{web_port}/tcp"), empty);
let mut port_bindings = HashMap::new();
port_bindings.insert(
format!("{actor_port}/tcp"),
Some(vec![PortBinding {
host_ip: None,
host_port: Some(actor_port.to_string()),
}]),
);
port_bindings.insert(
format!("{web_port}/tcp"),
Some(vec![PortBinding {
host_ip: None,
host_port: Some(web_port.to_string()),
}]),
);

let mut cmd = vec![
"start".to_string(),
node_id.to_string(),
serde_json::to_string(&pk_set)?,
serde_json::to_string(&SerdeSecret(sk_share))?,
actor_port.to_string(),
web_port.to_string(),
];
if let Some(actor_address) = actor_address {
cmd.push(actor_address);
if expose_web_port {
let empty = HashMap::<(), ()>::new();
exposed_ports.insert(format!("{web_port}/tcp"), empty);
port_bindings.insert(
format!("{web_port}/tcp"),
Some(vec![PortBinding {
host_ip: None,
host_port: Some(web_port.to_string()),
}]),
);
}

let mpc_recovery_config = Config {
Expand Down Expand Up @@ -123,11 +101,60 @@ async fn start_mpc_node(
.ip_address
.unwrap();

Ok((
id,
format!("{ip_address}:{actor_port}"),
format!("localhost:{web_port}"),
))
Ok((id, ip_address))
}

async fn start_mpc_leader_node(
docker: &Docker,
node_id: u64,
pk_set: &PublicKeySet,
sk_share: &SecretKeyShare,
sign_nodes: Vec<String>,
) -> anyhow::Result<String> {
let web_port = portpicker::pick_unused_port().expect("no free ports");

let mut cmd = vec![
"start-leader".to_string(),
node_id.to_string(),
"--pk-set".to_string(),
serde_json::to_string(&pk_set)?,
"--sk-share".to_string(),
serde_json::to_string(&SerdeSecret(sk_share))?,
"--web-port".to_string(),
web_port.to_string(),
];
for sign_node in sign_nodes {
cmd.push("--sign-nodes".to_string());
cmd.push(sign_node);
}

start_mpc_node(docker, cmd, web_port, true).await?;
// exposed host address
Ok(format!("http://localhost:{web_port}"))
}

async fn start_mpc_sign_node(
docker: &Docker,
node_id: u64,
pk_set: &PublicKeySet,
sk_share: &SecretKeyShare,
) -> anyhow::Result<String> {
let web_port = portpicker::pick_unused_port().expect("no free ports");

let cmd = vec![
"start-sign".to_string(),
node_id.to_string(),
"--pk-set".to_string(),
serde_json::to_string(&pk_set)?,
"--sk-share".to_string(),
serde_json::to_string(&SerdeSecret(sk_share))?,
"--web-port".to_string(),
web_port.to_string(),
];

let (_, ip_address) = start_mpc_node(docker, cmd, web_port, false).await?;
// internal network address
Ok(format!("http://{ip_address}:{web_port}"))
}

async fn create_network(docker: &Docker) -> anyhow::Result<()> {
Expand Down Expand Up @@ -167,56 +194,39 @@ async fn test_trio() -> anyhow::Result<()> {
// but only instantiates 3 nodes.
let (pk_set, sk_shares) = mpc_recovery::generate(4, 3)?;

let mut actor_addresses = Vec::new();
let mut web_ports = Vec::new();
for (i, sk_share) in sk_shares.into_iter().enumerate().take(3) {
let (_id, actor_address, web_port) = start_mpc_node(
&docker,
(i + 1) as u64,
&pk_set,
&sk_share,
actor_addresses.first().cloned(),
)
.await?;
actor_addresses.push(actor_address);
web_ports.push(web_port);
let mut sign_nodes = Vec::new();
for i in 2..=3 {
let addr = start_mpc_sign_node(&docker, i as u64, &pk_set, &sk_shares[i - 1]).await?;
sign_nodes.push(addr);
}
let leader_node = start_mpc_leader_node(&docker, 1, &pk_set, &sk_shares[0], sign_nodes).await?;

// Wait until all nodes initialize
tokio::time::sleep(Duration::from_millis(2000)).await;

// TODO: only leader node works for now, other nodes struggling to connect to each other
// for some reason.
let web_port = &web_ports[0];
let payload: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(10)
.map(char::from)
.collect();
let req = Request::builder()
.method(Method::POST)
.uri(format!("http://{}/submit", web_port))
.uri(format!("{}/submit", leader_node))
.header("content-type", "application/json")
.body(Body::from(json!({ "payload": payload }).to_string()))?;

let client = Client::new();
let mut resp = client.request(req).await?;
let response = client.request(req).await?;

assert_eq!(resp.status(), 200);
assert_eq!(response.status(), 200);

let data = resp.body_mut().data().await.expect("no response body")?;
let response_body: String = serde_json::from_slice(&data)?;
let signature_bytes = hex::decode(response_body)?;
let signature_array: [u8; 96] = signature_bytes.as_slice().try_into().map_err(|_e| {
anyhow::anyhow!(
"signature has incorrect length: expected 96 bytes, but got {}",
signature_bytes.len()
)
})?;
let signature = Signature::from_bytes(signature_array)
.map_err(|e| anyhow::anyhow!("malformed signature: {}", e))?;

assert!(pk_set.public_key().verify(&signature, payload));
let data = hyper::body::to_bytes(response).await?;
let response: LeaderResponse = serde_json::from_slice(&data)?;
if let LeaderResponse::Ok { signature } = response {
assert!(pk_set.public_key().verify(&signature, payload));
} else {
panic!("response was not successful");
}

Ok(())
}
3 changes: 1 addition & 2 deletions mpc-recovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ axum = "0.6"
clap = { version = "4.2", features = ["derive"] }
futures = "0.3"
hex = "0.4"
ractor = { version = "0.7", features = ["cluster"] }
ractor_cluster = "0.7"
hyper = { version = "0.14", features = ["full"] }
actix-rt = "2.8"
threshold_crypto = "0.4.0"
rand = "0.7"
Expand Down
Loading

0 comments on commit 1acafab

Please sign in to comment.