Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: race conditions in simultaneous connect, NAT ID, Hole Punching, NetMutex. Fix FileKey Collisions #221

Merged
merged 23 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a3623be
fix: race condition in simultaneous connect, NAT ID, FileKey Collisions
tbraun96 Nov 18, 2024
4ec4ab0
chore: lints
tbraun96 Nov 18, 2024
3fd1a91
fix: UDP binding
tbraun96 Nov 19, 2024
fa13c14
fix: revfs tests now passing. Hanging bug fixed
tbraun96 Nov 21, 2024
2356cb7
fix: use 0.0.0.0
tbraun96 Nov 21, 2024
0e25a0f
fix: udp tests, add second UDP socket if ipv6 enabled
tbraun96 Nov 21, 2024
bf7b21e
fix: better NAT traversal
tbraun96 Nov 22, 2024
492134e
chore: upgrade bincode
tbraun96 Nov 22, 2024
915c1c1
fix: add socket changes for windows
tbraun96 Nov 22, 2024
83384b5
fix: add NetworkUnreachable as error type
tbraun96 Nov 22, 2024
9ba0829
fix: keep track of unreachable addrs
tbraun96 Nov 23, 2024
d4b8440
fix: add more more resiliency to rebuild process
tbraun96 Nov 23, 2024
17ea2ec
fix: add retry mechanism to hole puncher
tbraun96 Nov 23, 2024
7b0cbc5
fix: make network more permissive
tbraun96 Nov 23, 2024
0b4e8d5
fix: synchronization bugs in NAT and NetMutex
tbraun96 Nov 23, 2024
1267bff
fix: do not bind to 0.0.0.0 on windows for internal service tests
tbraun96 Nov 23, 2024
28c33d1
fix: ensure socket options set before std conversion
tbraun96 Nov 23, 2024
3cbaef8
refactor: update nextest config timeout policy, add mt tests to windo…
tbraun96 Nov 23, 2024
e57488b
fix(udp_traversal): prevent hanging by notifying the other side of wi…
tbraun96 Nov 24, 2024
b517f8b
refactor: simplify udp dualstack driver
tbraun96 Nov 24, 2024
bbdd570
fix: try select_ok
tbraun96 Nov 24, 2024
8eed486
fix: add more time for hole punching, less time for NAT ID, spawn hol…
tbraun96 Nov 24, 2024
d1093f4
fix: timeout bug /facepalm
tbraun96 Nov 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async-trait-with-sync = { default-features = false, version = "0.1.36" }
uuid = { version = "1.2.2", default-features = false }
tracing = { version = "0.1.37", default-features = false }
lazy_static = { default-features = false, version = "1.4.0" }
socket2 = { version = "0.5.1", default-features = false }
socket2 = { version = "0.5.7", default-features = false }
rustls-native-certs = { version = "0.6.2", default-features = false }
igd = { version = "^0.12.0", default-features = false }
quinn = { version = "0.10.2", default-features = false }
Expand Down
12 changes: 1 addition & 11 deletions async_ip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,7 @@ pub async fn get_ip_from(client: Option<Client>, addr: &str) -> Result<IpAddr, I
.text()
.await
.map_err(|err| IpRetrieveError::Error(err.to_string()))?;
IpAddr::from_str(text.as_str())
.map_err(|err| IpRetrieveError::Error(err.to_string()))
.and_then(|res| {
if res.is_ipv4() {
Err(IpRetrieveError::Error(
"This node does not have an ipv6 addr".to_string(),
))
} else {
Ok(res)
}
})
IpAddr::from_str(text.as_str()).map_err(|err| IpRetrieveError::Error(err.to_string()))
}

