Skip to content

Commit

Permalink
Use aHash on the server side (#1495)
Browse files Browse the repository at this point in the history
Instead of HashMap and HashSet, use more performant AHashMap and AHashSet.
  • Loading branch information
spetz authored Feb 8, 2025
1 parent 0e14df4 commit 08f4889
Show file tree
Hide file tree
Showing 16 changed files with 79 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions integration/tests/streaming/stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::streaming::common::test_setup::TestSetup;
use crate::streaming::create_messages;
use ahash::AHashMap;
use iggy::identifier::Identifier;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::messages::send_messages::Partitioning;
Expand All @@ -11,7 +12,6 @@ use iggy::utils::topic_size::MaxTopicSize;
use server::state::system::StreamState;
use server::streaming::polling_consumer::PollingConsumer;
use server::streaming::streams::stream::Stream;
use std::collections::HashMap;
use tokio::fs;

#[tokio::test]
Expand Down Expand Up @@ -60,7 +60,7 @@ async fn should_load_existing_stream_from_disk() {
id: stream_id,
name: name.clone(),
created_at: IggyTimestamp::now(),
topics: HashMap::new(),
topics: AHashMap::new(),
current_topic_id: 0,
};
loaded_stream.load(state).await.unwrap();
Expand Down
11 changes: 5 additions & 6 deletions integration/tests/streaming/topic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use std::collections::HashMap;
use std::default::Default;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::Arc;

use crate::streaming::common::test_setup::TestSetup;
use crate::streaming::create_messages;
use ahash::AHashMap;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::messages::send_messages::Partitioning;
Expand All @@ -16,6 +12,9 @@ use iggy::utils::topic_size::MaxTopicSize;
use server::state::system::{PartitionState, TopicState};
use server::streaming::polling_consumer::PollingConsumer;
use server::streaming::topics::topic::Topic;
use std::default::Default;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::Arc;
use tokio::fs;

#[tokio::test]
Expand Down Expand Up @@ -106,7 +105,7 @@ async fn should_load_existing_topic_from_disk() {
id: topic_id,
name,
partitions: if partitions_count == 0 {
HashMap::new()
AHashMap::new()
} else {
(1..=partitions_count)
.map(|id| (id, PartitionState { id, created_at }))
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.151"
version = "0.4.152"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down
8 changes: 4 additions & 4 deletions server/src/http/jwt/jwt_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::http::jwt::json_web_token::{GeneratedToken, JwtClaims, RevokedAccessT
use crate::http::jwt::storage::TokenStorage;
use crate::http::jwt::COMPONENT;
use crate::streaming::persistence::persister::PersisterKind;
use ahash::AHashMap;
use error_set::ErrContext;
use iggy::error::IggyError;
use iggy::locking::IggySharedMut;
Expand All @@ -12,7 +13,6 @@ use iggy::utils::duration::IggyDuration;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::timestamp::IggyTimestamp;
use jsonwebtoken::{encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, error, info};

Expand All @@ -36,8 +36,8 @@ pub struct JwtManager {
issuer: IssuerOptions,
validator: ValidatorOptions,
tokens_storage: TokenStorage,
revoked_tokens: IggySharedMut<HashMap<String, u64>>,
validations: HashMap<Algorithm, Validation>,
revoked_tokens: IggySharedMut<AHashMap<String, u64>>,
validations: AHashMap<Algorithm, Validation>,
}

impl JwtManager {
Expand All @@ -59,7 +59,7 @@ impl JwtManager {
issuer,
validator,
tokens_storage: TokenStorage::new(persister, path),
revoked_tokens: IggySharedMut::new(HashMap::new()),
revoked_tokens: IggySharedMut::new(AHashMap::new()),
})
}

Expand Down
8 changes: 4 additions & 4 deletions server/src/http/jwt/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use crate::streaming::utils::file;
use crate::{
http::jwt::json_web_token::RevokedAccessToken, streaming::persistence::persister::PersisterKind,
};
use ahash::AHashMap;
use anyhow::Context;
use bytes::{BufMut, BytesMut};
use error_set::ErrContext;
use iggy::error::IggyError;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tracing::{error, info};
Expand Down Expand Up @@ -63,7 +63,7 @@ impl TokenStorage {
})
.map_err(|_| IggyError::CannotReadFile)?;

let tokens: HashMap<String, u64> = bincode::deserialize(&buffer)
let tokens: AHashMap<String, u64> = bincode::deserialize(&buffer)
.with_context(|| "Failed to deserialize revoked access tokens")
.map_err(|_| IggyError::CannotDeserializeResource)?;

Expand All @@ -84,7 +84,7 @@ impl TokenStorage {
let mut map = tokens
.into_iter()
.map(|token| (token.id, token.expiry))
.collect::<HashMap<_, _>>();
.collect::<AHashMap<_, _>>();
map.insert(token.id.to_owned(), token.expiry);
let bytes = bincode::serialize(&map)
.with_context(|| "Failed to serialize revoked access tokens")
Expand Down Expand Up @@ -115,7 +115,7 @@ impl TokenStorage {
let mut map = tokens
.into_iter()
.map(|token| (token.id, token.expiry))
.collect::<HashMap<_, _>>();
.collect::<AHashMap<_, _>>();
for id in id {
map.remove(id);
}
Expand Down
39 changes: 21 additions & 18 deletions server/src/state/system.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::state::{EntryCommand, StateEntry, COMPONENT};
use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use ahash::AHashMap;
use error_set::ErrContext;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::error::IggyError;
Expand All @@ -9,31 +10,30 @@ use iggy::models::user_status::UserStatus;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::timestamp::IggyTimestamp;
use iggy::utils::topic_size::MaxTopicSize;
use std::collections::HashMap;
use std::fmt::Display;
use tracing::debug;

#[derive(Debug)]
pub struct SystemState {
pub streams: HashMap<u32, StreamState>,
pub users: HashMap<u32, UserState>,
pub streams: AHashMap<u32, StreamState>,
pub users: AHashMap<u32, UserState>,
}

#[derive(Debug)]
pub struct StreamState {
pub id: u32,
pub name: String,
pub created_at: IggyTimestamp,
pub topics: HashMap<u32, TopicState>,
pub topics: AHashMap<u32, TopicState>,
pub current_topic_id: u32,
}

#[derive(Debug)]
pub struct TopicState {
pub id: u32,
pub name: String,
pub partitions: HashMap<u32, PartitionState>,
pub consumer_groups: HashMap<u32, ConsumerGroupState>,
pub partitions: AHashMap<u32, PartitionState>,
pub consumer_groups: AHashMap<u32, ConsumerGroupState>,
pub compression_algorithm: CompressionAlgorithm,
pub message_expiry: IggyExpiry,
pub max_topic_size: MaxTopicSize,
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct UserState {
pub password_hash: String,
pub status: UserStatus,
pub permissions: Option<Permissions>,
pub personal_access_tokens: HashMap<String, PersonalAccessTokenState>,
pub personal_access_tokens: AHashMap<String, PersonalAccessTokenState>,
}

#[derive(Debug)]
Expand All @@ -73,8 +73,8 @@ pub struct ConsumerGroupState {

impl SystemState {
pub async fn init(entries: Vec<StateEntry>) -> Result<Self, IggyError> {
let mut streams = HashMap::new();
let mut users = HashMap::new();
let mut streams = AHashMap::new();
let mut users = AHashMap::new();
let mut current_stream_id = 0;
let mut current_user_id = 0;
for entry in entries {
Expand All @@ -90,7 +90,7 @@ impl SystemState {
let stream = StreamState {
id: stream_id,
name: command.name.clone(),
topics: HashMap::new(),
topics: AHashMap::new(),
current_topic_id: 0,
created_at: entry.timestamp,
};
Expand Down Expand Up @@ -126,15 +126,15 @@ impl SystemState {
let topic = TopicState {
id: topic_id,
name: command.name,
consumer_groups: HashMap::new(),
consumer_groups: AHashMap::new(),
current_consumer_group_id: 0,
compression_algorithm: command.compression_algorithm,
message_expiry: command.message_expiry,
max_topic_size: command.max_topic_size,
replication_factor: command.replication_factor,
created_at: entry.timestamp,
partitions: if command.partitions_count > 0 {
let mut partitions = HashMap::new();
let mut partitions = AHashMap::new();
for i in 1..=command.partitions_count {
partitions.insert(
i,
Expand All @@ -146,7 +146,7 @@ impl SystemState {
}
partitions
} else {
HashMap::new()
AHashMap::new()
},
};
stream.topics.insert(topic.id, topic);
Expand Down Expand Up @@ -285,7 +285,7 @@ impl SystemState {
password_hash: command.password, // This is already hashed
status: command.status,
permissions: command.permissions,
personal_access_tokens: HashMap::new(),
personal_access_tokens: AHashMap::new(),
};
users.insert(user.id, user);
}
Expand Down Expand Up @@ -379,7 +379,7 @@ impl SystemState {
}
}

fn find_stream_id(streams: &HashMap<u32, StreamState>, stream_id: &Identifier) -> u32 {
fn find_stream_id(streams: &AHashMap<u32, StreamState>, stream_id: &Identifier) -> u32 {
match stream_id.kind {
IdKind::Numeric => stream_id
.get_u32_value()
Expand All @@ -397,7 +397,7 @@ fn find_stream_id(streams: &HashMap<u32, StreamState>, stream_id: &Identifier) -
}
}

fn find_topic_id(topics: &HashMap<u32, TopicState>, topic_id: &Identifier) -> u32 {
fn find_topic_id(topics: &AHashMap<u32, TopicState>, topic_id: &Identifier) -> u32 {
match topic_id.kind {
IdKind::Numeric => topic_id
.get_u32_value()
Expand All @@ -415,7 +415,10 @@ fn find_topic_id(topics: &HashMap<u32, TopicState>, topic_id: &Identifier) -> u3
}
}

fn find_consumer_group_id(groups: &HashMap<u32, ConsumerGroupState>, group_id: &Identifier) -> u32 {
fn find_consumer_group_id(
groups: &AHashMap<u32, ConsumerGroupState>,
group_id: &Identifier,
) -> u32 {
match group_id.kind {
IdKind::Numeric => group_id
.get_u32_value()
Expand All @@ -433,7 +436,7 @@ fn find_consumer_group_id(groups: &HashMap<u32, ConsumerGroupState>, group_id: &
}
}

fn find_user_id(users: &HashMap<u32, UserState>, user_id: &Identifier) -> u32 {
fn find_user_id(users: &AHashMap<u32, UserState>, user_id: &Identifier) -> u32 {
match user_id.kind {
IdKind::Numeric => user_id
.get_u32_value()
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/clients/client_manager.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use crate::streaming::session::Session;
use crate::streaming::utils::hash;
use ahash::AHashMap;
use iggy::error::IggyError;
use iggy::locking::IggySharedMut;
use iggy::locking::IggySharedMutFn;
use iggy::models::user_info::UserId;
use iggy::utils::timestamp::IggyTimestamp;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::net::SocketAddr;
use std::sync::Arc;

#[derive(Debug, Default)]
pub struct ClientManager {
clients: HashMap<u32, IggySharedMut<Client>>,
clients: AHashMap<u32, IggySharedMut<Client>>,
}

#[derive(Debug)]
Expand Down
8 changes: 4 additions & 4 deletions server/src/streaming/streams/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use crate::streaming::storage::StreamStorage;
use crate::streaming::streams::stream::Stream;
use crate::streaming::streams::COMPONENT;
use crate::streaming::topics::topic::Topic;
use ahash::AHashSet;
use error_set::ErrContext;
use futures::future::join_all;
use iggy::error::IggyError;
use iggy::utils::timestamp::IggyTimestamp;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use tokio::fs;
Expand Down Expand Up @@ -75,15 +75,15 @@ impl StreamStorage for FileStreamStorage {
unloaded_topics.push(topic);
}

let state_topic_ids = state.topics.keys().copied().collect::<HashSet<u32>>();
let state_topic_ids = state.topics.keys().copied().collect::<AHashSet<u32>>();
let unloaded_topic_ids = unloaded_topics
.iter()
.map(|topic| topic.topic_id)
.collect::<HashSet<u32>>();
.collect::<AHashSet<u32>>();
let missing_ids = state_topic_ids
.difference(&unloaded_topic_ids)
.copied()
.collect::<HashSet<u32>>();
.collect::<AHashSet<u32>>();
if missing_ids.is_empty() {
info!(
"All topics for stream with ID: '{}' found on disk were found in state.",
Expand Down
10 changes: 5 additions & 5 deletions server/src/streaming/streams/stream.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::configs::system::SystemConfig;
use crate::streaming::storage::SystemStorage;
use crate::streaming::topics::topic::Topic;
use ahash::AHashMap;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::timestamp::IggyTimestamp;
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
Expand All @@ -19,8 +19,8 @@ pub struct Stream {
pub size_bytes: Arc<AtomicU64>,
pub messages_count: Arc<AtomicU64>,
pub segments_count: Arc<AtomicU32>,
pub(crate) topics: HashMap<u32, Topic>,
pub(crate) topics_ids: HashMap<String, u32>,
pub(crate) topics: AHashMap<u32, Topic>,
pub(crate) topics_ids: AHashMap<String, u32>,
pub(crate) config: Arc<SystemConfig>,
pub(crate) storage: Arc<SystemStorage>,
}
Expand Down Expand Up @@ -54,8 +54,8 @@ impl Stream {
size_bytes: Arc::new(AtomicU64::new(0)),
messages_count: Arc::new(AtomicU64::new(0)),
segments_count: Arc::new(AtomicU32::new(0)),
topics: HashMap::new(),
topics_ids: HashMap::new(),
topics: AHashMap::new(),
topics_ids: AHashMap::new(),
storage,
created_at: IggyTimestamp::now(),
}
Expand Down
Loading

0 comments on commit 08f4889

Please sign in to comment.