diff --git a/.gitignore b/.gitignore index 088ba6ba..52650aac 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk +*~ diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..edf16c3d --- /dev/null +++ b/.travis.yml @@ -0,0 +1,17 @@ +language: rust + +rust: + - stable + - beta + - nightly + +matrix: + allow_failures: + - rust: nightly + fast_finish: true + +cache: cargo + +script: + - cargo build --verbose --all + - cargo test --verbose --all diff --git a/ARCH.md b/ARCH.md new file mode 100644 index 00000000..604c70a4 --- /dev/null +++ b/ARCH.md @@ -0,0 +1,252 @@ +The LDK sample is broken between 2 primary process : `ldk-node` and `ldk-cli`. + +The `ldk-cli` is a simple JSON-RPC client to interact with the node. + +The `ldk-node` is a multi-threaded process embedding all the main components of a regular Lightning +node : +* `MicroSPVClient`, a chain validation backend providing blocks updates, utxo filtering and utxo set access +* `ChainMonitor`, a chain processing entity enforcing onchain the channel logic negotiated offchain +* `EventHandler`, a default handler informing the client logic about internal events +* `ChannelManager`, a core lightning state machine, supervising channel lifecycle and payment initiation/relay +* `NetGraphMsgHandler`, a lightning router, handling gossips updates and providing payment path draws +* `PeerManager`, a lightning networking stack, managing peer connections lifecycle +* `setup_socket_listener`, a simple network handler to listen incoming connections +* `setup_rpc_server`, a simple RPC server handler to listen to `ldk-cli` + +All those components are running in their own threads and interact through inter-threading +handlers powered by `tokio::sync::mpsc`. + +# A Gentle Guided Tour of LDK-sample components + +The initial step is to load state from previous node session and user config file. 
Note the sample +config file LdkConfig is wider than RL's UserConfig as it scopes daemon-specific configuration +variables (`bitcoind` RPC port, bitcoin network, ...) + +``` + let daemon_dir = if env::args().len() > 2 { + let path_dir = env::args().skip(2).next().unwrap(); + let path = if fs::metadata(&path_dir).unwrap().is_dir() { + path_dir + "/ldk-node.conf" + } else { exit_on!("Need daemon directory to exist and be a directory"); }; + path + } else { + let mut path = if let Some(path) = env::home_dir() { + path + } else { exit_on!("No home directory found"); }; + path.push(".ldk-node/ldk-node.conf"); + String::from_str(path.to_str().unwrap()).unwrap() + }; + + let ldk_config = LdkConfig::create(&daemon_dir); +``` + +The second step is establishing a connection with your Bitcoin validation backend. It should +always be recalled that your Lightning node must keep a lively view of the chain to ensure +security and accuracy of its operations. Note, you should also have permanent access to +the tx-relay p2p network and a trustworthy fee-estimation. For the sample, we opted to rely on +Bitcoin Core RPC interface, but other options such as Electrum or BIP157 can be envisioned. 
+ +``` + let bitcoind_client = Arc::new(RpcClient::new(&ldk_config.get_str("bitcoind_credentials"), &ldk_config.get_str("bitcoind_hostport"))); + + let network = if let Ok(net) = ldk_config.get_network() { + net + } else { exit_on!("No network found in config file"); }; + + log_sample!(logger, "Checking validity of RPC URL to bitcoind..."); + if let Ok(v) = bitcoind_client.make_rpc_call("getblockchaininfo", &[], false).await { + assert!(v["verificationprogress"].as_f64().unwrap() > 0.99); + assert!( + v["bip9_softforks"]["segwit"]["status"].as_str() == Some("active") || + v["softforks"]["segwit"]["type"].as_str() == Some("buried")); + let bitcoind_net = match v["chain"].as_str().unwrap() { + "main" => constants::Network::Bitcoin, + "test" => constants::Network::Testnet, + "regtest" => constants::Network::Regtest, + _ => panic!("Unknown network type"), + }; + if !(network == bitcoind_net) { exit_on!("Divergent network between LDK node and bitcoind"); } + } else { exit_on!("Failed to connect to bitcoind RPC server, check your `bitcoind_hostport`/`bitcoind_credentials` settings"); } + +``` + +The third step is the initialization or loading of the LN-specific key material. Ideally this material +should be encrypted and replicated to increase security and fault-tolerance of your node. We're +using the default LDK key-management sample `KeysManager`). 
+ +``` + let secp_ctx = Secp256k1::new(); + + let our_node_seed = if let Ok(seed) = fs::read(data_path.clone() + "/key_seed") { + assert_eq!(seed.len(), 32); + let mut key = [0; 32]; + key.copy_from_slice(&seed); + key + } else { + let mut key = [0; 32]; + thread_rng().fill_bytes(&mut key); + let mut f = fs::File::create(data_path.clone() + "/key_seed").unwrap(); + f.write_all(&key).expect("Failed to write seed to disk"); + f.sync_all().expect("Failed to sync seed to disk"); + key + }; + let cur = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let keys = Arc::new(KeysManager::new(&our_node_seed, network, cur.as_secs(), cur.subsec_nanos())); +``` + +We previously established our connection through RPC with a Bitcoin Core. In the fourth step, +we'll use this RPC access to boostrap our chain backend client `MicroSPVClient`. This entity +is providing block connections/disconnections through the `ChainListener` interface +(implem `ChainConnector`, a subcomponent of `MicroSPVClient`). + +Current consumers of those chain updates are `ChannelManager`/`ChainMonitor`/`NetworkGraphHandler`. +Requirements of these components will be detailed in further steps. + +``` + log_sample!(logger, "Starting chain backend thread..."); + + let (outbound_blocks_chan_manager, inbound_blocks_chan_manager) = mpsc::channel(100); + + let buffer_blocks = Arc::new(ChainConnector::new()); + let chain_listener = buffer_blocks.clone(); + + let (handle_1, handle_2) = setup_chain_backend(starting_blockhash.unwrap(), (ldk_config.get_str("bitcoind_credentials"), ldk_config.get_str("bitcoind_hostport")), buffer_blocks, chain_listener, outbound_blocks_chan_manager).await; + join_handles.push(handle_1); + join_handles.push(handle_2); +``` + +The fifth step is starting the chain processing thread. Onchain events to care about are numerous +and spread from revoked counterparty commitment to punish to monitoring channel outputs belonging to +us until they're reorg-safe to spend. 
The LDK sample is relying on the default RL implementation +`ChainMonitor`. This component relies on `MicroSPVClient` through the `chain::Filter` to register +UTXO to watch, `BroadcasterInterface` to propagate "reactive" transactions and `FeeEstimator` to +vet those transactions with a good feerate. + +This component is critical for the safety of the Lightning node and its state should be dutifully +persisted and replicated. It will serve the `ChannelManager` as a `chain::Watch` interface. +TODO: code a connector linking ChainMonitor to ChannelManager. + +``` + log_sample!(logger, "Starting chain processing..."); + + let chain_source = Arc::new(ChainSource::new()); + let handle = setup_chain_processing(chain_source.clone(), tx_broadcaster.clone(), fee_estimator.clone(), logger.clone(), persister.clone()).await; + join_handles.push(handle); +``` + +The sixth step is the warm up of an event handler thread. Current LDK sample handler is pretty +minimal, and only covers notification of inbound connections. It might serve as a default endpoint +to propagate `Events` to the client logic (e.g payment reception, peer disconnections, ...). + +It receives updates from the peer manager thread through the `InboundEventConnector`. + +``` + log_sample!(logger, "Starting event handler..."); + + let (outbound_event_peers, inbound_event_peers) = mpsc::channel(1); + let (outbound_event_chan_manager, inbound_event_chan_manager) = mpsc::channel(1); + + let inbound_event_connector = InboundEventConnector::new(inbound_event_peers, inbound_event_chan_manager); + + let handle = setup_event_handler(inbound_event_connector).await; + join_handles.push(handle); +``` + +The seventh step is the most meaningful one of the initialization sequence, setting up `ChannelManager`. +This component is driving per-channel logic, receiving updates, dispatching them, relaying HTLCs, +initiating payments or receiving ones. 
It will consumes block updates from the connector `ChainConnector`, +produced by `MicroSPVClient`. For per-channel key material, it requires access to a `KeysInterface` +(implem `KeysManager`). + +The LDK `ChannelManager thread is waiting RPC command on the `rpccmd` communication channel. Those +commands like `open` or `send` will trigger corresponding state updates in the channel targeted. +`MessageSendEvent` might be generated and send to the network thread through the `chanman_msg_events`/ +`peerman_notify` communications channels. + +Interactions with the network thread are bidirectional, which means that `ChannelManager` will also +consume network messages from the `netmsg` communication channel. Those messages are issued by a +connected peer and will also affect channel states. + +It's also consuming network messages through the `netmsg` communication channel. Those messages are +issued by a connected peer and will also affect channel states and might trigger response back +to the network thread. 
+ +``` + log_sample!(logger, "Starting channel manager..."); + + let (outbound_chanman_rpccmd, inbound_chanman_rpccmd) = mpsc::channel(1); + let (outbound_chanman_netmsg, inbound_chanman_netmsg) = mpsc::channel(1); + let (outbound_chanman_msg_events, inbound_chanman_msg_events) = mpsc::channel(1); + let (outbound_peerman_notify, inbound_peerman_notify) = mpsc::channel(1); + + let outbound_chan_manager_connector = OutboundChanManagerConnector::new(outbound_chanman_msg_events, outbound_peerman_notify); + + let chain_watchdog = Arc::new(ChainWatchdog::new()); + let (handle_1, handle_2) = setup_channel_manager(inbound_chanman_rpccmd, inbound_chanman_netmsg, inbound_blocks_chan_manager, outbound_chan_manager_connector, network, fee_estimator.clone(), chain_watchdog.clone(), tx_broadcaster.clone(), logger.clone(), keys.clone(), config, 0, bitcoind_client.clone()).await; + join_handles.push(handle_1); + join_handles.push(handle_2); +``` + +To send payment, our `ChannelManager` requires access to the graph. LDK sample gossips handling +and routing processing is done by RL's default implementation `NetGraphMsgHandler`. This component +is dependent of `MicroSPVClient` to fetch and verify UTXO of announced channels. UTXO access +is provided through the `chain::Access` interface. TODO: actually do the communication channel for +this access. 
+ +``` + log_sample!(logger, "Starting node router..."); + + let (outbound_router_rpccmd, inbound_router_rpccmd) = mpsc::channel(1); + let (outbound_router_rpcreply, inbound_router_rpcreply) = mpsc::channel(1); + let (outbound_routing_msg, inbound_routing_msg): (Sender, Receiver) = mpsc::channel(1); + + let utxo_accessor = Arc::new(UtxoWatchdog::new()); + let outbound_router_connector = OutboundRouterConnector::new(outbound_router_rpcreply); + let inbound_router_connector = InboundRouterConnector::new(inbound_router_rpccmd, inbound_router_rpcreply); + let handle = setup_router(outbound_router_connector, inbound_router_connector, utxo_accessor.clone(), logger.clone()).await; + join_handles.push(handle); +``` + +The ninth step is concerning the LN network stack, `PeerManager`. This thread handles network +encryption and messages traffic with Lightning peers. It's interacting with the RPC server through +the `peers_rpccmd` communication channel for network-related commands such as `connect`/`disconnect`. +It's also both in a role of producer/consumers w.rt to `ChannelManager`, relaying channel updates +messages (`update_add_htlc`, `commitment_signed`, ...) and consuming back `MessageSendEvent`. Note, +it will be also triggered by the inbound connection thread through the `new_inbound_connection` +callback. + +``` + log_sample!(logger, "Starting peer manager..."); + + let (outbound_peers_rpccmd, inbound_peers_rpccmd) = mpsc::channel(1); + let (outbound_socket_events, inbound_socket_events): (Sender<()>, Receiver<()>) = mpsc::channel(1); + + let outbound_peer_manager_connector = Arc::new(OutboundPeerManagerConnector::new(outbound_event_peers)); //sender_routing_msg/sender_chan_msg + let buffer_netmsg = Arc::new(BufferNetMsg::new(inbound_chanman_msg_events)); + let chan_handler = buffer_netmsg.clone(); +``` + +The tenth step is setuping a small inbound connection thread, serving as an endpoint for any +Lightning network connections initiated by our peers. 
It's servicing on the configured LN port +from `LdkConfig` and will delay any further peer management processing to the corresponding `PeerManager` +thread. + +``` + log_sample!(logger, "Starting socket listener thread..."); + + let outbound_socket_listener_connector = OutboundSocketListenerConnector::new(outbound_socket_events); // outbound_socket_events + let handle = setup_socket_listener(peer_manager_arc, outbound_socket_listener_connector, ln_port).await; + join_handles.push(handle); +``` +The last step to complete the initialization sequence is setting up an RPC server. Servicing the +`ldk-cli` binary, it will dispatch all available commands to the other components and route back +their results. Binding port is configurable through `LdkConfig`. + +``` + log_sample!(logger, "Starting rpc server thread..."); + + let outbound_rpc_server_connector = OutboundRPCServerConnector::new(outbound_peers_rpccmd, outbound_chanman_rpccmd, outbound_router_rpccmd); + let handles = setup_rpc_server(outbound_rpc_server_connector, ldk_port).await; + join_handles.push(handles); +``` + diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 00000000..644bfdab --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "ldk-sample" +version = "0.0.1" +authors = ["Matt Corallo ", "Antoine Riard "] +build = "build.rs" +edition = "2018" + +[dependencies] +bitcoin = "0.24" +bitcoin-bech32 = "0.7" +lightning = { git = "https://github.com/ariard/rust-lightning", rev = "c0d6b44" } +lightning-net-tokio = { git = "https://github.com/ariard/rust-lightning", rev = "c0d6b44" } +lightning-block-sync = { git = "https://github.com/ariard/rust-lightning", rev = "c0d6b44", features = ["rpc-client", "rest-client"] } +lightning-invoice = { git = "https://github.com/TheBlueMatt/rust-lightning-invoice", rev = "86f1dd0" } +lightning-persister = { git = "https://github.com/ariard/rust-lightning", rev = "c0d6b44" } +hyper = "0.13" +serde = "1" +serde_json = "1" +rand = "0.4" 
+futures-util = "0.3" +tokio = { version = "0.2", features = ["io-std", "io-util", "rt-threaded", "tcp", "time", "sync"] } +base64 = "0.9" +time = "0.2" + +[profile.release] +panic = "abort" + +[profile.dev] +panic = "abort" + +[build-dependencies] +cc = "1.0" + +[[bin]] +name = "ldk-node" +path = "src/init.rs" + +[[bin]] +name = "ldk-cli" +path = "src/sample-cli.rs" + +[lib] +name = "ldk" +path = "src/lib.rs" diff --git a/README.md b/README.md index 0aa6fc1f..5666b8c1 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,17 @@ # ldk-sample -sample node implementation using LDK + +Simple Sample rust-lightning-based Lightning Node + +DO NOT USE IT ON MAINNET, IT'LL BURN KITTENS AND LOSE YOUR MONEY !!! + +Basic documentation is coming soon, but those commands should be working + +``` +./ldk-cli connect 03f2f14bfa554d0205847fbdf46f1948cdd5fa7911c2be95e50a7263e3d126fa95 127.0.0.1 9735 + +./ldk-cli open 03f2f14bfa554d0205847fbdf46f1948cdd5fa7911c2be95e50a7263e3d126fa95 12345 6789 + +./ldk-cli send 03f2f14bfa554d0205847fbdf46f1948cdd5fa7911c2be95e50a7263e3d126fa95 157230162837504 10 +``` + + diff --git a/build.rs b/build.rs new file mode 100644 index 00000000..3f01a1a4 --- /dev/null +++ b/build.rs @@ -0,0 +1,10 @@ +extern crate cc; + +fn main() { + #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "arm")))] + { + let mut cfg = cc::Build::new(); + cfg.file("src/rust_crypto_nonstd_arch.c"); + cfg.compile("lib_rust_crypto_nonstd_arch.a"); + } +} diff --git a/ldk-node.conf b/ldk-node.conf new file mode 100644 index 00000000..6decd11f --- /dev/null +++ b/ldk-node.conf @@ -0,0 +1,14 @@ +# Hostname:Port for JSON-RPC connections to bitcoind +bitcoind_hostport=127.0.0.1:18443 + +# Username:Password for JSON-RPC connections to bitcoind +bitcoind_credentials=bitcoinrpc:5f8a512a85ebb45be20bd36bfa0dc2f9 + +# Network +network=regtest + +#Hostname:Port for incoming Lightning p2p connections +ln_hostport=127.0.0.1:9730 + +#Hostname:Port for LDK RPC server 
+ldk_hostport=127.0.0.1:7688 diff --git a/src/init.rs b/src/init.rs new file mode 100644 index 00000000..fde7c814 --- /dev/null +++ b/src/init.rs @@ -0,0 +1,325 @@ +/// This file contains the sample initialization logic. + +#[macro_use] +mod utils; +use utils::{LdkConfig, LogPrinter}; + +use ldk::utils::RpcClient; + +mod sampled; +use sampled::*; + +use bitcoin::BlockHash; +use bitcoin::network::constants; +use bitcoin::secp256k1::key::PublicKey; +use bitcoin::secp256k1::Secp256k1; + +use bitcoin::hashes::hex::FromHex; + +use lightning_persister::FilesystemPersister; + +use lightning::chain::keysinterface::{KeysManager, KeysInterface}; +use lightning::util::events::Event; +use lightning::util::config; +use lightning::util::ser::Writer; + +use rand::{thread_rng, Rng}; + +use std::env; +use std::fs; +use std::fs::OpenOptions; +use std::io::Read; +use std::process::exit; +use std::sync::Arc; +use std::str::FromStr; +use std::thread::sleep; +use std::time::{Duration, SystemTime}; + +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; + +const FEE_PROPORTIONAL_MILLIONTHS: u32 = 10; +const ANNOUNCE_CHANNELS: bool = false; + +#[tokio::main] +async fn main() { + + // Step 1: Load data from canonical daemon directory + // + // A lightning node may persistent set of datas of different importance: + // - the channel monitor state (`/monitors`), a file critical to ensure funds safety + // - the channel state (`/channels`), a file critical to avoid channel force-closure + // - a configuration file (`/ldk-node.conf`), daemon-specific configurations options + // - a debug file (`/debug.log`) + + let daemon_dir = if env::args().len() > 2 { + let path_dir = env::args().skip(2).next().unwrap(); + let path = if fs::metadata(&path_dir).unwrap().is_dir() { + path_dir + "/ldk-node.conf" + } else { exit_on!("Need daemon directory to exist and be a directory"); }; + path + } else { + let mut path = if let Some(path) = env::home_dir() { + path + } else { exit_on!("No home 
directory found"); }; + path.push(".ldk-node/ldk-node.conf"); + String::from_str(path.to_str().unwrap()).unwrap() + }; + + let ldk_config = LdkConfig::create(&daemon_dir); + + //TODO: think if default values make sense + let mut config: config::UserConfig = Default::default(); + config.channel_options.fee_proportional_millionths = FEE_PROPORTIONAL_MILLIONTHS; + config.channel_options.announced_channel = ANNOUNCE_CHANNELS; + config.own_channel_config.minimum_depth = 1; + + let logger = Arc::new(LogPrinter::new("/home/user/.ldk-node/debug.log")); + + // Step 2: Start a JSON-RPC client to a Bitcoind instance + // + // A lightning node must always have access to diverse base layer functionalities: + // - a lively view of the chain, either directly connected to a full-node or as lightclient + // - a reliable tx-broadcast mechanism + // - a honest fee estimation mechanism + + let bitcoind_client = Arc::new(RpcClient::new(&ldk_config.get_str("bitcoind_credentials"), &ldk_config.get_str("bitcoind_hostport"))); + + let network = if let Ok(net) = ldk_config.get_network() { + net + } else { exit_on!("No network found in config file"); }; + + log_sample!(logger, "Checking validity of RPC URL to bitcoind..."); + if let Ok(v) = bitcoind_client.make_rpc_call("getblockchaininfo", &[], false).await { + assert!(v["verificationprogress"].as_f64().unwrap() > 0.99); + assert!( + v["bip9_softforks"]["segwit"]["status"].as_str() == Some("active") || + v["softforks"]["segwit"]["type"].as_str() == Some("buried")); + let bitcoind_net = match v["chain"].as_str().unwrap() { + "main" => constants::Network::Bitcoin, + "test" => constants::Network::Testnet, + "regtest" => constants::Network::Regtest, + _ => panic!("Unknown network type"), + }; + if !(network == bitcoind_net) { exit_on!("Divergent network between LDK node and bitcoind"); } + } else { exit_on!("Failed to connect to bitcoind RPC server, check your `bitcoind_hostport`/`bitcoind_credentials` settings"); } + + //TODO: connect to 
bitcoind + let fee_estimator = Arc::new(SampleFeeEstimator::new()); + + //TODO: connect to bitcoind + let tx_broadcaster = Arc::new(TxBroadcaster::new()); + + //TODO: replace by daemon dir + let data_path = String::from("/home/user/.ldk-node"); + if !fs::metadata(&data_path).unwrap().is_dir() { + exit_on!("Need storage_directory_path to exist and be a directory (or symlink to one)"); + } + let _ = fs::create_dir(data_path.clone() + "/monitors"); // If it already exists, ignore, hopefully perms are ok + + let persister = Arc::new(FilesystemPersister::new(data_path.clone() + "/monitors")); + + //TODO: if doesn't exist take best chain from bitcoind + let starting_blockhash = if let Ok(mut blockhash_file) = OpenOptions::new().read(true).open(data_path.clone() + "/blockhash") { + let mut buffer = Vec::new(); + let ret = if let Ok(_) = blockhash_file.read_to_end(&mut buffer) { + let ret = if let Ok(hash) = BlockHash::from_hex(&(std::str::from_utf8(&buffer).unwrap())) { + Some(hash) + } else { None }; + ret + } else { None }; + ret + } else { None }; + if starting_blockhash.is_none() { exit_on!("Need `blockhash` in `./ldk-node` directory"); } + + let ln_port = if let Ok(port) = ldk_config.get_ln_port() { + port + } else { exit_on!("No ln port found in config file"); }; + + let ldk_port = if let Ok(port) = ldk_config.get_ldk_port() { + port + } else { exit_on!("No ldk port found in config file"); }; + + // Step 3: Initialize key material + // + // A lightning node must manage special key materials to cover special needs: + // + // - maintain a network-wise persistent identity, the node pubkey + // - sign channel opening, updates and closing + + let secp_ctx = Secp256k1::new(); + + let our_node_seed = if let Ok(seed) = fs::read(data_path.clone() + "/key_seed") { + assert_eq!(seed.len(), 32); + let mut key = [0; 32]; + key.copy_from_slice(&seed); + key + } else { + let mut key = [0; 32]; + thread_rng().fill_bytes(&mut key); + let mut f = fs::File::create(data_path.clone() + 
"/key_seed").unwrap(); + f.write_all(&key).expect("Failed to write seed to disk"); + f.sync_all().expect("Failed to sync seed to disk"); + key + }; + let cur = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let keys = Arc::new(KeysManager::new(&our_node_seed, network, cur.as_secs(), cur.subsec_nanos())); + + println!("Node Pubkey {}", PublicKey::from_secret_key(&secp_ctx, &keys.get_node_secret())); + + let mut join_handles = Vec::new(); + + // Step 4: Create a ChainProvider + // + // A lightning chain backend must provide the following services: + // + // - validate the chain, (cf. `chain::Watcher` documentation) + // - connect blocks to `ChannelManager`/`chain::Watch` + // - provide utxo filtering registration (`chain::Filter`) + // - provide utxo set access (`chain::Access`) + // + // This sample node implementation relies on the default LDK block utilities (`lightning-block-sync`) + // to communicate with a bitcoind instance through the HTTP interface. Block updates flow + // to their final consumers (`ChannelManager/`chain::Watch`) through `ChainConnector. + log_sample!(logger, "Starting chain backend thread..."); + + let (outbound_blocks_chan_manager, inbound_blocks_chan_manager) = mpsc::channel(100); + + let buffer_blocks = Arc::new(ChainConnector::new()); + let chain_listener = buffer_blocks.clone(); + + let (handle_1, handle_2) = setup_chain_backend(starting_blockhash.unwrap(), (ldk_config.get_str("bitcoind_credentials"), ldk_config.get_str("bitcoind_hostport")), buffer_blocks, chain_listener, outbound_blocks_chan_manager).await; + join_handles.push(handle_1); + join_handles.push(handle_2); + + // Step 5: Start chain processing. + // + // A lightning node must watch and process the chain lively to take the following actions: + // + // - claim HTLC-onchain + // - punish the counterparty + // - detect a preimage to settle an incoming HTLC on the previous channel link + // - detect pay-to-us outputs (cf. 
`SpendableOutputDescriptors`) + // - fee-bump any transactions generated due to the aforementioned actions (cf. `OnchainTxHandler`) + // + + log_sample!(logger, "Starting chain processing..."); + + let chain_source = Arc::new(ChainSource::new()); + let handle = setup_chain_processing(chain_source.clone(), tx_broadcaster.clone(), fee_estimator.clone(), logger.clone(), persister.clone()).await; + join_handles.push(handle); + + // Step 6 : Start a event handler. + // + // A lightning node may gather event notifications for consumptions by user custom component + // (e.g display an app notification, ...). + + log_sample!(logger, "Starting event handler..."); + + let (outbound_event_peers, inbound_event_peers) = mpsc::channel(1); + let (outbound_event_chan_manager, inbound_event_chan_manager) = mpsc::channel(1); + + let inbound_event_connector = InboundEventConnector::new(inbound_event_peers, inbound_event_chan_manager); + + let handle = setup_event_handler(inbound_event_connector).await; + join_handles.push(handle); + + // Step 7 : Start channel manager. + // + // A lightning node core is the offchain state machine, accepting state update proposal from + // counterparty or relaying user demand to update, and propagating change to the rest of the + // implementation accordingly. 
+ // + // TODO + + log_sample!(logger, "Starting channel manager..."); + + let (outbound_chanman_rpccmd, inbound_chanman_rpccmd) = mpsc::channel(1); + let (outbound_chanman_netmsg, inbound_chanman_netmsg) = mpsc::channel(1); + let (outbound_chanman_msg_events, inbound_chanman_msg_events) = mpsc::channel(1); + let (outbound_peerman_notify, inbound_peerman_notify) = mpsc::channel(1); + + let outbound_chan_manager_connector = OutboundChanManagerConnector::new(outbound_chanman_msg_events, outbound_peerman_notify); + + let chain_watchdog = Arc::new(ChainWatchdog::new()); + let (handle_1, handle_2) = setup_channel_manager(inbound_chanman_rpccmd, inbound_chanman_netmsg, inbound_blocks_chan_manager, outbound_chan_manager_connector, network, fee_estimator.clone(), chain_watchdog.clone(), tx_broadcaster.clone(), logger.clone(), keys.clone(), config, 0, bitcoind_client.clone()).await; + join_handles.push(handle_1); + join_handles.push(handle_2); + + // Step 8: Start node router + // + // The node router (`lightning::router::NetGraphMsgHandler`) is providing the following services: + // - boostrap the initial network graph + // - receive and validate network updates messages (BOLT-7's `node_announcement`/channel_announcement`/`channel_update`) from peers + // - provide HTLC payment route (`lightning::router::get_route()`) to a given destination + // - request and reply to extended queries for gossip synchronization (BOLT-7's `short_channel_ids`/`channel_range`) + // + // It requires a `chain::Access` reference to validate utxos part of channel announcement against the chain. 
+ + log_sample!(logger, "Starting node router..."); + + let (outbound_router_rpccmd, inbound_router_rpccmd) = mpsc::channel(1); + let (outbound_router_rpcreply, inbound_router_rpcreply) = mpsc::channel(1); + let (outbound_routing_msg, inbound_routing_msg): (Sender, Receiver) = mpsc::channel(1); + + let utxo_accessor = Arc::new(UtxoWatchdog::new()); + let outbound_router_connector = OutboundRouterConnector::new(outbound_router_rpcreply); + let inbound_router_connector = InboundRouterConnector::new(inbound_router_rpccmd, inbound_router_rpcreply); + let handle = setup_router(outbound_router_connector, inbound_router_connector, utxo_accessor.clone(), logger.clone()).await; + join_handles.push(handle); + + // Step 9: Start a peer manager + // + // A lightning node must have access to the wider lightning network itself. A lightning + // network stack will offer the following services : + // - relay messages to + // - handle peers management (e.g misbehaving, manual disconnection, ...) + // + // This sample node implementation relies on the default LDK networking stack (`lightning-net-tokio`). 
+ + log_sample!(logger, "Starting peer manager..."); + + let (outbound_peers_rpccmd, inbound_peers_rpccmd) = mpsc::channel(1); + let (outbound_socket_events, inbound_socket_events): (Sender<()>, Receiver<()>) = mpsc::channel(1); + + let outbound_peer_manager_connector = Arc::new(OutboundPeerManagerConnector::new(outbound_event_peers)); //sender_routing_msg/sender_chan_msg + let buffer_netmsg = Arc::new(BufferNetMsg::new(inbound_chanman_msg_events)); + let chan_handler = buffer_netmsg.clone(); + + let mut ephemeral_data = [0; 32]; + rand::thread_rng().fill_bytes(&mut ephemeral_data); + let (handle_1, handle_2, handle_3, peer_manager_arc) = setup_peer_manager(outbound_peer_manager_connector.clone(), outbound_chanman_netmsg, inbound_peers_rpccmd, inbound_peerman_notify, buffer_netmsg, chan_handler, outbound_peer_manager_connector.clone(), keys.get_node_secret(), ephemeral_data, logger.clone()).await; + join_handles.push(handle_1); + join_handles.push(handle_2); + join_handles.push(handle_3); + + // Step 10: Start an inbound connections listener + // + // `lightning_net_tokio::setup_inbound` relays incoming messages to `ChannelManagerHandler` + // and `RoutingMessageHandler` and feeds outgoing messages back to `SocketDescriptor` + // generated by accepting an incoming connection. + + log_sample!(logger, "Starting socket listener thread..."); + + let outbound_socket_listener_connector = OutboundSocketListenerConnector::new(outbound_socket_events); // outbound_socket_events + let handle = setup_socket_listener(peer_manager_arc, outbound_socket_listener_connector, ln_port).await; + join_handles.push(handle); + + // Step X: Start a RPC server. + // + // Beyond peers messages and channel updates, a lightning node is also driven by user requests. + // + // This sample node implementation communicate with the LDK command-line binary (`ldk-cli`) + // through a custom RPC protocol. 
+ + log_sample!(logger, "Starting rpc server thread..."); + + let outbound_rpc_server_connector = OutboundRPCServerConnector::new(outbound_peers_rpccmd, outbound_chanman_rpccmd, outbound_router_rpccmd); + let handles = setup_rpc_server(outbound_rpc_server_connector, ldk_port).await; + join_handles.push(handles); + + loop { + let one_sec = Duration::from_millis(100); + sleep(one_sec); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 00000000..b5614dd8 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1 @@ +pub mod utils; diff --git a/src/rpc_cmd.rs b/src/rpc_cmd.rs new file mode 100644 index 00000000..85e0f7e7 --- /dev/null +++ b/src/rpc_cmd.rs @@ -0,0 +1,91 @@ +// RPCcmd +// +// RPCresponse +// + +use serde_json::{Value, json}; + +use std::process::exit; + +pub(crate) struct RPCBuilder {} + +macro_rules! exit_on { + ($msg: expr) => { + eprintln!($msg); + exit(1); + } +} + +impl RPCBuilder { + pub(crate) fn new(cmd: &str, args: Vec) -> Value { + match cmd { + "connect" => { + if args.len() != 3 { exit_on!("ldk-cli: connect pubkey hostname port (uncomplete)"); } + json!({ + "cmd": "connect", + "pubkey": args[0], + "hostname": args[1], + "port": args[2], + }) + }, + "disconnect" => { + if args.len() != 1 { exit_on!("ldk-cli disconnect pubkey (uncomplete)"); } + json!({ + "cmd": "disconnect", + "pubkey": args[0], + }) + }, + "open" => { + if args.len() != 3 { exit_on!("ldk-cli open pubkey funding_satoshis push_msat (uncomplete)"); } + json!({ + "cmd": "open", + "pubkey": args[0], + "funding_satoshis": args[1], + "push_msat": args[2], + }) + }, + "close" => { + if args.len() != 2 { exit_on!("ldk-cli close pubkey chan_id"); } + json!({ + "cmd": "close", + "pubkey": args[0], + "chan_id": args[1], + }) + }, + "force-close" => { + if args.len() != 2 { exit_on!("ldk-cli force-close pubkey chan_id"); } + json!({ + "cmd": "force-close", + "pubkey": args[0], + "chan_id": args[1], + }) + }, + "send" => { + if args.len() != 3 { exit_on!("ldk-cli send pubkey short_id 
amt"); } + json!({ + "cmd": "send", + "pubkey": args[0], + "short_id": args[1], + "amt": args[2], + }) + }, + "invoice" => { + if args.len() != 1 { exit_on!("ldk-cli invoice amt"); } + json!({ + "cmd": "invoice", + "amt": args[0], + }) + }, + "list" => { + if args.len() != 1 { exit_on!("ldk-cli list nodes|chans|peers"); } + json!({ + "cmd": "list", + "element": args[0], + }) + }, + _ => { + exit_on!("ldk-cli: unknown RPC cmd"); + } + } + } +} diff --git a/src/sample-cli.rs b/src/sample-cli.rs new file mode 100644 index 00000000..e79fbdff --- /dev/null +++ b/src/sample-cli.rs @@ -0,0 +1,51 @@ +/// This file contains the client logic. + +mod rpc_cmd; +use rpc_cmd::RPCBuilder; + +use ldk::utils::{LogPrinter, RpcClient}; + +use std::env; +use std::process::exit; +use std::sync::Arc; + +fn display_cmds() { + //println!("'list [graph|channels|peers|invoices]' List details about given class of elements"); + println!("'connect/disconnect peer_id' Open or close a connection with the given peer"); + //TODO: htlc/channel/invoices + exit(0); +} + +fn sanitize_cmd(cmd: &str) { + + if cmd.find("list").is_some() + || cmd.find("ping").is_some() + || cmd.find("connect").is_some() + || cmd.find("disconnect").is_some() + || cmd.find("invoice").is_some() + || cmd.find("send").is_some() + || cmd.find("open").is_some() { + } else { + eprintln!("ldk-cli: {} is not a ldk-cli command", cmd); + } +} + +#[tokio::main] +async fn main() { + + // If not argument is provided, display helper and exit. 
+ if env::args().len() == 1 { + display_cmds(); + } + + // If an argument is provided, capture and sanitize + let cmd = env::args().skip(1).next().unwrap(); + let args: Vec = env::args().skip(2).collect::>(); + sanitize_cmd(&cmd); + let json_rpc = RPCBuilder::new(&cmd, args); + + let logger = Arc::new(LogPrinter::new("/home/user/.ldk-node/debug.log")); + + let rpc_client = RpcClient::new("", "127.0.0.1:7688"); + rpc_client.send_request(&logger, json_rpc).await; +} diff --git a/src/sampled.rs b/src/sampled.rs new file mode 100644 index 00000000..8ba1c88f --- /dev/null +++ b/src/sampled.rs @@ -0,0 +1,814 @@ +use ldk::utils::{hex_to_vec, RpcClient}; + +use lightning::chain; +use lightning::chain::{Access, AccessError, Filter, Watch}; +use lightning::chain::{chaininterface, channelmonitor}; +use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +use lightning::chain::chainmonitor::ChainMonitor; +use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent}; +use lightning::chain::keysinterface::InMemoryChannelKeys; +use lightning::chain::transaction::OutPoint; +use lightning::chain::keysinterface::KeysInterface; +use lightning::ln::peer_handler; +use lightning::ln::channelmanager::{ChannelManager, PaymentHash}; +use lightning::ln::features::{InitFeatures, ChannelFeatures, NodeFeatures}; +use lightning::ln::msgs::*; +use lightning::routing::network_graph::NetGraphMsgHandler; +use lightning::routing::router::{Route, RouteHop}; +use lightning::util::config::UserConfig; +use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider, EventsProvider, Event}; +use lightning::util::logger::Logger; + +use lightning_net_tokio::*; + +use lightning_block_sync::*; +use lightning_block_sync::http_clients::RPCClient; +use lightning_block_sync::http_endpoint::HttpEndpoint; + +use bitcoin::{BlockHash, TxOut}; +use bitcoin::blockdata; +use bitcoin::blockdata::block::{Block, BlockHeader}; +use 
bitcoin::blockdata::script::Script; +use bitcoin::blockdata::transaction::Transaction; +use bitcoin::consensus::encode; +use bitcoin::hash_types::Txid; +use bitcoin::hashes::Hash; +use bitcoin::hashes::sha256::Hash as Sha256Hash; +use bitcoin::network::constants; +use bitcoin::secp256k1::key::{PublicKey, SecretKey}; + +use std::cmp; +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::ops::Deref; +use std::str::FromStr; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread::sleep; +use std::time::Duration; + +use tokio::io::AsyncReadExt; +use tokio::sync::mpsc::{Sender, Receiver}; +use tokio::task::JoinHandle; + +pub(crate) struct InboundEventConnector { + peers: Receiver<()>, + chan_manager: Receiver<()>, +} + +impl InboundEventConnector { + pub(crate) fn new(peers: Receiver<()>, chan_manager: Receiver<()>) -> Self { + InboundEventConnector { + peers, + chan_manager, + } + } +} + +pub(super) async fn setup_event_handler(mut inbound: InboundEventConnector) -> JoinHandle<()> { + + let join_handle: JoinHandle<()> = tokio::spawn(async move { + loop { + if let Some(_) = inbound.peers.recv().await { + println!("Peer Manager: connect !"); + } + } + }); + join_handle +} + +pub(crate) struct WrapperBlock { + header: BlockHeader, + txdata: Vec, + height: u32, +} + +pub(crate) struct ChainWatchdog {} + +impl ChainWatchdog { + pub(crate) fn new() -> Self { + ChainWatchdog {} + } +} + +impl Watch for ChainWatchdog { + type Keys = InMemoryChannelKeys; + fn watch_channel(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { + Ok(()) + } + fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> { + Ok(()) + } + fn release_pending_monitor_events(&self) -> Vec { + vec![] + } +} + +pub(crate) struct InboundChanManagerConnector { + rpccmd: Receiver>, + netmsg: Receiver, +} + +impl 
InboundChanManagerConnector { + pub(crate) fn new(rpccmd: Receiver>, netmsg: Receiver) -> Self { + InboundChanManagerConnector { + rpccmd, + netmsg, + } + } +} + +#[derive(Clone)] +pub(crate) struct OutboundChanManagerConnector { + chanman_msg_events: Sender>, + peerman_notify: Sender<()>, +} + +impl OutboundChanManagerConnector { + pub(crate) fn new(chanman_msg_events: Sender>, peerman_notify: Sender<()>) -> Self { + OutboundChanManagerConnector { + chanman_msg_events, + peerman_notify, + } + } +} + +macro_rules! flush_msg_events { + ($outbound: expr, $manager: expr) => { + let msg_events = $manager.get_and_clear_pending_msg_events(); + if let Ok(_) = $outbound.chanman_msg_events.send(msg_events).await {} + if let Ok(_) = $outbound.peerman_notify.send(()).await {} + } +} + +macro_rules! to_bech_network { + ($network: expr) => { + match $network { + constants::Network::Bitcoin => { bitcoin_bech32::constants::Network::Bitcoin }, + constants::Network::Testnet => { bitcoin_bech32::constants::Network::Testnet }, + constants::Network::Regtest => { bitcoin_bech32::constants::Network::Regtest } + }; + } +} + +pub(super) async fn setup_channel_manager(mut rpccmd: Receiver>, mut netmsg: Receiver, mut blocks: Receiver, mut outbound: OutboundChanManagerConnector, network: constants::Network, fee_est: Arc, chain_monitor: Arc, tx_broadcaster: Arc, logger: Arc, keys_manager: Arc, config: UserConfig, current_blockchain_height: usize, bitcoind_client: Arc) -> (JoinHandle<()>, JoinHandle<()>) + where F: FeeEstimator + 'static, + M: chain::Watch + 'static, + T: BroadcasterInterface + 'static, + L: Logger + 'static, + K: KeysInterface + 'static, +{ + let channel_manager = ChannelManager::new(network, fee_est, chain_monitor, tx_broadcaster, logger, keys_manager, config.clone(), current_blockchain_height); + let channel_manager_rpc = Arc::new(channel_manager); + let channel_manager_net = channel_manager_rpc.clone(); + let mut outbound_net = outbound.clone(); + let txn_to_broadcast = 
Mutex::new(HashMap::new()); + let hash_to_htlc_key = Mutex::new(HashMap::new()); + let join_handle_rpc: JoinHandle<()> = tokio::spawn(async move { + loop { + if let Some(buffer) = rpccmd.recv().await { + let v: serde_json::Value = serde_json::from_slice(&buffer[..]).unwrap(); + let v_obj = v.as_object().unwrap(); + match v_obj.get("cmd").unwrap().as_str().unwrap() { + "open" => { + let pubkey = PublicKey::from_str(v_obj.get("pubkey").unwrap().as_str().unwrap()).unwrap(); + let funding = u64::from_str_radix(v_obj.get("funding_satoshis").unwrap().as_str().unwrap(), 10).unwrap(); + let push_msat = u64::from_str_radix(v_obj.get("push_msat").unwrap().as_str().unwrap(), 10).unwrap(); + match channel_manager_rpc.create_channel(pubkey, funding, push_msat, 0, Some(config.clone())) { + Ok(_) => println!("Channel created, sending open_channel!"), + Err(e) => println!("Failed to open channel : {:?}", e), + } + flush_msg_events!(outbound, channel_manager_rpc); + }, + "close" => { + //TODO: call `channel_manager_rpc.close_channel(pubkey, channel_id) + }, + "force-close" => { + //TODO: call channel_manager_rpc.force_close_channel(pubkey, channel_id) + }, + "send" => { + let mut payment_preimage = [1; 32]; + let hash = Sha256Hash::hash(&payment_preimage); + let pubkey = PublicKey::from_str(v_obj.get("pubkey").unwrap().as_str().unwrap()).unwrap(); + let short_channel_id = u64::from_str_radix(v_obj.get("short_id").unwrap().as_str().unwrap(), 10).unwrap(); + let amt = u64::from_str_radix(v_obj.get("amt").unwrap().as_str().unwrap(), 10).unwrap(); + let hop = RouteHop { + pubkey, + node_features: NodeFeatures::empty(), + short_channel_id, + channel_features: ChannelFeatures::empty(), + fee_msat: amt, + cltv_expiry_delta: 10 + }; + let route = Route { + paths: vec![vec![hop]], + }; + channel_manager_rpc.send_payment(&route, PaymentHash(hash.into_inner()), &None).unwrap(); + flush_msg_events!(outbound, channel_manager_rpc); + }, + "invoice" => { + let amount = 
u64::from_str_radix(v_obj.get("amt").unwrap().as_str().unwrap(), 10).unwrap(); + let payment_preimage = [1; 32]; + //thread_rng().fill_bytes(&mut payment_preimage); + let payment_hash = Sha256Hash::hash(&payment_preimage); + if let Ok(mut hash_to_htlc_key) = hash_to_htlc_key.lock() { + hash_to_htlc_key.insert(payment_hash, (amount, payment_preimage)); + } + println!("Invoice hash {} amount {}", payment_hash, amount); + }, + _ => {}, + } + } + } + }); + let join_handle_net: JoinHandle<()> = tokio::spawn(async move { + loop { + if let Ok(wrap_msg) = netmsg.try_recv() { + match wrap_msg.msg { + Msg::Open(open) => { + channel_manager_net.handle_open_channel(&wrap_msg.node_id, wrap_msg.features.unwrap(), &open); + println!("Gotcha OpenChannel"); + flush_msg_events!(outbound_net, channel_manager_net); + }, + Msg::Accept(accept) => { + channel_manager_net.handle_accept_channel(&wrap_msg.node_id, wrap_msg.features.unwrap(), &accept); + let events = channel_manager_net.get_and_clear_pending_events(); + match &events[0] { + Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, output_script, .. 
} => { + println!("Funding Generation Ready {}", channel_value_satoshis); + let addr = bitcoin_bech32::WitnessProgram::from_scriptpubkey(&output_script[..], to_bech_network!(network)).expect("LN funding tx should always be to a Segwit output").to_address(); + let outputs = format!("{{\"{}\":{}}}", addr, *channel_value_satoshis as f64 / 1_000_000_00.0).to_string(); + if let Ok(tx_hex) = bitcoind_client.make_rpc_call("createrawtransaction", &["[]", &outputs], false).await { + let rawtx = format!("\"{}\"", tx_hex.as_str().unwrap()).to_string(); + let feerate = "{\"fee_rate\": 1}"; + if let Ok(funded_tx) = bitcoind_client.make_rpc_call("fundrawtransaction", &[&rawtx, &feerate], false).await { + let changepos = funded_tx["changepos"].as_i64().unwrap(); + assert!(changepos == 0 || changepos == 1); + let funded_tx = format!("\"{}\"", funded_tx["hex"].as_str().unwrap()).to_string(); + if let Ok(signed_tx) = bitcoind_client.make_rpc_call("signrawtransactionwithwallet", &[&funded_tx], false).await { + assert_eq!(signed_tx["complete"].as_bool().unwrap(), true); + let tx: blockdata::transaction::Transaction = encode::deserialize(&hex_to_vec(&signed_tx["hex"].as_str().unwrap()).unwrap()).unwrap(); + let outpoint = chain::transaction::OutPoint { + txid: tx.txid(), + index: if changepos == 0 { 1 } else { 0 }, + }; + channel_manager_net.funding_transaction_generated(&temporary_channel_id, outpoint); + if let Ok(mut txn_to_broadcast) = txn_to_broadcast.lock() { + txn_to_broadcast.insert(outpoint, tx); + } + flush_msg_events!(outbound_net, channel_manager_net); + } + } + } + }, + _ => panic!("Event should have been polled earlier!"), + } + }, + Msg::Created(created) => { + channel_manager_net.handle_funding_created(&wrap_msg.node_id, &created); + flush_msg_events!(outbound_net, channel_manager_net); + }, + Msg::Signed(signed) => { + channel_manager_net.handle_funding_signed(&wrap_msg.node_id, &signed); + let events = channel_manager_net.get_and_clear_pending_events(); + match 
&events[0] { + Event::FundingBroadcastSafe { funding_txo, user_channel_id: _ } => { + let mut tx = Transaction { + version: 2, + lock_time: 0, + input: Vec::new(), + output: Vec::new(), + }; + if let Ok(mut txn_to_broadcast) = txn_to_broadcast.lock() { + let removed_tx = txn_to_broadcast.remove(&funding_txo).unwrap(); + tx.input = removed_tx.input.clone(); + tx.output = removed_tx.output.clone(); + } + let tx_ser = "\"".to_string() + &encode::serialize_hex(&tx) + "\""; + if let Ok(_) = bitcoind_client.make_rpc_call("sendrawtransaction", &[&tx_ser], false).await { + println!("Transaction broadcast !"); + } + }, + _ => panic!("Event should have been polled earlier!"), + } + }, + Msg::Locked(locked) => { + channel_manager_net.handle_funding_locked(&wrap_msg.node_id, &locked); + }, + Msg::AddHTLC(add) => { + channel_manager_net.handle_update_add_htlc(&wrap_msg.node_id, &add); + }, + Msg::Commitment(commitment) => { + channel_manager_net.handle_commitment_signed(&wrap_msg.node_id, &commitment); + flush_msg_events!(outbound_net, channel_manager_net); + }, + Msg::RAA(raa) => { + channel_manager_net.handle_revoke_and_ack(&wrap_msg.node_id, &raa); + flush_msg_events!(outbound_net, channel_manager_net); + }, + _ => panic!("Not coded yet!"), + } + } + if let Ok(wrap_block) = blocks.try_recv() { + let mut txn_data = vec![]; + for (idx, tx) in wrap_block.txdata.iter().enumerate() { + txn_data.push((idx, tx)); + } + channel_manager_net.block_connected(&wrap_block.header, &txn_data, wrap_block.height); + flush_msg_events!(outbound_net, channel_manager_net); + } + let one_centisecond = Duration::from_millis(100); + sleep(one_centisecond); + } + }); + (join_handle_rpc, join_handle_net) +} + +pub(crate) struct ChainSource {} + +impl ChainSource { + pub(crate) fn new() -> Self { + ChainSource {} + } +} + +impl Filter for ChainSource { + fn register_tx(&self, txid: &Txid, script_pubkey: &Script) {} + fn register_output(&self, outpoint: &OutPoint, script_pubkey: &Script) {} +} + 
+pub(super) async fn setup_chain_processing(chain_source: C, broadcaster: T, feeest: F, logger: L, persister: P) -> JoinHandle<()> + where C::Target: chain::Filter, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + P::Target: channelmonitor::Persist, +{ + let join_handle: JoinHandle<()> = tokio::spawn(async move { + let _chain_processing = ChainMonitor::new(Some(chain_source), broadcaster, logger, feeest, persister); + //TODO: actually verify what happens on chain :) + }); + join_handle +} + +pub(crate) struct UtxoWatchdog {} + +impl UtxoWatchdog { + pub(crate) fn new() -> Self { + UtxoWatchdog {} + } +} + +impl Access for UtxoWatchdog { + fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result { + panic!(); + } +} + +pub(crate) struct OutboundRouterConnector { + rpcreply: Sender>, +} + +impl OutboundRouterConnector { + pub(crate) fn new(rpcreply: Sender>) -> Self { + OutboundRouterConnector { + rpcreply + } + } +} + +pub(crate) struct InboundRouterConnector { + rpccmd: Receiver>, + netmsg: Receiver>, +} + +impl InboundRouterConnector { + pub(crate) fn new(rpccmd: Receiver>, netmsg: Receiver>) -> Self { + InboundRouterConnector { + rpccmd, + netmsg, + } + } +} + +pub(super) async fn setup_router(_outbound: OutboundRouterConnector, mut inbound: InboundRouterConnector, utxo_accessor: F, logger: L) -> JoinHandle<()> + where F::Target: Access, + L::Target: Logger, +{ + let join_handle: JoinHandle<()> = tokio::spawn(async move { + let router = NetGraphMsgHandler::new(Some(utxo_accessor), logger); + loop { + if let Some(buffer) = inbound.rpccmd.recv().await { + let v: serde_json::Value = serde_json::from_slice(&buffer[..]).unwrap(); + let v_obj = v.as_object().unwrap(); + match v_obj.get("cmd").unwrap().as_str().unwrap() { + "list" => { + match v_obj.get("element").unwrap().as_str().unwrap() { + "peers" => { + if let Ok(_) = router.network_graph.read() { + //TODO: serialize in an acceptable format and answer back 
the client + } + }, + _ => {}, + } + }, + _ => {}, + } + } + } + }); + join_handle +} + +enum Msg { + Open(OpenChannel), + Accept(AcceptChannel), + Created(FundingCreated), + Signed(FundingSigned), + Locked(FundingLocked), + AddHTLC(UpdateAddHTLC), + FulfillHTLC(UpdateFulfillHTLC), + Commitment(CommitmentSigned), + RAA(RevokeAndACK), +} + +pub(crate) struct WrapperMsg { + node_id: PublicKey, + features: Option, + msg: Msg, +} + +pub(crate) struct OutboundPeerManagerConnector { + event_handler: Sender<()>, +} + +impl OutboundPeerManagerConnector { + pub(crate) fn new(event_handler: Sender<()>) -> Self { + OutboundPeerManagerConnector { + event_handler, + } + } +} + +pub(crate) struct BufferNetMsg { + chanman_msg_events: Mutex>>, + buffer_netmsg: Mutex>, +} + +impl BufferNetMsg { + pub(crate) fn new(chanman_msg_events: Receiver>) -> BufferNetMsg { + BufferNetMsg { + chanman_msg_events: Mutex::new(chanman_msg_events), + buffer_netmsg: Mutex::new(vec![]), + } + } + pub(crate) fn get_netmsg(&self) -> Vec { + if let Ok(mut buffer_netmsg) = self.buffer_netmsg.lock() { + return buffer_netmsg.drain(..).collect(); + } + vec![] + } +} + +impl ChannelMessageHandler for BufferNetMsg { + fn handle_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &OpenChannel) { + if let Ok(mut buffer_netmsg) = self.buffer_netmsg.lock() { + buffer_netmsg.push(WrapperMsg { node_id: their_node_id.clone(), features: Some(their_features), msg: Msg::Open(msg.clone()) }); + } + } + fn handle_accept_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &AcceptChannel) { + if let Ok(mut buffer_netmsg) = self.buffer_netmsg.lock() { + buffer_netmsg.push(WrapperMsg { node_id: their_node_id.clone(), features: Some(their_features), msg: Msg::Accept(msg.clone()) }); + } + } + fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &FundingCreated) { + if let Ok(mut buffer_netmsg) = self.buffer_netmsg.lock() { + buffer_netmsg.push(WrapperMsg { 
node_id: their_node_id.clone(), features: None, msg: Msg::Created(msg.clone()) }); + } + } + fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &FundingSigned) { + if let Ok(mut buffer_netmsg) = self.buffer_netmsg.lock() { + buffer_netmsg.push(WrapperMsg { node_id: their_node_id.clone(), features: None, msg: Msg::Signed(msg.clone()) }); + } + } + fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &FundingLocked) { + if let Ok(mut buffer_netmsg) = self.buffer_netmsg.lock() { + buffer_netmsg.push(WrapperMsg { node_id: their_node_id.clone(), features: None, msg: Msg::Locked(msg.clone()) }); + } + } + fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &Shutdown) {} + fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &ClosingSigned) {} + fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &UpdateAddHTLC) { + if let Ok(mut buffer_netmsg) = self.buffer_netmsg.lock() { + buffer_netmsg.push(WrapperMsg { node_id: their_node_id.clone(), features: None, msg: Msg::AddHTLC(msg.clone()) }); + } + } + fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &UpdateFulfillHTLC) { + if let Ok(mut buffer_netmsg) = self.buffer_netmsg.lock() { + buffer_netmsg.push(WrapperMsg { node_id: their_node_id.clone(), features: None, msg: Msg::FulfillHTLC(msg.clone()) }); + } + } + fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &UpdateFailHTLC) {} + fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &UpdateFailMalformedHTLC) {} + fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &CommitmentSigned) { + if let Ok(mut buffer_netmsg) = self.buffer_netmsg.lock() { + buffer_netmsg.push(WrapperMsg { node_id: their_node_id.clone(), features: None, msg: Msg::Commitment(msg.clone()) }); + } + } + fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &RevokeAndACK) { + if let Ok(mut buffer_netmsg) = self.buffer_netmsg.lock() { + buffer_netmsg.push(WrapperMsg { 
node_id: their_node_id.clone(), features: None, msg: Msg::RAA(msg.clone()) }); + } + } + fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &UpdateFee) {} + fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &AnnouncementSignatures) {} + fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {} + fn peer_connected(&self, their_node_id: &PublicKey, msg: &Init) {} + fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &ChannelReestablish) {} + fn handle_error(&self, their_node_id: &PublicKey, msg: &ErrorMessage) {} +} + +impl MessageSendEventsProvider for BufferNetMsg { + fn get_and_clear_pending_msg_events(&self) -> Vec { + if let Ok(mut chanman_msg_events) = self.chanman_msg_events.lock() { + if let Ok(events) = chanman_msg_events.try_recv() { + return events; + } + } + vec![] + } +} + +impl RoutingMessageHandler for OutboundPeerManagerConnector { + fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { + Ok(true) + } + fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result { + Ok(true) + } + fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result { + Ok(true) + } + fn handle_htlc_fail_channel_update(&self, update: &HTLCFailChannelUpdate) {} + fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option, Option)> { + vec![] + } + fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec { + vec![] + } + fn should_request_full_sync(&self, node_id: &PublicKey) -> bool { + true + } +} + +pub(super) async fn setup_peer_manager(outbound: Arc, mut chanman_netmsg: Sender, mut peers_rpccmd: Receiver>, mut peerman_notify: Receiver<()>, buffer_netmsg: Arc, chan_handler: Arc, router: Arc, node_secret: SecretKey, ephemeral_data: [u8; 32], logger: Arc) -> (JoinHandle<()>, JoinHandle<()>, JoinHandle<()>, Arc, Arc, Arc>>) + where C: ChannelMessageHandler + 'static, + 
R: RoutingMessageHandler + 'static, + L: Logger + 'static +{ + let peer_handler: peer_handler::PeerManager, Arc, Arc> = peer_handler::PeerManager::new(peer_handler::MessageHandler { + chan_handler: chan_handler, + route_handler: router, + }, node_secret, &ephemeral_data, logger); + let peer_handler_arc = Arc::new(peer_handler); + let peer_handler_listener = peer_handler_arc.clone(); + let peer_handler_event = peer_handler_arc.clone(); + let join_handle_rpc: JoinHandle<()> = tokio::spawn(async move { // RPC cmd thread + loop { + if let Some(buffer) = peers_rpccmd.recv().await { + let v: serde_json::Value = serde_json::from_slice(&buffer[..]).unwrap(); + let v_obj = v.as_object().unwrap(); + match v_obj.get("cmd").unwrap().as_str().unwrap() { + "connect" => { + let event_notify = outbound.event_handler.clone(); + let pubkey = PublicKey::from_str(v_obj.get("pubkey").unwrap().as_str().unwrap()).unwrap(); + let ip_addr: Vec<&str> = v_obj.get("hostname").unwrap().as_str().unwrap().split('.').collect(); + let port = v_obj.get("port").unwrap().as_str().unwrap().parse::().unwrap(); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(u8::from_str_radix(ip_addr[0], 10).unwrap(), u8::from_str_radix(ip_addr[1], 10).unwrap(), u8::from_str_radix(ip_addr[2], 10).unwrap(), u8::from_str_radix(ip_addr[3], 10).unwrap())), port); + connect_outbound(peer_handler_arc.clone(), event_notify, pubkey, addr).await; + }, + "disconnect" => { + }, + _ => {}, + } + } + } + }); + let join_handle_event: JoinHandle<()> = tokio::spawn(async move { // Event processing thread + loop { + if let Ok(_) = peerman_notify.try_recv() { + println!("Processing event..."); + peer_handler_event.process_events(); + } + let one_centisecond = Duration::from_millis(100); + sleep(one_centisecond); + } + }); + let join_handle_msg: JoinHandle<()> = tokio::spawn(async move { + loop { + for e in buffer_netmsg.get_netmsg() { + if let Ok(_) = chanman_netmsg.send(e).await {} + //println!("Passing down netmsg to channel 
manger!"); + } + let one_centisecond = Duration::from_millis(100); + sleep(one_centisecond); + } + }); + (join_handle_rpc, join_handle_event, join_handle_msg, peer_handler_listener) +} + +pub(crate) struct OutboundSocketListenerConnector { + outbound_socket_events: Sender<()> +} + +impl OutboundSocketListenerConnector { + pub(crate) fn new(outbound_socket_events: Sender<()>) -> Self { + OutboundSocketListenerConnector { + outbound_socket_events, + } + } +} + +pub(super) async fn setup_socket_listener(peer_manager_arc: Arc, Arc, Arc>>, outbound: OutboundSocketListenerConnector, port: u16) -> JoinHandle<()> +{ + let join_handle: JoinHandle<()> = tokio::spawn(async move { + let mut listener = tokio::net::TcpListener::bind(("::".parse::().unwrap(), port)).await.unwrap(); + loop { + let sock = listener.accept().await.unwrap().0; + let outbound_socket_events = outbound.outbound_socket_events.clone(); + let peer_manager_listener = peer_manager_arc.clone(); + tokio::spawn(async move { + setup_inbound(peer_manager_listener, outbound_socket_events, sock).await; + }); + } + }); + join_handle +} + +pub(crate) struct OutboundRPCServerConnector { + peers_rpccmd: Sender>, + chanman_rpccmd: Sender>, + router_rpccmd: Sender> +} + +impl OutboundRPCServerConnector { + pub(crate) fn new(peers_rpccmd: Sender>, chanman_rpccmd: Sender>, router_rpccmd: Sender>) -> Self { + OutboundRPCServerConnector { + peers_rpccmd, + chanman_rpccmd, + router_rpccmd + } + } +} + +pub(super) async fn setup_rpc_server(mut outbound: OutboundRPCServerConnector, port: u16) -> JoinHandle<()> { + let join_handle: JoinHandle<()> = tokio::spawn(async move { + let mut listener = tokio::net::TcpListener::bind(("::".parse::().unwrap(), port)).await.unwrap(); + loop { + let mut sock = listener.accept().await.unwrap().0; + let buffer_len = sock.read_u8().await.unwrap(); + let mut buffer = Vec::with_capacity(buffer_len as usize); + sock.read_buf(&mut buffer).await.unwrap(); + let v: serde_json::Value = match 
serde_json::from_slice(&buffer[..]) { + Ok(v) => v, + Err(_) => { + println!("Failed to parse RPC client command"); + return (); + }, + }; + if !v.is_object() { + println!("Failed to parse RPC client command"); + return (); + } + let v_obj = v.as_object().unwrap(); + if v_obj.get("cmd").is_none() { + println!("Failed to get a RPC client command"); + return (); + } + match v_obj.get("cmd").unwrap().as_str().unwrap() { + "connect" => { + outbound.peers_rpccmd.send(buffer[..].to_vec()).await.unwrap(); + }, + "disconnect" => { + outbound.peers_rpccmd.send(buffer[..].to_vec()).await.unwrap(); + }, + "list" => { + outbound.router_rpccmd.send(buffer[..].to_vec()).await.unwrap(); + }, + "open" => { + outbound.chanman_rpccmd.send(buffer[..].to_vec()).await.unwrap(); + }, + "invoice" => { + outbound.chanman_rpccmd.send(buffer[..].to_vec()).await.unwrap(); + }, + "send" => { + outbound.chanman_rpccmd.send(buffer[..].to_vec()).await.unwrap(); + }, + _ => {}, + } + } + }); + join_handle +} + +/// Basic fee estimator +pub(crate) struct SampleFeeEstimator { + background_est: AtomicUsize, + normal_est: AtomicUsize, + high_prio_est: AtomicUsize, +} +impl SampleFeeEstimator { + pub(crate) fn new() -> Self { + SampleFeeEstimator { + background_est: AtomicUsize::new(0), + normal_est: AtomicUsize::new(0), + high_prio_est: AtomicUsize::new(0), + } + } +} +impl chaininterface::FeeEstimator for SampleFeeEstimator { + fn get_est_sat_per_1000_weight(&self, conf_target: chaininterface::ConfirmationTarget) -> u32 { + cmp::max(match conf_target { + chaininterface::ConfirmationTarget::Background => self.background_est.load(Ordering::Acquire) as u32, + chaininterface::ConfirmationTarget::Normal => self.normal_est.load(Ordering::Acquire) as u32, + chaininterface::ConfirmationTarget::HighPriority => self.high_prio_est.load(Ordering::Acquire) as u32, + }, 253) + } +} + +/// Basic transaction broadcast +pub(crate) struct TxBroadcaster {} + +impl TxBroadcaster { + pub(crate) fn new() -> Self { + 
TxBroadcaster {} + } +} + +impl chaininterface::BroadcasterInterface for TxBroadcaster { + fn broadcast_transaction(&self, _tx: &bitcoin::blockdata::transaction::Transaction) { + + } +} + +pub(crate) struct ChainConnector { + buffer_connect: Mutex>, + buffer_disconnect: Mutex>, +} + +impl ChainConnector { + pub(crate) fn new() -> Self { + ChainConnector { + buffer_connect: Mutex::new(vec![]), + buffer_disconnect: Mutex::new(vec![]), + } + } + pub(crate) fn get_connect(&self) -> Vec { + if let Ok(mut buffer_connect) = self.buffer_connect.lock() { + return buffer_connect.drain(..).collect(); + } + vec![] + } +} + +impl ChainListener for ChainConnector { + fn block_connected(&self, block: &Block, height: u32) { + if let Ok(mut buffer_connect) = self.buffer_connect.lock() { + buffer_connect.push(WrapperBlock { header: block.header.clone(), txdata: block.txdata.clone(), height }); + } + } + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + if let Ok(mut buffer_disconnect) = self.buffer_disconnect.lock() { + buffer_disconnect.push(WrapperBlock { header: header.clone(), txdata: vec![], height }); + } + } +} + +pub(super) async fn setup_chain_backend(chain_tip_hash: BlockHash, block_source: (String, String), buffer_blocks: Arc, chain_listener: Arc, mut chanman_connect: Sender) -> (JoinHandle<()>, JoinHandle<()>) + where CL: ChainListener + 'static, +{ + let join_handle_chain: JoinHandle<()> = tokio::spawn(async move { + let bitcoind_endpoint = HttpEndpoint::new(&("http://".to_string() + &block_source.1 + "/")).unwrap(); + let mut bitcoind_client = RPCClient::new(&block_source.0, bitcoind_endpoint); + let chain_tip = bitcoind_client.get_header(&chain_tip_hash, None).await.unwrap(); + let block_sources = vec![&mut bitcoind_client as &mut dyn BlockSource]; + let backup_sources = vec![]; + + let mut client = MicroSPVClient::init(chain_tip, block_sources, backup_sources, chain_listener, false); + let mut interval = 
tokio::time::interval(Duration::from_secs(1));
+		loop {
+			interval.tick().await;
+			if client.poll_best_tip().await {}
+		}
+	});
+	let join_handle_block: JoinHandle<()> = tokio::spawn(async move {
+		loop {
+			for e in buffer_blocks.get_connect() {
+				if let Ok(_) = chanman_connect.send(e).await {}
+			}
+			let one_centisecond = Duration::from_millis(100);
+			sleep(one_centisecond);
+		}
+	});
+	(join_handle_chain, join_handle_block)
+}
diff --git a/src/utils.rs b/src/utils.rs
new file mode 100644
index 00000000..60310d92
--- /dev/null
+++ b/src/utils.rs
@@ -0,0 +1,284 @@
+use lightning::util::logger::{Logger, Record};
+
+use bitcoin::network::constants::Network;
+use bitcoin::secp256k1::key::PublicKey;
+
+use futures_util::future::TryFutureExt;
+use futures_util::stream::TryStreamExt;
+
+use serde_json::Value;
+
+use std::collections::HashMap;
+use std::fs::OpenOptions;
+use std::fs::read;
+use std::io::Write;
+use std::net::{IpAddr, SocketAddr};
+use std::str;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use tokio::net::TcpStream;
+use tokio::prelude::*;
+
+use time::OffsetDateTime;
+
+pub fn hex_to_vec(hex: &str) -> Option<Vec<u8>> {
+	let mut out = Vec::with_capacity(hex.len() / 2);
+
+	let mut b = 0;
+	for (idx, c) in hex.as_bytes().iter().enumerate() {
+		b <<= 4;
+		match *c {
+			b'A'..=b'F' => b |= c - b'A' + 10,
+			b'a'..=b'f' => b |= c - b'a' + 10,
+			b'0'..=b'9' => b |= c - b'0',
+			_ => return None,
+		}
+		if (idx & 1) == 1 {
+			out.push(b);
+			b = 0;
+		}
+	}
+
+	Some(out)
+}
+
+pub fn hex_to_compressed_pubkey(hex: &str) -> Option<PublicKey> {
+	let data = match hex_to_vec(&hex[0..33*2]) {
+		Some(bytes) => bytes,
+		None => return None
+	};
+	match PublicKey::from_slice(&data) {
+		Ok(pk) => Some(pk),
+		Err(_) => None,
+	}
+}
+
+#[inline]
+pub fn hex_str(value: &[u8]) -> String {
+	let mut res = String::with_capacity(64);
+	for v in value {
+		res += &format!("{:02x}", v);
+	}
+	res
+}
+
+#[inline]
+pub fn slice_to_be64(v: &[u8]) -> u64 {
+	((v[0] as u64) << 8*7) |
+	((v[1] as u64) << 8*6) |
+	((v[2] as u64) << 8*5) |
+	((v[3] as u64) << 8*4) |
+	((v[4] as u64) << 8*3) |
+	((v[5] as u64) << 8*2) |
+	((v[6] as u64) << 8*1) |
+	((v[7] as u64) << 8*0)
+}
+
+pub(super) struct LdkConfig {
+	conf_args: HashMap<String, String>,
+}
+
+impl LdkConfig {
+	pub(super) fn create(config_file: &str) -> Self {
+		if let Ok(config_bytes) = read(&config_file) {
+			if let Ok(s) = str::from_utf8(&config_bytes) {
+				let mut v: Vec<&str> = s.rsplit('\n').collect();
+				v.retain(|line| line.contains("="));
+				let mut conf_args = HashMap::with_capacity(v.len());
+				for lines in v.iter() {
+					let entry: Vec<&str> = lines.rsplit("=").collect();
+					//TODO: check for forbidden duplicate
+					conf_args.insert(entry[1].to_string(), entry[0].to_string());
+				}
+				return LdkConfig {
+					conf_args,
+				}
+			}
+		}
+		panic!("ldk-node: error parsing config file {}", config_file);
+	}
+	pub(super) fn get_str(&self, arg: &str) -> String {
+		self.conf_args.get(arg).unwrap().clone()
+	}
+	pub(super) fn get_network(&self) -> Result<Network, ()> {
+		if let Some(net) = self.conf_args.get("network") {
+			if net.contains("regtest") { return Ok(Network::Regtest) }
+			if net.contains("testnet") { return Ok(Network::Testnet) }
+		}
+		Err(())
+	}
+	pub(super) fn get_ln_port(&self) -> Result<u16, ()> {
+		if let Some(hostport) = self.conf_args.get("ln_hostport") {
+			let hostport: Vec<&str> = hostport.split(':').collect();
+			return Ok(hostport[1].parse::<u16>().unwrap())
+		}
+		Err(())
+	}
+	pub(super) fn get_ldk_port(&self) -> Result<u16, ()> {
+		if let Some(hostport) = self.conf_args.get("ldk_hostport") {
+			let hostport: Vec<&str> = hostport.split(':').collect();
+			return Ok(hostport[1].parse::<u16>().unwrap())
+		}
+		Err(())
+	}
+}
+
+#[macro_export]
+macro_rules! log_sample {
+	($logger: expr, $record: expr) => {
+		$logger.print($record);
+	}
+}
+
+#[macro_export]
+macro_rules! log_client {
+	($logger: expr, $record: expr) => {
+		$logger.print_client($record);
+	}
+}
+
+/// Basic log logic, excluding gossips messages
+pub struct LogPrinter {
+	log_file: String,
+}
+
+impl LogPrinter {
+	pub fn new(file: &str) -> Self {
+		Self {
+			log_file: file.to_string()
+		}
+	}
+	pub fn print(&self, record: &str) {
+		if let Ok(mut log_file) = OpenOptions::new().append(true).open(&self.log_file) {
+			if let Err (_) = log_file.write(format!("NODE: {}\n", record).as_bytes()) {
+				panic!("Can't write file {}", self.log_file);
+			}
+		} else {
+			panic!("Can't open file {}", self.log_file);
+		}
+	}
+	pub fn print_client(&self, record: &str) {
+		if let Ok(mut log_file) = OpenOptions::new().append(true).open(&self.log_file) {
+			if let Err (_) = log_file.write(format!("CLIENT: {}\n", record).as_bytes()) {
+				panic!("Can't write file {}", self.log_file);
+			}
+		} else {
+			panic!("Can't open file {}", self.log_file);
+		}
+	}
+}
+
+impl Logger for LogPrinter {
+	fn log(&self, record: &Record) {
+		let log = record.args.to_string();
+		if !log.contains("Received message of type 258") && !log.contains("Received message of type 256") && !log.contains("Received message of type 257") {
+			if let Ok(mut log_file) = OpenOptions::new().append(true).open(&self.log_file) {
+				log_file.write_all(format!("{} {:<5} [{}:{}] {}", OffsetDateTime::now_utc().format("%F %T"), record.level.to_string(), record.module_path, record.line, log).as_bytes()).unwrap();
+			}
+		}
+	}
+}
+
+#[macro_export]
+macro_rules! exit_on {
+	($msg: expr) => {
+		eprintln!($msg);
+		exit(1);
+	}
+}
+
+/// Basic JSON-RPC client storing server host and credentials
+pub struct RpcClient {
+	basic_auth: String,
+	uri: String,
+	id: AtomicUsize,
+	client: hyper::Client<hyper::client::HttpConnector>,
+}
+
+impl RpcClient {
+	pub fn new(user_auth: &str, host_port: &str) -> Self {
+		Self {
+			basic_auth: "Basic ".to_string() + &base64::encode(user_auth),
+			uri: "http://".to_string() + host_port,
+			id: AtomicUsize::new(0),
+			client: hyper::Client::new(),
+		}
+	}
+	pub async fn send_request(&self, logger: &LogPrinter, payload: Value) {
+		let host_port: Vec<&str> = "127.0.0.1:7688".split(':').collect();
+		let host: Vec<u8> = host_port[0].split('.').map(|x| u8::from_str_radix(&x, 10).unwrap()).collect();
+		let addr = SocketAddr::new(IpAddr::from([host[0], host[1], host[2], host[3]]), u16::from_str_radix(&host_port[1], 10).unwrap());
+
+		if let Ok(mut stream) = TcpStream::connect(addr).await {
+			if let Ok(data) = serde_json::to_string(&payload) {
+				stream.write_u8(data.len() as u8).await.unwrap();
+				stream.write_all(data.as_bytes()).await.unwrap();
+				//stream.write(&[0]).await.unwrap();
+				//stream.write_all(b"hello world!").await;
+				let mut buffer = [0;20];
+				stream.read_exact(&mut buffer).await.unwrap();
+				if let Ok(string) = String::from_utf8(buffer.to_vec()) {
+					println!("{}", string);
+				}
+			} else {
+				panic!("JSON serialization issues");
+			}
+		}
+	}
+	/// params entries must be pre-quoted if appropriate
+	/// may_fail is only used to change logging
+	pub async fn make_rpc_call(&self, method: &str, params: &[&str], may_fail: bool) -> Result<Value, ()> {
+		let auth: &str = &self.basic_auth;
+		let request = hyper::Request::post(&self.uri).header("Authorization", auth);
+		let mut param_str = String::new();
+		for (idx, param) in params.iter().enumerate() {
+			param_str += param;
+			if idx != params.len() - 1 {
+				param_str += ",";
+			}
+		}
+		let req = "{\"method\":\"".to_string() + method + "\",\"params\":[" + &param_str + "],\"id\":" + &self.id.fetch_add(1, Ordering::AcqRel).to_string() + "}";
+		if let Ok(res) = self.client.request(request.body(hyper::Body::from(req.clone())).unwrap()).map_err(|e| {
+			println!("Failed to connect to RPC server!");
+			eprintln!("RPC Gave {} in response to {}", e, req);
+			()
+		}).await
+		{
+			if res.status() != hyper::StatusCode::OK {
+				if !may_fail {
+					println!("Failed to get RPC server response (probably bad auth)!");
+					eprintln!("RPC returned status {} in response to {}", res.status(), req);
+				}
+				Err(())
+			} else {
+				if let Ok(body) = res.into_body().map_ok(|b| b.to_vec()).try_concat().await {
+					let v: serde_json::Value = match serde_json::from_slice(&body[..]) {
+						Ok(v) => v,
+						Err(_) => {
+							println!("Failed to parse RPC server response!");
+							return Err(());
+						},
+					};
+					if !v.is_object() {
+						println!("Failed to parse RPC server response!");
+						return Err(());
+					}
+					let v_obj = v.as_object().unwrap();
+					if v_obj.get("error") != Some(&serde_json::Value::Null) {
+						println!("Failed to parse RPC server response!");
+						return Err(());
+					}
+					if let Some(res) = v_obj.get("result") {
+						Ok((*res).clone())
+					} else {
+						println!("Failed to parse RPC server response!");
+						Err(())
+					}
+				} else {
+					println!("Failed to load RPC server response!");
+					Err(())
+				}
+			}
+		} else { Err(()) }
+	}
+}