Tweak logs and attempt to avoid races around removing nodes #504

Merged 2 commits on Oct 10, 2022
9 changes: 7 additions & 2 deletions backend/common/src/assign_id.rs
@@ -47,7 +47,9 @@ where

pub fn assign_id(&mut self, details: Details) -> Id {
let this_id = self.current_id;
self.current_id += 1;
// It's very unlikely we'll ever overflow the ID limit, but in case we do,
// a wrapping_add will almost certainly be fine:
self.current_id = self.current_id.wrapping_add(1);
self.mapping.insert(this_id, details);
this_id.into()
}
@@ -73,7 +75,10 @@ where
}

pub fn clear(&mut self) {
*self = AssignId::new();
// Leave the `current_id` as-is. Why? To avoid reusing IDs and risking
// race conditions where old messages can accidentally screw with new nodes
// that have been assigned the same ID.
self.mapping = BiMap::new();
niklasad1 (Member) commented on Oct 5, 2022:

hmm, I don't understand.

Before, we were resetting this.id to 0 via BiMap::new(), so multiple nodes/entities could end up with id == 0 after a clear; now we just keep this id, and the next one will be this.id + 1?

It looks like the id will wrap in release mode if this.id + 1 > MAX.

Can you add an explicit panic or something so we are sure that it doesn't wrap around and end up replacing an "old ID"? Or what will happen then?

Collaborator (Author) replied:


Yup, I did this because I worried that "old" IDs might be reused in some sort of obscure race condition, leading to new nodes being affected by "old" messages or something like that.

Indeed, eventually it'll overflow and panic, but since the ID is a 64-bit value it'll never realistically hit usize::MAX, so I wasn't worried about overflowing (I almost considered adding an explicit wrapping_add, but meh).

Collaborator (Author) added:


Since it was so easy to do I did make the thing wrapping_add :)

}

pub fn iter(&self) -> impl Iterator<Item = (Id, &Details)> {
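For illustration, here is a minimal, self-contained sketch of the behaviour discussed above, using a simplified assigner with a plain HashMap and usize IDs rather than the real BiMap-backed AssignId. It shows why clear() keeping current_id avoids handing out an ID that older messages might still reference:

use std::collections::HashMap;

/// Hypothetical, simplified assigner (the real one wraps a BiMap and a generic Id type).
struct SimpleAssignId<Details> {
    current_id: usize,
    mapping: HashMap<usize, Details>,
}

impl<Details> SimpleAssignId<Details> {
    fn new() -> Self {
        Self { current_id: 0, mapping: HashMap::new() }
    }

    fn assign_id(&mut self, details: Details) -> usize {
        let this_id = self.current_id;
        // Wrap rather than panic if the counter is ever exhausted (practically unreachable):
        self.current_id = self.current_id.wrapping_add(1);
        self.mapping.insert(this_id, details);
        this_id
    }

    fn clear(&mut self) {
        // Drop the mapping but keep `current_id`, so IDs handed out before the clear
        // are never reissued to entries added afterwards.
        self.mapping = HashMap::new();
    }
}

An ID observed by a stale message before clear() can then never collide with one assigned afterwards, short of the 64-bit counter wrapping.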
17 changes: 8 additions & 9 deletions backend/telemetry_core/src/aggregator/inner_loop.rs
@@ -248,7 +248,7 @@ impl InnerLoop {
}

if let Err(e) = metered_tx.send(msg) {
log::error!("Cannot send message into aggregator: {}", e);
log::error!("Cannot send message into aggregator: {e}");
break;
}
}
@@ -386,10 +386,11 @@ impl InnerLoop {
let node_id = match self.node_ids.remove_by_right(&(shard_conn_id, local_id)) {
Some((node_id, _)) => node_id,
None => {
log::error!(
"Cannot find ID for node with shard/connectionId of {:?}/{:?}",
shard_conn_id,
local_id
// It's possible that some race between removing and disconnecting shards might lead to
// more than one remove message for the same node. This isn't really a problem, but we
// hope it won't happen so make a note if it does:
log::debug!(
"Remove: Cannot find ID for node with shard/connectionId of {shard_conn_id:?}/{local_id:?}"
);

A contributor commented:

Are we worried about the process consuming too much memory because we may never call remove_nodes_and_broadcast_result here, potentially leading to an OOM kill?

Would it be possible for a malicious party to force the shard to submit Remove { random_id }? And in that case, could we "mute" / disconnect the shard if it makes N wrong Remove calls?

Collaborator (Author) replied:

It shouldn't be possible for any party to have any control over any of the ids; we assign them all internally.

And if the id doesn't exist for this remove message, it means the node has been removed already :)

Remove calls are also entirely internal; an external person cannot cause it to be called except by disconnecting (or when messages with certain ids go stale).
return;
}
@@ -401,9 +402,7 @@
Some(id) => *id,
None => {
log::error!(
"Cannot find ID for node with shard/connectionId of {:?}/{:?}",
shard_conn_id,
local_id
"Update: Cannot find ID for node with shard/connectionId of {shard_conn_id:?}/{local_id:?}"
);
return;
}
@@ -606,7 +605,7 @@ impl InnerLoop {
let removed_details = match self.node_state.remove_node(node_id) {
Some(remove_details) => remove_details,
None => {
log::error!("Could not find node {:?}", node_id);
log::error!("Could not find node {node_id:?}");
return;
}
};
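To make the race-tolerance above concrete, here is a hedged sketch that treats a duplicate Remove as a no-op. It uses a plain HashMap keyed by a hypothetical (shard connection, local id) pair rather than the real node_ids BiMap:

use std::collections::HashMap;

type ShardConnId = u64;
type LocalId = u64;
type NodeId = u64;

/// Hypothetical stand-in for the removal path: a second Remove for the same node
/// finds nothing and is logged at debug level rather than treated as an error.
fn handle_remove(
    node_ids: &mut HashMap<(ShardConnId, LocalId), NodeId>,
    shard_conn_id: ShardConnId,
    local_id: LocalId,
) -> Option<NodeId> {
    match node_ids.remove(&(shard_conn_id, local_id)) {
        Some(node_id) => Some(node_id),
        None => {
            // A race between node removal and shard disconnection can deliver more than
            // one Remove for the same node; the duplicate is harmless, so just note it.
            log::debug!(
                "Remove: Cannot find ID for node with shard/connectionId of {shard_conn_id:?}/{local_id:?}"
            );
            None
        }
    }
}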
34 changes: 9 additions & 25 deletions backend/telemetry_core/src/main.rs
@@ -251,21 +251,15 @@ where
break;
}
if let Err(e) = msg_info {
log::error!(
"Shutting down websocket connection: Failed to receive data: {}",
e
);
log::error!("Shutting down websocket connection: Failed to receive data: {e}");
break;
}

let msg: internal_messages::FromShardAggregator =
match bincode::options().deserialize(&bytes) {
Ok(msg) => msg,
Err(e) => {
log::error!(
"Failed to deserialize message from shard; booting it: {}",
e
);
log::error!("Failed to deserialize message from shard; booting it: {e}");
break;
}
};
@@ -292,7 +286,7 @@ where
};

if let Err(e) = tx_to_aggregator.send(aggregator_msg).await {
log::error!("Failed to send message to aggregator; closing shard: {}", e);
log::error!("Failed to send message to aggregator; closing shard: {e}");
break;
}
}
@@ -325,13 +319,10 @@ where
.expect("message to shard should serialize");

if let Err(e) = ws_send.send_binary(bytes).await {
log::error!("Failed to send message to aggregator; closing shard: {}", e)
log::error!("Failed to send message to aggregator; closing shard: {e}")
}
if let Err(e) = ws_send.flush().await {
log::error!(
"Failed to flush message to aggregator; closing shard: {}",
e
)
log::error!("Failed to flush message to aggregator; closing shard: {e}")
}
}

@@ -374,7 +365,7 @@ where
channel: tx_to_feed_conn,
};
if let Err(e) = tx_to_aggregator.send(init_msg).await {
log::error!("Error sending message to aggregator: {}", e);
log::error!("Error sending message to aggregator: {e}");
return (tx_to_aggregator, ws_send);
}

@@ -399,10 +390,7 @@ where
break;
}
if let Err(e) = msg_info {
log::error!(
"Shutting down websocket connection: Failed to receive data: {}",
e
);
log::error!("Shutting down websocket connection: Failed to receive data: {e}");
break;
}

@@ -416,16 +404,12 @@ where
let cmd = match FromFeedWebsocket::from_str(&text) {
Ok(cmd) => cmd,
Err(e) => {
log::warn!(
"Ignoring invalid command '{}' from the frontend: {}",
text,
e
);
log::warn!("Ignoring invalid command '{text}' from the frontend: {e}");
continue;
}
};
if let Err(e) = tx_to_aggregator.send(cmd).await {
log::error!("Failed to send message to aggregator; closing feed: {}", e);
log::error!("Failed to send message to aggregator; closing feed: {e}");
break;
}
}
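The log tweaks in this file are all the same mechanical change: switching from positional format arguments to inline format-argument captures. A purely illustrative sketch (the report wrapper is made up for the example):

use std::fmt;

fn report(e: impl fmt::Display) {
    // Before: the value is passed as a separate positional argument.
    log::error!("Failed to send message to aggregator; closing shard: {}", e);
    // After: the identifier is captured directly in the format string.
    log::error!("Failed to send message to aggregator; closing shard: {e}");
}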
22 changes: 16 additions & 6 deletions backend/telemetry_shard/src/aggregator.rs
@@ -261,9 +261,14 @@ impl Aggregator {
// Remove references to this single node:
to_local_id.remove_by_id(local_id);
muted.remove(&local_id);
let _ = tx_to_telemetry_core
.send_async(FromShardAggregator::RemoveNode { local_id })
.await;

// If we're not connected to the core, don't buffer up remove messages. The core will remove
// all nodes associated with this shard anyway, so the remove message would be redundant.
if connected_to_telemetry_core {
let _ = tx_to_telemetry_core
.send_async(FromShardAggregator::RemoveNode { local_id })
.await;
}
}
ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => {
// Find all of the local IDs corresponding to the disconnected connection ID and
@@ -280,9 +285,14 @@
for local_id in local_ids_disconnected {
to_local_id.remove_by_id(local_id);
muted.remove(&local_id);
let _ = tx_to_telemetry_core
.send_async(FromShardAggregator::RemoveNode { local_id })
.await;

// If we're not connected to the core, don't buffer up remove messages. The core will remove
// all nodes associated with this shard anyway, so the remove message would be redundant.
if connected_to_telemetry_core {
let _ = tx_to_telemetry_core
.send_async(FromShardAggregator::RemoveNode { local_id })
.await;
}
}
}
ToAggregator::FromTelemetryCore(FromTelemetryCore::Mute {
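A minimal sketch of the guard added in both branches above, assuming a tokio-style channel and a plain bool flag in place of the real shard-to-core sender and connection state:

use tokio::sync::mpsc::Sender;

enum FromShardAggregator {
    RemoveNode { local_id: u64 },
}

// Only forward the removal when the core connection is up; otherwise the message would
// just sit in a buffer, and the core removes every node for this shard when the shard
// connection drops anyway, so the buffered remove would be redundant.
async fn forward_remove(
    connected_to_telemetry_core: bool,
    tx_to_telemetry_core: &Sender<FromShardAggregator>,
    local_id: u64,
) {
    if connected_to_telemetry_core {
        let _ = tx_to_telemetry_core
            .send(FromShardAggregator::RemoveNode { local_id })
            .await;
    }
}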
23 changes: 13 additions & 10 deletions backend/telemetry_shard/src/main.rs
@@ -239,7 +239,7 @@ where
close_connection: close_connection_tx.clone(),
};
if let Err(e) = tx_to_aggregator.send(init_msg).await {
log::error!("Error sending message to aggregator: {}", e);
log::error!("Shutting down websocket connection from {real_addr:?}: Error sending message to aggregator: {e}");
return (tx_to_aggregator, ws_send);
}

@@ -254,7 +254,7 @@ where
// The close channel has fired, so end the loop. `ws_recv.receive_data` is
// *not* cancel safe, but since we're closing the connection we don't care.
_ = close_connection_rx.recv_async() => {
log::info!("connection to {:?} being closed", real_addr);
log::info!("connection to {real_addr:?} being closed");
break
},
// Receive data and relay it on to our main select loop below.
Expand All @@ -263,7 +263,7 @@ where
break;
}
if let Err(e) = msg_info {
log::error!("Shutting down websocket connection: Failed to receive data: {}", e);
log::error!("Shutting down websocket connection from {real_addr:?}: Failed to receive data: {e}");
break;
}
if ws_tx_atomic.unbounded_send(bytes).is_err() {
@@ -292,14 +292,14 @@
.collect();

for &message_id in &stale_ids {
log::info!("Removing stale node with message ID {} from {:?}", message_id, real_addr);
log::info!("Removing stale node with message ID {message_id} from {real_addr:?}");
allowed_message_ids.remove(&message_id);
let _ = tx_to_aggregator.send(FromWebsocket::Remove { message_id } ).await;
}

if !stale_ids.is_empty() && allowed_message_ids.is_empty() {
// End the entire connection if no recent messages came in for any ID.
log::info!("Closing stale connection from {:?}", real_addr);
log::info!("Closing stale connection from {real_addr:?}");
break;
}
},
@@ -316,7 +316,7 @@
let this_bytes_per_second = rolling_total_bytes.total() / 10;
if this_bytes_per_second > bytes_per_second {
block_list.block_addr(real_addr, "Too much traffic");
log::error!("Shutting down websocket connection: Too much traffic ({}bps averaged over last 10s)", this_bytes_per_second);
log::error!("Shutting down websocket connection: Too much traffic ({this_bytes_per_second}bps averaged over last 10s)");
break;
}

@@ -327,7 +327,7 @@
Err(e) => {
let bytes: &[u8] = bytes.get(..512).unwrap_or_else(|| &bytes);
let msg_start = std::str::from_utf8(bytes).unwrap_or_else(|_| "INVALID UTF8");
log::warn!("Failed to parse node message ({}): {}", msg_start, e);
log::warn!("Failed to parse node message ({msg_start}): {e}");
continue;
},
#[cfg(not(debug))]
@@ -347,15 +347,15 @@
if let node_message::Payload::SystemConnected(info) = payload {
// Too many nodes seen on this connection? Ignore this one.
if allowed_message_ids.len() >= max_nodes_per_connection {
log::info!("Ignoring new node from {:?} (we've hit the max of {} nodes per connection)", real_addr, max_nodes_per_connection);
log::info!("Ignoring new node with ID {message_id} from {real_addr:?} (we've hit the max of {max_nodes_per_connection} nodes per connection)");
continue;
}

// Note of the message ID, allowing telemetry for it.
allowed_message_ids.insert(message_id, Instant::now());

// Tell the aggregator loop about the new node.
log::info!("Adding node with message ID {} from {:?}", message_id, real_addr);
log::info!("Adding node with message ID {message_id} from {real_addr:?}");
let _ = tx_to_aggregator.send(FromWebsocket::Add {
message_id,
ip: real_addr,
@@ -369,9 +369,12 @@
if let Some(last_seen) = allowed_message_ids.get_mut(&message_id) {
*last_seen = Instant::now();
if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await {
log::error!("Failed to send node message to aggregator: {}", e);
log::error!("Failed to send node message to aggregator: {e}");
continue;
}
} else {
log::info!("Ignoring message with ID {message_id} from {real_addr:?} (we've hit the max of {max_nodes_per_connection} nodes per connection)");
continue;
}
}
}
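The expression that builds stale_ids is elided from the hunk above; purely as a sketch of how such a sweep could look, assuming the HashMap<u64, Instant> of last-seen times that the diff updates (the names stale_after and stale_message_ids are illustrative, not the real code):

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical helper: collect message IDs whose last message is older than `stale_after`.
fn stale_message_ids(
    allowed_message_ids: &HashMap<u64, Instant>,
    stale_after: Duration,
) -> Vec<u64> {
    allowed_message_ids
        .iter()
        .filter(|(_, last_seen)| last_seen.elapsed() > stale_after)
        .map(|(&message_id, _)| message_id)
        .collect()
}

The select branch above then removes each returned ID from allowed_message_ids and, once none remain, closes the whole connection.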