/// Gets the internal IP address using DNS
Expand Down
25 changes: 21 additions & 4 deletions citadel_crypt/src/endpoint_crypto_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::toolset::{Toolset, UpdateStatus};
use citadel_pqcrypto::constructor_opts::ConstructorOpts;
use citadel_types::crypto::CryptoParameters;
use citadel_types::crypto::SecurityLevel;
use citadel_types::prelude::ObjectId;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -21,7 +22,7 @@ pub struct PeerSessionCrypto<R: Ratchet = StackedRatchet> {
pub update_in_progress: Arc<AtomicBool>,
// if local is initiator, then in the case both nodes send a FastMessage at the same time (causing an update to the keys), the initiator takes preference, and the non-initiator's upgrade attempt gets dropped (if update_in_progress)
pub local_is_initiator: bool,
pub rolling_object_id: u64,
pub rolling_object_id: ObjectId,
pub rolling_group_id: u64,
pub lock_set_by_alice: Option<bool>,
/// Alice sends to Bob, then bob updates internally the toolset. However. Bob can't send packets to Alice quite yet using that newest version. He must first wait from Alice to commit on her end and wait for an ACK.
Expand Down Expand Up @@ -200,9 +201,25 @@ impl<R: Ratchet> PeerSessionCrypto<R> {
self.rolling_group_id.wrapping_sub(1)
}

pub fn get_and_increment_object_id(&mut self) -> u64 {
self.rolling_object_id = self.rolling_object_id.wrapping_add(1);
self.rolling_object_id.wrapping_sub(1)
pub fn get_and_increment_object_id(&mut self) -> i64 {
let next = if self.local_is_initiator {
let mut next_val = self.rolling_object_id.wrapping_add(1);
if next_val <= 0 {
next_val = 1
}

next_val
} else {
let mut next_val = self.rolling_object_id.wrapping_sub(1);
if next_val >= 0 {
next_val = -1
}

next_val
};

self.rolling_object_id = next;
next
}
tbraun96 marked this conversation as resolved.
Show resolved Hide resolved

/// Returns a new constructor only if a concurrent update isn't occurring
Expand Down
19 changes: 10 additions & 9 deletions citadel_crypt/src/scramble/crypt_splitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::entropy_bank::EntropyBank;
use crate::packet_vector::{generate_packet_vector, PacketVector};
use crate::prelude::CryptError;
use crate::stacked_ratchet::Ratchet;
pub use citadel_types::prelude::ObjectId;
#[cfg(not(target_family = "wasm"))]
use rayon::prelude::*;

Expand Down Expand Up @@ -62,7 +63,7 @@ pub fn generate_scrambler_metadata<T: AsRef<[u8]>>(
header_size_bytes: usize,
security_level: SecurityLevel,
group_id: u64,
object_id: u64,
object_id: ObjectId,
enx: EncryptionAlgorithm,
sig_alg: SigAlgorithm,
transfer_type: &TransferType,
Expand Down Expand Up @@ -141,7 +142,7 @@ fn get_scramble_encrypt_config<'a, R: Ratchet>(
header_size_bytes: usize,
security_level: SecurityLevel,
group_id: u64,
object_id: u64,
object_id: ObjectId,
transfer_type: &TransferType,
empty_transfer: bool,
) -> Result<
Expand Down Expand Up @@ -190,13 +191,13 @@ pub fn par_scramble_encrypt_group<T: AsRef<[u8]>, R: Ratchet, F, const N: usize>
static_aux_ratchet: &R,
header_size_bytes: usize,
target_cid: u64,
object_id: u64,
object_id: ObjectId,
group_id: u64,
transfer_type: TransferType,
header_inscriber: F,
) -> Result<GroupSenderDevice<N>, CryptError<String>>
where
F: Fn(&PacketVector, &EntropyBank, u64, u64, &mut BytesMut) + Send + Sync,
F: Fn(&PacketVector, &EntropyBank, ObjectId, u64, &mut BytesMut) + Send + Sync,
{
let mut plain_text = Cow::Borrowed(plain_text.as_ref());

Expand Down Expand Up @@ -303,9 +304,9 @@ fn scramble_encrypt_wave(
msg_pqc: &PostQuantumContainer,
scramble_drill: &EntropyBank,
target_cid: u64,
object_id: u64,
object_id: ObjectId,
header_size_bytes: usize,
header_inscriber: impl Fn(&PacketVector, &EntropyBank, u64, u64, &mut BytesMut) + Send + Sync,
header_inscriber: impl Fn(&PacketVector, &EntropyBank, ObjectId, u64, &mut BytesMut) + Send + Sync,
) -> Vec<(usize, PacketCoordinate)> {
let ciphertext = msg_drill
.encrypt(msg_pqc, bytes_to_encrypt_for_this_wave)
Expand Down Expand Up @@ -336,7 +337,7 @@ pub fn oneshot_unencrypted_group_unified<const N: usize>(
plain_text: SecureMessagePacket<N>,
header_size_bytes: usize,
group_id: u64,
object_id: u64,
object_id: ObjectId,
empty_transfer: bool,
) -> Result<GroupSenderDevice<N>, CryptError<String>> {
let len = plain_text.message_len() as u64;
Expand Down Expand Up @@ -435,7 +436,7 @@ pub struct GroupReceiverConfig {
// this is NOT inscribed; only for transmission
pub header_size_bytes: u64,
pub group_id: u64,
pub object_id: u64,
pub object_id: ObjectId,
// only relevant for files. Note: if transfer type is RemoteVirtualFileystem, then,
// the receiving endpoint won't decrypt the first level of encryption since the goal
// is to keep the file remotely encrypted
Expand All @@ -450,7 +451,7 @@ impl GroupReceiverConfig {
#[allow(clippy::too_many_arguments)]
pub fn new_refresh(
group_id: u64,
object_id: u64,
object_id: ObjectId,
header_size_bytes: u64,
plaintext_length: u64,
max_packet_payload_size: u32,
Expand Down
12 changes: 8 additions & 4 deletions citadel_crypt/src/streaming_crypt_scrambler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::stacked_ratchet::StackedRatchet;
use citadel_io::Mutex;
use citadel_io::{BlockingSpawn, BlockingSpawnError};
use citadel_types::crypto::SecurityLevel;
use citadel_types::prelude::ObjectId;
use citadel_types::proto::TransferType;
use futures::Future;
use num_integer::Integer;
Expand All @@ -41,11 +42,14 @@ impl FixedSizedSource for std::fs::File {

/// Generic function for inscribing headers on packets
pub trait HeaderInscriberFn:
for<'a> Fn(&'a PacketVector, &'a EntropyBank, u64, u64, &'a mut BytesMut) + Send + Sync + 'static
for<'a> Fn(&'a PacketVector, &'a EntropyBank, ObjectId, u64, &'a mut BytesMut)
+ Send
+ Sync
+ 'static
{
}
impl<
T: for<'a> Fn(&'a PacketVector, &'a EntropyBank, u64, u64, &'a mut BytesMut)
T: for<'a> Fn(&'a PacketVector, &'a EntropyBank, ObjectId, u64, &'a mut BytesMut)
+ Send
+ Sync
+ 'static,
Expand Down Expand Up @@ -162,7 +166,7 @@ impl<T: Into<Vec<u8>>> From<T> for BytesSource {
pub fn scramble_encrypt_source<S: ObjectSource, F: HeaderInscriberFn, const N: usize>(
mut source: S,
max_group_size: Option<usize>,
object_id: u64,
object_id: ObjectId,
group_sender: GroupChanneler<Result<GroupSenderDevice<N>, CryptError>>,
stop: Receiver<()>,
security_level: SecurityLevel,
Expand Down Expand Up @@ -266,7 +270,7 @@ struct AsyncCryptScrambler<F: HeaderInscriberFn, R: Read, const N: usize> {
transfer_type: TransferType,
file_len: usize,
read_cursor: usize,
object_id: u64,
object_id: ObjectId,
header_size_bytes: usize,
target_cid: u64,
group_id: u64,
Expand Down
10 changes: 8 additions & 2 deletions citadel_crypt/tests/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod tests {
AlgorithmsExt, CryptoParameters, EncryptionAlgorithm, KemAlgorithm, SecBuffer,
SigAlgorithm, KEM_ALGORITHM_COUNT,
};
use citadel_types::proto::TransferType;
use citadel_types::proto::{ObjectId, TransferType};
use rstest::rstest;
#[cfg(not(target_family = "wasm"))]
use std::path::PathBuf;
Expand Down Expand Up @@ -765,7 +765,13 @@ mod tests {
}

const HEADER_LEN: usize = 52;
fn header_inscribe(_: &PacketVector, _: &EntropyBank, _: u64, _: u64, packet: &mut BytesMut) {
fn header_inscribe(
_: &PacketVector,
_: &EntropyBank,
_: ObjectId,
_: u64,
packet: &mut BytesMut,
) {
for x in 0..HEADER_LEN {
packet.put_u8((x % 255) as u8)
}
Expand Down
2 changes: 1 addition & 1 deletion citadel_logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tracing_subscriber::EnvFilter;
/// Sets up the logging for any crate
pub fn setup_log() {
std::panic::set_hook(Box::new(|info| {
error!(target: "citadel", "Panic occurred: {:#?}", info);
error!(target: "citadel", "Panic occurred: {}", info);
std::process::exit(1);
}));

Expand Down
38 changes: 24 additions & 14 deletions citadel_proto/src/proto/packet_crafter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::proto::state_container::VirtualTargetType;
use citadel_crypt::scramble::crypt_splitter::oneshot_unencrypted_group_unified;
use citadel_crypt::secure_buffer::sec_packet::SecureMessagePacket;
use citadel_crypt::stacked_ratchet::{Ratchet, StackedRatchet};
use citadel_types::prelude::ObjectId;

#[derive(Debug)]
/// A thin wrapper used for convenient creation of zero-copy outgoing buffers
Expand Down Expand Up @@ -63,7 +64,7 @@ pub struct GroupTransmitter {
/// Contained within Self::group_transmitter, but is here for convenience
group_config: GroupReceiverConfig,
/// The ID of the object that is being transmitted
pub object_id: u64,
pub object_id: ObjectId,
pub group_id: u64,
/// For interfacing with the higher-level kernel
ticket: Ticket,
Expand Down Expand Up @@ -98,7 +99,7 @@ impl GroupTransmitter {
to_primary_stream: OutboundPrimaryStreamSender,
group_sender: GroupSenderDevice<HDP_HEADER_BYTE_LEN>,
hyper_ratchet: RatchetPacketCrafterContainer,
object_id: u64,
object_id: ObjectId,
ticket: Ticket,
security_level: SecurityLevel,
time_tracker: TimeTracker,
Expand Down Expand Up @@ -126,7 +127,7 @@ impl GroupTransmitter {
#[allow(clippy::too_many_arguments)]
pub fn new_message(
to_primary_stream: OutboundPrimaryStreamSender,
object_id: u64,
object_id: ObjectId,
hyper_ratchet: RatchetPacketCrafterContainer,
input_packet: SecureProtocolPacket,
security_level: SecurityLevel,
Expand Down Expand Up @@ -241,6 +242,7 @@ pub(crate) mod group {
use crate::proto::validation::group::{GroupHeader, GroupHeaderAck, WaveAck};
use citadel_crypt::endpoint_crypto_container::KemTransferStatus;
use citadel_crypt::stacked_ratchet::StackedRatchet;
use citadel_types::proto::ObjectId;
use citadel_user::serialization::SyncIO;
use std::ops::RangeInclusive;

Expand Down Expand Up @@ -296,7 +298,7 @@ pub(crate) mod group {
packet
};

packet.put_u64(processor.object_id);
packet.put_i64(processor.object_id);

processor
.hyper_ratchet_container
Expand All @@ -320,7 +322,7 @@ pub(crate) mod group {
hyper_ratchet: &StackedRatchet,
group_id: u64,
target_cid: u64,
object_id: u64,
object_id: ObjectId,
ticket: Ticket,
initial_wave_window: Option<RangeInclusive<u32>>,
fast_msg: bool,
Expand Down Expand Up @@ -362,12 +364,20 @@ pub(crate) mod group {
packet
}

pub(crate) fn pack_i64_to_u128(value: i64) -> u128 {
value as u128 & 0xFFFFFFFFFFFFFFFF
}

pub(crate) fn unpack_i64_from_u128(packed: u128) -> i64 {
(packed & 0xFFFFFFFFFFFFFFFF) as i64
}

/// This is called by the scrambler. NOTE: the scramble_drill MUST have the same drill/cid as the message_drill, otherwise
/// packets will not be rendered on the otherside
pub(crate) fn craft_wave_payload_packet_into(
coords: &PacketVector,
scramble_drill: &EntropyBank,
object_id: u64,
object_id: ObjectId,
target_cid: u64,
mut buffer: &mut BytesMut,
) {
Expand All @@ -377,7 +387,7 @@ pub(crate) mod group {
cmd_aux: packet_flags::cmd::aux::group::GROUP_PAYLOAD,
algorithm: 0,
security_level: 0, // Irrelevant; supplied by the wave header anyways
context_info: U128::new(object_id as _),
context_info: U128::new(pack_i64_to_u128(object_id)),
group: U64::new(coords.group_id),
wave_id: U32::new(coords.wave_id),
session_cid: U64::new(scramble_drill.get_cid()),
Expand All @@ -400,7 +410,7 @@ pub(crate) mod group {
#[allow(clippy::too_many_arguments)]
pub(crate) fn craft_wave_ack(
hyper_ratchet: &StackedRatchet,
object_id: u32,
object_id: ObjectId,
target_cid: u64,
group_id: u64,
wave_id: u32,
Expand All @@ -414,7 +424,7 @@ pub(crate) mod group {
cmd_aux: packet_flags::cmd::aux::group::WAVE_ACK,
algorithm: 0,
security_level: security_level.value(),
context_info: U128::new(object_id as _),
context_info: U128::new(pack_i64_to_u128(object_id)),
group: U64::new(group_id),
wave_id: U32::new(wave_id),
session_cid: U64::new(hyper_ratchet.get_cid()),
Expand Down Expand Up @@ -1598,7 +1608,7 @@ pub(crate) mod file {
use citadel_crypt::stacked_ratchet::StackedRatchet;
use citadel_types::crypto::SecurityLevel;
use citadel_types::prelude::TransferType;
use citadel_types::proto::VirtualObjectMetadata;
use citadel_types::proto::{ObjectId, VirtualObjectMetadata};
use citadel_user::serialization::SyncIO;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
Expand All @@ -1607,7 +1617,7 @@ pub(crate) mod file {
#[derive(Serialize, Deserialize, Debug)]
pub struct FileTransferErrorPacket {
pub error_message: String,
pub object_id: u64,
pub object_id: ObjectId,
}

pub(crate) fn craft_file_error_packet(
Expand All @@ -1617,7 +1627,7 @@ pub(crate) mod file {
virtual_target: VirtualTargetType,
timestamp: i64,
error_message: String,
object_id: u64,
object_id: ObjectId,
) -> BytesMut {
let header = HdpHeader {
protocol_version: (*crate::constants::PROTOCOL_VERSION).into(),
Expand Down Expand Up @@ -1704,15 +1714,15 @@ pub(crate) mod file {
pub struct FileHeaderAckPacket {
pub success: bool,
pub virtual_target: VirtualTargetType,
pub object_id: u64,
pub object_id: ObjectId,
pub transfer_type: TransferType,
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn craft_file_header_ack_packet(
hyper_ratchet: &StackedRatchet,
success: bool,
object_id: u64,
object_id: ObjectId,
target_cid: u64,
ticket: Ticket,
security_level: SecurityLevel,
Expand Down
Loading
Loading