Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request paritytech#132 from subspace/farmer-rpc-server
Browse files Browse the repository at this point in the history
Farmer RPC server
  • Loading branch information
nazar-pc authored Nov 15, 2021
2 parents bac0fa5 + a34fed3 commit 6590519
Show file tree
Hide file tree
Showing 13 changed files with 227 additions and 76 deletions.
98 changes: 81 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ env_logger = "0.9.0"
futures = "0.3.13"
hex = "0.4.3"
indicatif = "0.16.2"
jsonrpsee = { version = "0.3.1", features = ["client", "types"] }
jsonrpsee = { version = "0.4.1", features = ["client", "macros", "server"] }
log = "0.4.14"
lru = "0.6.6"
parity-scale-codec = "2.3.0"
Expand Down
39 changes: 28 additions & 11 deletions crates/subspace-farmer/src/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,25 @@ use crate::object_mappings::ObjectMappings;
use crate::plot::Plot;
use crate::plotting::Plotting;
use crate::ws_rpc::WsRpc;
use crate::ws_rpc_server::{RpcServer, RpcServerImpl};
use anyhow::{anyhow, Result};
use jsonrpsee::ws_server::WsServerBuilder;
use log::info;
use std::net::SocketAddr;
use std::path::PathBuf;
use subspace_solving::SubspaceCodec;

/// Start farming by using plot in specified path and connecting to WebSocket server at specified
/// address.
pub(crate) async fn farm(base_directory: PathBuf, ws_server: &str) -> Result<(), anyhow::Error> {
pub(crate) async fn farm(
base_directory: PathBuf,
node_rpc_url: &str,
ws_server_listen_addr: SocketAddr,
) -> Result<(), anyhow::Error> {
// TODO: This doesn't account for the fact that node can
// have a completely different history to what farmer expects
info!("Opening plot");
let plot = Plot::open_or_create(&base_directory.clone().into()).await?;
let plot = Plot::open_or_create(&base_directory).await?;

info!("Opening commitments");
let commitments = Commitments::new(base_directory.join("commitments").into()).await?;
Expand All @@ -27,21 +35,30 @@ pub(crate) async fn farm(base_directory: PathBuf, ws_server: &str) -> Result<(),
})
.await??;

info!("Connecting to RPC server: {}", ws_server);
let client = WsRpc::new(ws_server).await?;
info!("Connecting to node at {}", node_rpc_url);
let client = WsRpc::new(node_rpc_url).await?;

let identity = Identity::open_or_create(&base_directory)?;

let subspace_codec = SubspaceCodec::new(&identity.public_key());

// Start RPC server
let ws_server = WsServerBuilder::default()
.build(ws_server_listen_addr)
.await?;
let ws_server_addr = ws_server.local_addr()?;
let rpc_server = RpcServerImpl::new(plot.clone(), subspace_codec);
let _stop_handle = ws_server.start(rpc_server.into_rpc())?;

info!("WS RPC server listening on {}", ws_server_addr);

// start the farming task
let farming_instance = Farming::start(
plot.clone(),
commitments.clone(),
client.clone(),
identity.clone(),
);
let farming_instance =
Farming::start(plot.clone(), commitments.clone(), client.clone(), identity);

// start the background plotting
let plotting_instance = Plotting::start(plot, commitments, object_mappings, client, identity);
let plotting_instance =
Plotting::start(plot, commitments, object_mappings, client, subspace_codec);

