Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.17: BankingStage Forwarding Filter (backport of #685) #696

Merged
merged 5 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ fn main() -> Result<()> {
Duration::from_millis(1), // coalesce
true,
None,
false,
));
}

Expand Down
1 change: 1 addition & 0 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl Forwarder {
self.update_data_budget();
let packet_vec: Vec<_> = forwardable_packets
.filter(|p| !p.meta().forwarded())
.filter(|p| p.meta().is_from_staked_node())
.filter(|p| self.data_budget.take(p.meta().size))
.filter_map(|p| p.data(..).map(|data| data.to_vec()))
.collect();
Expand Down
3 changes: 3 additions & 0 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl FetchStage {
coalesce,
true,
in_vote_only_mode.clone(),
false, // unstaked connections
)
})
.collect()
Expand All @@ -190,6 +191,7 @@ impl FetchStage {
coalesce,
true,
in_vote_only_mode.clone(),
false, // unstaked connections
)
})
.collect()
Expand All @@ -210,6 +212,7 @@ impl FetchStage {
coalesce,
true,
None,
true, // only staked connections should be voting
)
})
.collect();
Expand Down
2 changes: 2 additions & 0 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl AncestorHashesService {
Duration::from_millis(1), // coalesce
false, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
);

let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded();
Expand Down Expand Up @@ -1299,6 +1300,7 @@ mod test {
Duration::from_millis(1), // coalesce
false,
None,
false,
);
let (remote_request_sender, remote_request_receiver) = unbounded();
let t_packet_adapter = Builder::new()
Expand Down
1 change: 1 addition & 0 deletions core/src/repair/serve_repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl ServeRepairService {
Duration::from_millis(1), // coalesce
false, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
);
let t_packet_adapter = Builder::new()
.name(String::from("solServRAdapt"))
Expand Down
1 change: 1 addition & 0 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl ShredFetchStage {
PACKET_COALESCE_DURATION,
true, // use_pinned_memory
None, // in_vote_only_mode
false,
)
})
.collect();
Expand Down
1 change: 1 addition & 0 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl GossipService {
Duration::from_millis(1), // coalesce
false,
None,
false,
);
let (consume_sender, listen_receiver) = unbounded();
let t_socket_consume = cluster_info.clone().start_socket_consume_thread(
Expand Down
12 changes: 12 additions & 0 deletions sdk/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ bitflags! {
/// the packet is built.
/// This field can be removed when the above feature gate is adopted by mainnet-beta.
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
/// For marking packets from staked nodes
const FROM_STAKED_NODE = 0b1000_0000;
}
}

Expand Down Expand Up @@ -213,6 +215,11 @@ impl Meta {
self.port = socket_addr.port();
}

pub fn set_from_staked_node(&mut self, from_staked_node: bool) {
self.flags
.set(PacketFlags::FROM_STAKED_NODE, from_staked_node);
}

#[inline]
pub fn discard(&self) -> bool {
self.flags.contains(PacketFlags::DISCARD)
Expand Down Expand Up @@ -265,6 +272,11 @@ impl Meta {
pub fn round_compute_unit_price(&self) -> bool {
self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
}

#[inline]
pub fn is_from_staked_node(&self) -> bool {
self.flags.contains(PacketFlags::FROM_STAKED_NODE)
}
}

impl Default for Meta {
Expand Down
1 change: 1 addition & 0 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,7 @@ async fn handle_chunk(
if packet_accum.is_none() {
let mut meta = Meta::default();
meta.set_socket_addr(remote_addr);
meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked));
*packet_accum = Some(PacketAccumulator {
meta,
chunks: Vec::new(),
Expand Down
9 changes: 8 additions & 1 deletion streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ fn recv_loop(
coalesce: Duration,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
is_staked_service: bool,
) -> Result<()> {
loop {
let mut packet_batch = if use_pinned_memory {
Expand Down Expand Up @@ -147,7 +148,9 @@ fn recv_loop(
if len == PACKETS_PER_BATCH {
full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
}

packet_batch
.iter_mut()
.for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
packet_batch_sender.send(packet_batch)?;
}
break;
Expand All @@ -156,6 +159,7 @@ fn recv_loop(
}
}

#[allow(clippy::too_many_arguments)]
pub fn receiver(
socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
Expand All @@ -165,6 +169,7 @@ pub fn receiver(
coalesce: Duration,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
is_staked_service: bool,
) -> JoinHandle<()> {
let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
Expand All @@ -180,6 +185,7 @@ pub fn receiver(
coalesce,
use_pinned_memory,
in_vote_only_mode,
is_staked_service,
);
})
.unwrap()
Expand Down Expand Up @@ -488,6 +494,7 @@ mod test {
Duration::from_millis(1), // coalesce
true,
None,
false,
);
const NUM_PACKETS: usize = 5;
let t_responder = {
Expand Down
Loading