Introduce FixedConsecutivePartitions partition table
The FixedConsecutivePartitions partition table replaces the FixedPartitionTable.
The latter used a simple round-robin (modulo) assignment of partition keys to
partitions, whereas FixedConsecutivePartitions properly maps partition keys to
their respective consecutive partition ranges.

This fixes #524.
tillrohrmann committed Jun 23, 2023
1 parent 461c026 commit 29ed806
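
For context, a minimal sketch (not part of this commit) contrasting the two schemes, assuming 32-bit partition keys as in the new partitioning_scheme.rs below: the old table used a modulo, so adjacent keys scatter across partitions, while the new table divides the key space into consecutive ranges, so adjacent keys stay on the same partition.

// Sketch only: contrasts the old round-robin mapping with the new
// consecutive mapping, assuming 32-bit partition keys.
const PARTITION_KEY_RANGE_END: u64 = 1 << 32;

// Old FixedPartitionTable behaviour: modulo assignment, so adjacent keys
// land on different partitions.
fn round_robin(partition_key: u32, num_partitions: u32) -> u64 {
    u64::from(partition_key) % u64::from(num_partitions)
}

// New FixedConsecutivePartitions behaviour: the key space [0, 2^32) is cut
// into consecutive ranges, so adjacent keys stay on the same partition.
fn consecutive(partition_key: u32, num_partitions: u32) -> u64 {
    u64::from(partition_key) * u64::from(num_partitions) / PARTITION_KEY_RANGE_END
}

fn main() {
    // With 4 partitions, keys 0 and 1 are split by round-robin but grouped
    // by the consecutive scheme.
    assert_eq!(round_robin(0, 4), 0);
    assert_eq!(round_robin(1, 4), 1);
    assert_eq!(consecutive(0, 4), 0);
    assert_eq!(consecutive(1, 4), 0);
    assert_eq!(consecutive(u32::MAX, 4), 3);
}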
Showing 5 changed files with 190 additions and 128 deletions.
13 changes: 6 additions & 7 deletions src/worker/src/lib.rs
@@ -2,9 +2,8 @@ extern crate core;

use crate::ingress_integration::{ExternalClientIngressRunner, IngressIntegrationError};
use crate::invoker_integration::EntryEnricher;
use crate::network_integration::FixedPartitionTable;
use crate::partition::storage::invoker::InvokerStorageReader;
use crate::range_partitioner::RangePartitioner;
use crate::partitioning_scheme::FixedConsecutivePartitions;
use crate::service_invocation_factory::DefaultServiceInvocationFactory;
use crate::services::Services;
use codederror::CodedError;
@@ -33,7 +32,7 @@ mod ingress_integration;
mod invoker_integration;
mod network_integration;
mod partition;
mod range_partitioner;
mod partitioning_scheme;
mod service_invocation_factory;
mod services;
mod util;
@@ -186,7 +185,7 @@ pub struct Worker {
InMemoryServiceEndpointRegistry,
>,
external_client_ingress_runner: ExternalClientIngressRunner,
services: Services,
services: Services<FixedConsecutivePartitions>,
}

impl Worker {
@@ -227,7 +226,7 @@ impl Worker {
channel_size,
);

let partition_table = FixedPartitionTable::new(num_partition_processors);
let partition_table = FixedConsecutivePartitions::new(num_partition_processors);

let network = network_integration::Network::new(
raft_in_tx,
@@ -254,9 +253,9 @@
service_endpoint_registry,
);

let range_partitioner = RangePartitioner::new(num_partition_processors);
let partitioner = partition_table.partitioner();

let (command_senders, processors): (Vec<_>, Vec<_>) = range_partitioner
let (command_senders, processors): (Vec<_>, Vec<_>) = partitioner
.map(|(idx, partition_range)| {
let proposal_sender = consensus.create_proposal_sender();
let invoker_sender = invoker.create_sender();
26 changes: 2 additions & 24 deletions src/worker/src/network_integration.rs
@@ -3,9 +3,7 @@
use crate::partition;
use crate::partition::shuffle;
use futures::future::{ok, Ready};
use restate_common::types::PartitionKey;
use restate_network::{PartitionTable, PartitionTableError};
use crate::partitioning_scheme::FixedConsecutivePartitions;

pub(super) type Network = restate_network::Network<
partition::AckCommand,
@@ -20,29 +18,9 @@ pub(super) type Network = restate_network::Network<
partition::AckResponse,
partition::ShuffleDeduplicationResponse,
partition::IngressAckResponse,
FixedPartitionTable,
FixedConsecutivePartitions,
>;

#[derive(Debug, Clone)]
pub(super) struct FixedPartitionTable {
number_partitions: u32,
}

impl FixedPartitionTable {
pub(super) fn new(number_partitions: u32) -> Self {
Self { number_partitions }
}
}

impl PartitionTable for FixedPartitionTable {
type Future = Ready<Result<u64, PartitionTableError>>;

fn partition_key_to_target_peer(&self, partition_key: PartitionKey) -> Self::Future {
let target_partition = partition_key % self.number_partitions;
ok(u64::from(target_partition))
}
}

mod ingress_integration {
use crate::partition;
use crate::partition::shuffle;
174 changes: 174 additions & 0 deletions src/worker/src/partitioning_scheme.rs
@@ -0,0 +1,174 @@
use futures::future;
use restate_common::types::{PartitionId, PartitionKey, PeerId};
use restate_network::{PartitionTable, PartitionTableError};
use std::ops::RangeInclusive;

#[derive(Debug, Clone)]
pub(crate) struct FixedConsecutivePartitions {
num_partitions: u32,
}

impl FixedConsecutivePartitions {
const PARTITION_KEY_RANGE_END: u64 = 1 << 32;

pub(crate) fn new(num_partitions: u32) -> Self {
Self { num_partitions }
}

pub(crate) fn partitioner(&self) -> Partitioner {
Partitioner::new(self.num_partitions)
}

fn partition_key_to_partition_id(
num_partitions: u32,
partition_key: PartitionKey,
) -> PartitionId {
let num_partitions = u64::from(num_partitions);
let partition_key = u64::from(partition_key);

partition_key * num_partitions / Self::PARTITION_KEY_RANGE_END
}

fn partition_id_to_partition_range(
num_partitions: u32,
partition_id: PartitionId,
) -> RangeInclusive<PartitionKey> {
let num_partitions = u64::from(num_partitions);

assert!(
partition_id < num_partitions,
"There cannot be a partition id which is larger than the number of partitions \
'{num_partitions}', when using the fixed consecutive partitioning scheme."
);

// adding num_partitions - 1 to dividend is equivalent to applying ceil function to result
let start =
(partition_id * Self::PARTITION_KEY_RANGE_END + (num_partitions - 1)) / num_partitions;
let end = ((partition_id + 1) * Self::PARTITION_KEY_RANGE_END + (num_partitions - 1))
/ num_partitions
- 1;

let start = u32::try_from(start)
.expect("Resulting partition start '{start}' should be <= u32::MAX.");
let end =
u32::try_from(end).expect("Resulting partition end '{end}' should be <= u32::MAX.");

start..=end
}
}

impl PartitionTable for FixedConsecutivePartitions {
type Future = future::Ready<Result<PeerId, PartitionTableError>>;

fn partition_key_to_target_peer(&self, partition_key: PartitionKey) -> Self::Future {
let partition_id = FixedConsecutivePartitions::partition_key_to_partition_id(
self.num_partitions,
partition_key,
);

future::ready(Ok(partition_id))
}
}

#[derive(Debug)]
pub(crate) struct Partitioner {
num_partitions: u32,
next_partition_id: u32,
}

impl Partitioner {
fn new(num_partitions: u32) -> Self {
Self {
num_partitions,
next_partition_id: 0,
}
}
}

impl Iterator for Partitioner {
type Item = (PeerId, RangeInclusive<PartitionKey>);

fn next(&mut self) -> Option<Self::Item> {
if self.next_partition_id < self.num_partitions {
let partition_id = PeerId::from(self.next_partition_id);
self.next_partition_id += 1;

let partition_range = FixedConsecutivePartitions::partition_id_to_partition_range(
self.num_partitions,
partition_id,
);

Some((partition_id, partition_range))
} else {
None
}
}
}

#[cfg(test)]
mod tests {
use crate::partitioning_scheme::{FixedConsecutivePartitions, Partitioner};
use restate_common::types::{PartitionKey, PeerId};
use restate_network::PartitionTable;
use restate_test_util::test;

#[test]
fn partitioner_produces_consecutive_ranges() {
let partitioner = Partitioner::new(10);
let mut previous_end = None;
let mut previous_length = None::<PartitionKey>;

for (_id, range) in partitioner {
let current_length = *range.end() - *range.start();

if let Some(previous_length) = previous_length {
let length_diff = previous_length.abs_diff(current_length);
assert!(length_diff <= 1);
} else {
assert_eq!(*range.start(), 0);
}

if let Some(previous_end) = previous_end {
assert_eq!(previous_end + 1, *range.start());
}

previous_end = Some(*range.end());
previous_length = Some(current_length);
}

assert_eq!(previous_end, Some(PartitionKey::MAX));
}

impl FixedConsecutivePartitions {
async fn unchecked_partition_key_to_target_peer(
&self,
partition_key: PartitionKey,
) -> PeerId {
self.partition_key_to_target_peer(partition_key)
.await
.unwrap()
}
}

#[test(tokio::test)]
async fn partition_table_resolves_partition_keys() {
let num_partitions = 10;
let partition_table = FixedConsecutivePartitions::new(num_partitions);
let partitioner = partition_table.partitioner();

for (peer_id, partition_range) in partitioner {
assert_eq!(
partition_table
.unchecked_partition_key_to_target_peer(*partition_range.start())
.await,
peer_id
);
assert_eq!(
partition_table
.unchecked_partition_key_to_target_peer(*partition_range.end())
.await,
peer_id
);
}
}
}
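
To illustrate partition_id_to_partition_range above, here is a standalone sketch (not part of the commit) that works the "add num_partitions - 1 to the dividend to get ceil" trick through for three partitions; the resulting ranges are consecutive, cover the whole 32-bit key space, and differ in length by at most one key.

// Sketch only: the range formula from partition_id_to_partition_range,
// worked through for num_partitions = 3.
const RANGE_END: u64 = 1 << 32;

fn range(partition_id: u64, num_partitions: u64) -> (u64, u64) {
    // start = ceil(partition_id * 2^32 / n)
    // end   = ceil((partition_id + 1) * 2^32 / n) - 1
    let start = (partition_id * RANGE_END + (num_partitions - 1)) / num_partitions;
    let end = ((partition_id + 1) * RANGE_END + (num_partitions - 1)) / num_partitions - 1;
    (start, end)
}

fn main() {
    // Three consecutive ranges covering [0, u32::MAX] with lengths
    // 1_431_655_766, 1_431_655_765 and 1_431_655_765.
    assert_eq!(range(0, 3), (0, 1_431_655_765));
    assert_eq!(range(1, 3), (1_431_655_766, 2_863_311_530));
    assert_eq!(range(2, 3), (2_863_311_531, u64::from(u32::MAX)));
}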
91 changes: 0 additions & 91 deletions src/worker/src/range_partitioner.rs

This file was deleted.

14 changes: 8 additions & 6 deletions src/worker/src/services.rs
@@ -1,9 +1,8 @@
use crate::network_integration::FixedPartitionTable;
use crate::partition::{AckCommand, Command};
use restate_common::types::PeerTarget;
use restate_common::worker_command::{WorkerCommand, WorkerCommandSender};
use restate_consensus::ProposalSender;
use restate_network::{PartitionTable, PartitionTableError};
use restate_network::PartitionTableError;
use tokio::sync::mpsc;
use tracing::debug;

@@ -15,19 +14,22 @@ pub enum Error {
PartitionNotFound(#[from] PartitionTableError),
}

pub(crate) struct Services {
pub(crate) struct Services<PartitionTable> {
command_rx: mpsc::Receiver<WorkerCommand>,

proposal_tx: ProposalSender<PeerTarget<AckCommand>>,
partition_table: FixedPartitionTable,
partition_table: PartitionTable,

command_tx: WorkerCommandSender,
}

impl Services {
impl<PartitionTable> Services<PartitionTable>
where
PartitionTable: restate_network::PartitionTable,
{
pub(crate) fn new(
proposal_tx: ProposalSender<PeerTarget<AckCommand>>,
partition_table: FixedPartitionTable,
partition_table: PartitionTable,
channel_size: usize,
) -> Self {
let (command_tx, command_rx) = mpsc::channel(channel_size);
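
Since Services is now generic over the restate_network::PartitionTable trait, any table that resolves a partition key to a target peer can be plugged in. Below is a hedged sketch (not part of the commit) of a trivial implementation; SingleNodeTable is a hypothetical example type, and the trait surface is assumed to match the impl shown in partitioning_scheme.rs above.

// Sketch only: a hypothetical single-node table implementing the same
// PartitionTable trait as FixedConsecutivePartitions above.
use futures::future;
use restate_common::types::{PartitionKey, PeerId};
use restate_network::{PartitionTable, PartitionTableError};

#[derive(Debug, Clone)]
struct SingleNodeTable;

impl PartitionTable for SingleNodeTable {
    type Future = future::Ready<Result<PeerId, PartitionTableError>>;

    fn partition_key_to_target_peer(&self, _partition_key: PartitionKey) -> Self::Future {
        // Every key is owned by peer 0.
        future::ready(Ok(PeerId::from(0u32)))
    }
}

// With that in place, Services::<SingleNodeTable>::new(proposal_tx, SingleNodeTable, channel_size)
// would type-check the same way as Services::<FixedConsecutivePartitions>.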