tokio::select! {
res = plotting_instance.wait() => if let Err(error) = res {
Expand Down
8 changes: 2 additions & 6 deletions crates/subspace-farmer/src/commitments/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ async fn create() {
let solution_range = u64::from_be_bytes([0xff_u8, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]);
let index = 0;

let plot = Plot::open_or_create(&base_directory.path().to_path_buf().into())
.await
.unwrap();
let plot = Plot::open_or_create(&base_directory).await.unwrap();
let commitments = Commitments::new(base_directory.path().join("commitments").into())
.await
.unwrap();
Expand All @@ -45,9 +43,7 @@ async fn find_by_tag() {
let base_directory = TempDir::new().unwrap();
let salt: Salt = [1u8; 8];

let plot = Plot::open_or_create(&base_directory.path().to_path_buf().into())
.await
.unwrap();
let plot = Plot::open_or_create(&base_directory).await.unwrap();
let commitments = Commitments::new(base_directory.path().join("commitments").into())
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-farmer/src/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ pub struct Identity {
}

impl Identity {
pub fn open_or_create(path: &Path) -> Result<Identity, Error> {
let identity_file = path.join("identity.bin");
pub fn open_or_create<B: AsRef<Path>>(base_directory: B) -> Result<Identity, Error> {
let identity_file = base_directory.as_ref().join("identity.bin");
let keypair = if identity_file.exists() {
info!("Opening existing keypair"); // TODO: turn this into a channel
Keypair::from_bytes(&fs::read(identity_file)?).map_err(Error::msg)?
Expand Down
2 changes: 2 additions & 0 deletions crates/subspace-farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ pub(crate) mod plot;
pub(crate) mod plotting;
pub(crate) mod rpc;
pub(crate) mod ws_rpc;
pub mod ws_rpc_server;

pub use commitments::{CommitmentError, Commitments};
pub use farming::Farming;
pub use identity::Identity;
pub use jsonrpsee;
pub use object_mappings::{ObjectMappingError, ObjectMappings};
pub use plot::{Plot, PlotError};
pub use plotting::Plotting;
Expand Down
18 changes: 12 additions & 6 deletions crates/subspace-farmer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ mod plotting;
mod rpc;
mod utils;
mod ws_rpc;
mod ws_rpc_server;

use anyhow::Result;
use clap::{Parser, ValueHint};
use env_logger::Env;
use log::info;
use std::fs;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};

// TODO: Separate commands for erasing the plot and wiping everything
Expand All @@ -46,12 +48,15 @@ enum Command {
},
/// Start a farmer using previously created plot
Farm {
/// Use custom path for data storage instead of platform-specific default
/// Custom path for data storage instead of platform-specific default
#[clap(long, value_hint = ValueHint::FilePath)]
custom_path: Option<PathBuf>,
/// Specify WebSocket RPC server TCP port
#[clap(long, default_value = "ws://127.0.0.1:9944")]
ws_server: String,
/// WebSocket RPC URL of the Subspace node to connect to
#[clap(long, value_hint = ValueHint::Url, default_value = "ws://127.0.0.1:9944")]
node_rpc_url: String,
/// Host and port where built-in WebSocket RPC server should listen for incoming connections
#[clap(long, default_value = "127.0.0.1:9955")]
ws_server_listen_addr: SocketAddr,
},
}

Expand Down Expand Up @@ -87,10 +92,11 @@ async fn main() -> Result<()> {
}
Command::Farm {
custom_path,
ws_server,
node_rpc_url,
ws_server_listen_addr,
} => {
let path = utils::get_path(custom_path);
commands::farm(path, &ws_server).await?;
commands::farm(path, &node_rpc_url, ws_server_listen_addr).await?;
}
}
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions crates/subspace-farmer/src/plot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
mod tests;

use async_std::fs::OpenOptions;
use async_std::path::PathBuf;
use futures::channel::mpsc as async_mpsc;
use futures::channel::oneshot;
use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SinkExt, StreamExt};
use log::error;
use rocksdb::DB;
use std::io;
use std::io::SeekFrom;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use subspace_core_primitives::{Piece, RootBlock, PIECE_SIZE};
Expand Down Expand Up @@ -70,12 +70,12 @@ pub struct Plot {

impl Plot {
/// Creates a new plot for persisting encoded pieces to disk
pub async fn open_or_create(base_directory: &PathBuf) -> Result<Plot, PlotError> {
pub async fn open_or_create<B: AsRef<Path>>(base_directory: B) -> Result<Plot, PlotError> {
let mut plot_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(base_directory.join("plot.bin"))
.open(base_directory.as_ref().join("plot.bin"))
.await
.map_err(PlotError::PlotOpen)?;

Expand All @@ -88,7 +88,7 @@ impl Plot {
let piece_count = Arc::new(AtomicU64::new(plot_size / PIECE_SIZE as u64));

let plot_metadata_db = tokio::task::spawn_blocking({
let path = base_directory.join("plot-metadata");
let path = base_directory.as_ref().join("plot-metadata");

move || DB::open_default(path)
})
Expand Down
Loading

0 comments on commit 6590519

Please sign in to comment.