Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Publish locally-made transactions to peers. #850

Merged
merged 2 commits into from
Mar 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ mod transaction_queue;
pub use transaction_queue::{TransactionQueue, AccountDetails};
pub use miner::{Miner};

use util::{H256, Address, FixedHash, Bytes};
use util::{H256, U256, Address, FixedHash, Bytes};
use ethcore::client::{BlockChainClient};
use ethcore::block::{ClosedBlock};
use ethcore::error::{Error};
Expand Down Expand Up @@ -107,6 +107,9 @@ pub trait MinerService : Send + Sync {

/// Query pending transactions for hash
fn transaction(&self, hash: &H256) -> Option<SignedTransaction>;

/// Suggested gas price
fn sensible_gas_price(&self) -> U256 { x!(20000000000u64) }
}

/// Mining status
Expand Down
5 changes: 5 additions & 0 deletions miner/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ impl MinerService for Miner {
}
}

fn sensible_gas_price(&self) -> U256 {
// 10% above our minimum.
self.transaction_queue.lock().unwrap().minimal_gas_price().clone() * x!(110) / x!(100)
}

fn author(&self) -> Address {
*self.author.read().unwrap()
}
Expand Down
5 changes: 5 additions & 0 deletions miner/src/transaction_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ impl TransactionQueue {
}
}

/// Get the minimal gas price.
pub fn minimal_gas_price(&self) -> &U256 {
&self.minimal_gas_price
}

/// Sets new gas price threshold for incoming transactions.
/// Any transaction already imported to the queue is not affected.
pub fn set_minimal_gas_price(&mut self, min_gas_price: U256) {
Expand Down
106 changes: 49 additions & 57 deletions rpc/src/v1/impls/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use ethcore::client::*;
use ethcore::block::IsBlock;
use ethcore::views::*;
use ethcore::ethereum::Ethash;
use ethcore::ethereum::denominations::shannon;
use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, Action};
use self::ethash::SeedHashCompute;
use v1::traits::{Eth, EthFilter};
Expand All @@ -44,10 +43,6 @@ fn default_gas() -> U256 {
U256::from(21_000)
}

fn default_gas_price() -> U256 {
shannon() * U256::from(20)
}

/// Eth rpc implementation.
pub struct EthClient<C, S, A, M, EM = ExternalMiner>
where C: BlockChainClient,
Expand Down Expand Up @@ -173,16 +168,41 @@ impl<C, S, A, M, EM> EthClient<C, S, A, M, EM>
}
}

fn sign_call(client: &Arc<C>, request: CallRequest) -> SignedTransaction {
fn sign_call(&self, request: CallRequest) -> Result<SignedTransaction, Error> {
let client = take_weak!(self.client);
let miner = take_weak!(self.miner);
let from = request.from.unwrap_or(Address::zero());
EthTransaction {
Ok(EthTransaction {
nonce: request.nonce.unwrap_or_else(|| client.nonce(&from)),
action: request.to.map_or(Action::Create, Action::Call),
gas: request.gas.unwrap_or_else(default_gas),
gas_price: request.gas_price.unwrap_or_else(default_gas_price),
gas_price: request.gas_price.unwrap_or_else(|| miner.sensible_gas_price()),
value: request.value.unwrap_or_else(U256::zero),
data: request.data.map_or_else(Vec::new, |d| d.to_vec())
}.fake_sign(from)
}.fake_sign(from))
}

fn dispatch_transaction(&self, signed_transaction: SignedTransaction, raw_transaction: Vec<u8>) -> Result<Value, Error> {
let hash = signed_transaction.hash();

let import = {
let client = take_weak!(self.client);
take_weak!(self.miner).import_transactions(vec![signed_transaction], |a: &Address| AccountDetails {
nonce: client.nonce(a),
balance: client.balance(a),
})
};

match import.into_iter().collect::<Result<Vec<_>, _>>() {
Ok(_) => {
take_weak!(self.sync).new_transaction(raw_transaction);
to_value(&hash)
}
Err(e) => {
warn!("Error sending transaction: {:?}", e);
to_value(&H256::zero())
}
}
}
}

