Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sniffer messages cleaner #1341

Merged
merged 3 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions roles/tests-integration/lib/sniffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,8 @@ impl Sniffer {
}
}

/// used to block the test runtime
/// while we wait until Sniffer has received a message of some specific type
/// Waits until a message of the specified type is received into the `message_direction`
/// corresponding queue.
pub async fn wait_for_message_type(
&self,
message_direction: MessageDirection,
Expand Down Expand Up @@ -456,6 +456,56 @@ impl Sniffer {
sleep(Duration::from_secs(1)).await;
}
}

/// Similar to `[Sniffer::wait_for_message_type]` but also removes the messages from the queue
/// including the specified message type.
pub async fn wait_for_message_type_and_clean_queue(
&self,
message_direction: MessageDirection,
message_type: u8,
) -> bool {
let now = std::time::Instant::now();
loop {
let has_message_type = match message_direction {
MessageDirection::ToDownstream => self
.messages_from_upstream
.has_message_type_with_remove(message_type),
MessageDirection::ToUpstream => self
.messages_from_downstream
.has_message_type_with_remove(message_type),
};

// ready to unblock test runtime
if has_message_type {
return true;
}

// 10 min timeout
// only for worst case, ideally should never be triggered
if now.elapsed().as_secs() > 10 * 60 {
panic!("Timeout waiting for message type");
}

// sleep to reduce async lock contention
sleep(Duration::from_secs(1)).await;
}
}

/// Checks whether the sniffer has received a message of the specified type.
pub async fn includes_message_type(
&self,
message_direction: MessageDirection,
message_type: u8,
) -> bool {
match message_direction {
MessageDirection::ToDownstream => {
self.messages_from_upstream.has_message_type(message_type)
}
MessageDirection::ToUpstream => {
self.messages_from_downstream.has_message_type(message_type)
}
}
}
}

// Utility macro to assert that the downstream and upstream roles have sent specific messages.
Expand Down Expand Up @@ -656,6 +706,22 @@ impl MessagesAggregator {
has_message
}

fn has_message_type_with_remove(&self, message_type: u8) -> bool {
self.messages
.safe_lock(|messages| {
let mut cloned_messages = messages.clone();
for (pos, (t, _)) in cloned_messages.iter().enumerate() {
if *t == message_type {
let drained = cloned_messages.drain(pos + 1..).collect();
*messages = drained;
return true;
}
}
false
})
.unwrap()
}

// The aggregator queues messages in FIFO order, so this function returns the oldest message in
// the queue.
//
Expand Down
29 changes: 10 additions & 19 deletions roles/tests-integration/tests/pool_integration.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use integration_tests_sv2::*;

use crate::sniffer::MessageDirection;
use const_sv2::{MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB, MESSAGE_TYPE_NEW_TEMPLATE};
use const_sv2::{
MESSAGE_TYPE_MINING_SET_NEW_PREV_HASH, MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB,
MESSAGE_TYPE_NEW_TEMPLATE,
};
use roles_logic_sv2::{
common_messages_sv2::{Protocol, SetupConnection},
parsers::{AnyMessage, CommonMessages, Mining, PoolMessages, TemplateDistribution},
Expand Down Expand Up @@ -92,24 +95,12 @@ async fn header_timestamp_value_assertion_in_new_extended_mining_job() {
}
_ => panic!("SetNewPrevHash not found!"),
};
// Assertions of messages between Pool and Translator Proxy (these are not necessary for the
// test itself, but they are used to pop from the sniffer's message queue)
assert_common_message!(
&pool_translator_sniffer.next_message_from_upstream(),
SetupConnectionSuccess
);
assert_mining_message!(
&pool_translator_sniffer.next_message_from_upstream(),
OpenExtendedMiningChannelSuccess
);
assert_mining_message!(
&pool_translator_sniffer.next_message_from_upstream(),
NewExtendedMiningJob
);
assert_mining_message!(
&pool_translator_sniffer.next_message_from_upstream(),
SetNewPrevHash
);
pool_translator_sniffer
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment was marked as resolved.

This comment was marked as resolved.

.wait_for_message_type_and_clean_queue(
MessageDirection::ToDownstream,
MESSAGE_TYPE_MINING_SET_NEW_PREV_HASH,
)
.await;
// Wait for a second NewExtendedMiningJob message
pool_translator_sniffer
.wait_for_message_type(
Expand Down
39 changes: 37 additions & 2 deletions roles/tests-integration/tests/sniffer_integration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use const_sv2::MESSAGE_TYPE_SETUP_CONNECTION_ERROR;
use const_sv2::{
MESSAGE_TYPE_SETUP_CONNECTION_ERROR, MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS,
MESSAGE_TYPE_SET_NEW_PREV_HASH,
};
use integration_tests_sv2::*;
use roles_logic_sv2::{
common_messages_sv2::SetupConnectionError,
Expand All @@ -10,7 +13,6 @@ use std::convert::TryInto;
#[tokio::test]
async fn test_sniffer_interrupter() {
let (_tp, tp_addr) = start_template_provider(None).await;
use const_sv2::MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS;
let message =
PoolMessages::Common(CommonMessages::SetupConnectionError(SetupConnectionError {
flags: 0,
Expand All @@ -33,3 +35,36 @@ async fn test_sniffer_interrupter() {
assert_common_message!(&sniffer.next_message_from_downstream(), SetupConnection);
assert_common_message!(&sniffer.next_message_from_upstream(), SetupConnectionError);
}

#[tokio::test]
async fn test_sniffer_wait_for_message_type_with_remove() {
let (_tp, tp_addr) = start_template_provider(None).await;
let (sniffer, sniffer_addr) = start_sniffer("".to_string(), tp_addr, false, None).await;
let _ = start_pool(Some(sniffer_addr)).await;
assert!(
sniffer
.wait_for_message_type_and_clean_queue(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SET_NEW_PREV_HASH,
)
.await
);
assert_eq!(
sniffer
.includes_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS
)
.await,
false
);
assert_eq!(
sniffer
.includes_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SET_NEW_PREV_HASH
)
.await,
false
);
}
Loading