Skip to content

Commit

Permalink
Revert "use tpu-client-next in send_transaction_service (#3515)"
Browse files Browse the repository at this point in the history
This reverts commit 5c0f173.
  • Loading branch information
KirillLykov committed Jan 3, 2025
1 parent 2bfa7d0 commit 97f1c5f
Show file tree
Hide file tree
Showing 17 changed files with 245 additions and 603 deletions.
6 changes: 1 addition & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions banks-server/src/banks_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use {
solana_send_transaction_service::{
send_transaction_service::{SendTransactionService, TransactionInfo},
tpu_info::NullTpuInfo,
transaction_client::ConnectionCacheClient,
},
std::{
io,
Expand Down Expand Up @@ -455,16 +454,17 @@ pub async fn start_tcp_server(
.map(move |chan| {
let (sender, receiver) = unbounded();

let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
SendTransactionService::new::<NullTpuInfo>(
tpu_addr,
&bank_forks,
None,
None,
receiver,
connection_cache.clone(),
5_000,
0,
exit.clone(),
);

SendTransactionService::new(&bank_forks, receiver, client, 5_000, exit.clone());

let server = BanksServer::new(
bank_forks.clone(),
block_commitment_cache.clone(),
Expand Down
31 changes: 1 addition & 30 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 53 additions & 24 deletions rpc/src/cluster_tpu_info.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use {
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey},
solana_sdk::{
clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
pubkey::Pubkey,
},
solana_send_transaction_service::tpu_info::TpuInfo,
std::{
collections::HashMap,
Expand Down Expand Up @@ -47,7 +50,7 @@ impl TpuInfo for ClusterTpuInfo {
.collect();
}

fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.read().unwrap();
let leaders: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
Expand All @@ -67,23 +70,37 @@ impl TpuInfo for ClusterTpuInfo {
unique_leaders
}

fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
fn get_leader_tpus_with_slots(
&self,
max_count: u64,
protocol: Protocol,
) -> Vec<(&SocketAddr, Slot)> {
let recorder = self.poh_recorder.read().unwrap();
let leader_pubkeys: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
let leaders: Vec<_> = (0..max_count)
.rev()
.filter_map(|future_slot| {
NUM_CONSECUTIVE_LEADER_SLOTS
.checked_mul(future_slot)
.and_then(|slots_in_the_future| {
recorder.leader_and_slot_after_n_slots(slots_in_the_future)
})
})
.collect();
drop(recorder);
leader_pubkeys
.iter()
.filter_map(|leader_pubkey| {
let addrs_to_slots = leaders
.into_iter()
.filter_map(|(leader_id, leader_slot)| {
self.recent_peers
.get(leader_pubkey)
.map(|addr| match protocol {
Protocol::UDP => &addr.0,
Protocol::QUIC => &addr.1,
.get(&leader_id)
.map(|(udp_tpu, quic_tpu)| match protocol {
Protocol::UDP => (udp_tpu, leader_slot),
Protocol::QUIC => (quic_tpu, leader_slot),
})
})
.collect()
.collect::<HashMap<_, _>>();
let mut unique_leaders = Vec::from_iter(addrs_to_slots);
unique_leaders.sort_by_key(|(_addr, slot)| *slot);
unique_leaders
}
}

Expand Down Expand Up @@ -258,12 +275,12 @@ mod test {
let first_leader =
solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap();
assert_eq!(
leader_info.get_unique_leader_tpus(1, Protocol::UDP),
leader_info.get_leader_tpus(1, Protocol::UDP),
vec![&recent_peers.get(&first_leader).unwrap().0]
);
assert_eq!(
leader_info.get_leader_tpus(1, Protocol::UDP),
vec![&recent_peers.get(&first_leader).unwrap().0]
leader_info.get_leader_tpus_with_slots(1, Protocol::UDP),
vec![(&recent_peers.get(&first_leader).unwrap().0, 0)]
);

let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
Expand All @@ -277,12 +294,15 @@ mod test {
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_unique_leader_tpus(2, Protocol::UDP),
leader_info.get_leader_tpus(2, Protocol::UDP),
expected_leader_sockets
);
assert_eq!(
leader_info.get_leader_tpus(2, Protocol::UDP),
leader_info.get_leader_tpus_with_slots(2, Protocol::UDP),
expected_leader_sockets
.into_iter()
.zip([0, 4])
.collect::<Vec<_>>()
);

let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
Expand All @@ -297,17 +317,26 @@ mod test {
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_unique_leader_tpus(3, Protocol::UDP),
leader_info.get_leader_tpus(3, Protocol::UDP),
expected_leader_sockets
);
// Only 2 leader tpus are returned always... so [0, 4, 8] isn't right here.
// This assumption is safe. After all, leader schedule generation must be deterministic.
assert_eq!(
leader_info.get_leader_tpus_with_slots(3, Protocol::UDP),
expected_leader_sockets
.into_iter()
.zip([0, 4])
.collect::<Vec<_>>()
);

for x in 4..8 {
assert!(leader_info.get_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len());
assert!(
leader_info.get_unique_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len()
);
assert_eq!(
leader_info.get_leader_tpus(x, Protocol::UDP).len(),
x as usize
leader_info
.get_leader_tpus_with_slots(x, Protocol::UDP)
.len()
<= recent_peers.len()
);
}
}
Expand Down
39 changes: 17 additions & 22 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ use {
solana_runtime::commitment::CommitmentSlots,
solana_send_transaction_service::{
send_transaction_service::SendTransactionService, tpu_info::NullTpuInfo,
transaction_client::ConnectionCacheClient,
},
solana_streamer::socket::SocketAddrSpace,
};
Expand Down Expand Up @@ -459,19 +458,14 @@ impl JsonRpcRequestProcessor {
.tpu(connection_cache.protocol())
.unwrap();
let (transaction_sender, transaction_receiver) = unbounded();

let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
None,
None,
1,
);
SendTransactionService::new(
&bank_forks,
None,
transaction_receiver,
client,
connection_cache,
1000,
1,
exit.clone(),
);

Expand Down Expand Up @@ -4578,9 +4572,7 @@ pub mod tests {
},
vote::state::VoteState,
},
solana_send_transaction_service::{
tpu_info::NullTpuInfo, transaction_client::ConnectionCacheClient,
},
solana_send_transaction_service::tpu_info::NullTpuInfo,
solana_transaction_status::{
EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta,
TransactionDetails,
Expand Down Expand Up @@ -6701,14 +6693,16 @@ pub mod tests {
Arc::new(PrioritizationFeeCache::default()),
service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj),
);
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
&bank_forks,
None,
None,
receiver,
connection_cache,
1000,
1,
exit,
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

let mut bad_transaction = system_transaction::transfer(
&mint_keypair,
Expand Down Expand Up @@ -6981,15 +6975,16 @@ pub mod tests {
Arc::new(PrioritizationFeeCache::default()),
service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj),
);
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
&bank_forks,
None,
None,
receiver,
connection_cache,
1000,
1,
exit,
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

assert_eq!(
request_processor.get_block_commitment(0),
RpcBlockCommitment {
Expand Down
18 changes: 5 additions & 13 deletions rpc/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ use {
exit::Exit, genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH, hash::Hash,
native_token::lamports_to_sol,
},
solana_send_transaction_service::{
send_transaction_service::{self, SendTransactionService},
transaction_client::ConnectionCacheClient,
},
solana_send_transaction_service::send_transaction_service::{self, SendTransactionService},
solana_storage_bigtable::CredentialType,
std::{
net::SocketAddr,
Expand Down Expand Up @@ -465,20 +462,15 @@ impl JsonRpcService {

let leader_info =
poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder));
let client = ConnectionCacheClient::new(
connection_cache,
let _send_transaction_service = Arc::new(SendTransactionService::new_with_config(
tpu_address,
send_transaction_service_config.tpu_peers.clone(),
leader_info,
send_transaction_service_config.leader_forward_count,
);
let _send_transaction_service = SendTransactionService::new_with_config(
&bank_forks,
leader_info,
receiver,
client,
connection_cache,
send_transaction_service_config,
exit,
);
));

#[cfg(test)]
let test_request_processor = request_processor.clone();
Expand Down
9 changes: 1 addition & 8 deletions send-transaction-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,20 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
async-trait = { workspace = true }
crossbeam-channel = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-quic-client = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-tpu-client-next = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }
solana-tpu-client = { workspace = true }

[dev-dependencies]
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }

[features]
dev-context-only-utils = []

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
Loading

0 comments on commit 97f1c5f

Please sign in to comment.