Expand Down Expand Up @@ -267,7 +287,7 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>

fn gas_price(&self, params: Params) -> Result<Value, Error> {
match params {
Params::None => to_value(&default_gas_price()),
Params::None => to_value(&take_weak!(self.miner).sensible_gas_price()),
_ => Err(Error::invalid_params())
}
}
Expand Down Expand Up @@ -460,32 +480,20 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
let accounts = take_weak!(self.accounts);
match accounts.account_secret(&request.from) {
Ok(secret) => {
let miner = take_weak!(self.miner);
let client = take_weak!(self.client);

let transaction = EthTransaction {
nonce: request.nonce.unwrap_or_else(|| client.nonce(&request.from)),
action: request.to.map_or(Action::Create, Action::Call),
gas: request.gas.unwrap_or_else(default_gas),
gas_price: request.gas_price.unwrap_or_else(default_gas_price),
value: request.value.unwrap_or_else(U256::zero),
data: request.data.map_or_else(Vec::new, |d| d.to_vec())
let signed_transaction = {
let client = take_weak!(self.client);
let miner = take_weak!(self.miner);
EthTransaction {
nonce: request.nonce.unwrap_or_else(|| client.nonce(&request.from)),
action: request.to.map_or(Action::Create, Action::Call),
gas: request.gas.unwrap_or_else(default_gas),
gas_price: request.gas_price.unwrap_or_else(|| miner.sensible_gas_price()),
value: request.value.unwrap_or_else(U256::zero),
data: request.data.map_or_else(Vec::new, |d| d.to_vec()),
}.sign(&secret)
};

let signed_transaction = transaction.sign(&secret);
let hash = signed_transaction.hash();

let import = miner.import_transactions(vec![signed_transaction], |a: &Address| AccountDetails {
nonce: client.nonce(a),
balance: client.balance(a),
});
match import.into_iter().collect::<Result<Vec<_>, _>>() {
Ok(_) => to_value(&hash),
Err(e) => {
warn!("Error sending transaction: {:?}", e);
to_value(&H256::zero())
}
}
let raw_transaction = encode(&signed_transaction).to_vec();
self.dispatch_transaction(signed_transaction, raw_transaction)
},
Err(_) => { to_value(&H256::zero()) }
}
Expand All @@ -495,43 +503,27 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
fn send_raw_transaction(&self, params: Params) -> Result<Value, Error> {
from_params::<(Bytes, )>(params)
.and_then(|(raw_transaction, )| {
let decoded: Result<SignedTransaction, _> = UntrustedRlp::new(&raw_transaction.to_vec()).as_val();
match decoded {
Ok(signed_tx) => {
let miner = take_weak!(self.miner);
let client = take_weak!(self.client);

let hash = signed_tx.hash();
let import = miner.import_transactions(vec![signed_tx], |a: &Address| AccountDetails {
nonce: client.nonce(a),
balance: client.balance(a),
});
match import.into_iter().collect::<Result<Vec<_>, _>>() {
Ok(_) => to_value(&hash),
Err(e) => {
warn!("Error sending transaction: {:?}", e);
to_value(&H256::zero())
}
}
},
Err(_) => { to_value(&H256::zero()) }
let raw_transaction = raw_transaction.to_vec();
match UntrustedRlp::new(&raw_transaction).as_val() {
Ok(signed_transaction) => self.dispatch_transaction(signed_transaction, raw_transaction),
Err(_) => to_value(&H256::zero()),
}
})
}

fn call(&self, params: Params) -> Result<Value, Error> {
from_params_discard_second(params).and_then(|(request, )| {
let signed = try!(self.sign_call(request));
let client = take_weak!(self.client);
let signed = Self::sign_call(&client, request);
let output = client.call(&signed).map(|e| Bytes(e.output)).unwrap_or(Bytes::new(vec![]));
to_value(&output)
})
}

fn estimate_gas(&self, params: Params) -> Result<Value, Error> {
from_params_discard_second(params).and_then(|(request, )| {
let signed = try!(self.sign_call(request));
let client = take_weak!(self.client);
let signed = Self::sign_call(&client, request);
let used = client.call(&signed).map(|res| res.gas_used + res.refunded).unwrap_or(From::from(0));
to_value(&used)
})
Expand Down
5 changes: 4 additions & 1 deletion rpc/src/v1/tests/helpers/sync_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! Test implementation of SyncProvider.

use util::U256;
use util::{U256, Bytes};
use ethsync::{SyncProvider, SyncStatus, SyncState};
use std::sync::{RwLock};

Expand Down Expand Up @@ -59,5 +59,8 @@ impl SyncProvider for TestSyncProvider {
fn status(&self) -> SyncStatus {
self.status.read().unwrap().clone()
}

fn new_transaction(&self, _raw_transaction: Bytes) {
}
}

48 changes: 48 additions & 0 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ pub struct ChainSync {
network_id: U256,
/// Miner
miner: Arc<Miner>,

/// Transactions to propagate
// TODO: reconsider where this is in the codebase - seems a little dodgy to have here.
transactions_to_send: Vec<Bytes>,
}

type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
Expand All @@ -243,6 +247,7 @@ impl ChainSync {
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
miner: miner,
transactions_to_send: vec![],
}
}

Expand Down Expand Up @@ -938,6 +943,12 @@ impl ChainSync {
sync.disable_peer(peer_id);
}
}

/// Place a new transaction on the wire.
pub fn new_transaction(&mut self, raw_transaction: Bytes) {
self.transactions_to_send.push(raw_transaction);
}

/// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
// accepting transactions once only fully synced
Expand Down Expand Up @@ -1271,7 +1282,44 @@ impl ChainSync {
sent
}

/// propagates new transactions to all peers
fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {

// Early out of nobody to send to.
if self.peers.len() == 0 {
return 0;
}

let mut packet = RlpStream::new_list(self.transactions_to_send.len());
for tx in self.transactions_to_send.iter() {
packet.append_raw(tx, 1);
}
self.transactions_to_send.clear();
let rlp = packet.out();

let lucky_peers = {
// sqrt(x)/x scaled to max u32
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at some point we should refactor this peer selection code into a common function

let small = self.peers.len() < MIN_PEERS_PROPAGATION;
let lucky_peers = self.peers.iter()
.filter_map(|(&p, _)| if small || ::rand::random::<u32>() < fraction { Some(p.clone()) } else { None })
.collect::<Vec<_>>();

// taking at max of MAX_PEERS_PROPAGATION
lucky_peers.iter().map(|&id| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::<Vec<PeerId>>()
};

let sent = lucky_peers.len();
for peer_id in lucky_peers {
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone());
}
sent
}

fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
if !self.transactions_to_send.is_empty() {
self.propagate_new_transactions(io);
}
let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let blocks = self.propagate_blocks(&chain_info, io);
Expand Down
10 changes: 9 additions & 1 deletion sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use std::ops::*;
use std::sync::*;
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
use util::TimerToken;
use util::{U256, ONE_U256};
use util::{U256, Bytes, ONE_U256};
use ethcore::client::Client;
use ethcore::service::SyncMessage;
use ethminer::Miner;
Expand Down Expand Up @@ -101,6 +101,9 @@ impl Default for SyncConfig {
pub trait SyncProvider: Send + Sync {
/// Get sync status
fn status(&self) -> SyncStatus;

/// Note that a user has submitted a new transaction.
fn new_transaction(&self, raw_transaction: Bytes);
}

/// Ethereum network protocol handler
Expand Down Expand Up @@ -140,6 +143,11 @@ impl SyncProvider for EthSync {
fn status(&self) -> SyncStatus {
self.sync.read().unwrap().status()
}

/// Note that a user has submitted a new transaction.
fn new_transaction(&self, raw_transaction: Bytes) {
self.sync.write().unwrap().new_transaction(raw_transaction);
}
}

impl NetworkProtocolHandler<SyncMessage> for EthSync {
Expand Down