Initial version of incremental record commitment (take 2) #1201

Merged · 2 commits · Mar 2, 2023
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions crates/subspace-archiving/Cargo.toml
@@ -17,6 +17,7 @@ include = [
bench = false

[dependencies]
+blake2 = { version = "0.10.5", default-features = false }
parity-scale-codec = { version = "3.2.1", default-features = false, features = ["derive"] }
reed-solomon-erasure = { version = "6.0.0", default-features = false }
serde = { version = "1.0.147", optional = true, features = ["derive"] }
@@ -30,6 +31,7 @@ rand = { version = "0.8.5", features = ["min_const_gen"] }
[features]
default = ["std"]
std = [
"blake2/std",
"parity-scale-codec/std",
"reed-solomon-erasure/simd-accel",
"reed-solomon-erasure/std",
47 changes: 39 additions & 8 deletions crates/subspace-archiving/src/archiver.rs
@@ -13,10 +13,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

+mod incremental_record_commitments;
mod record_shards;

extern crate alloc;

+use crate::archiver::incremental_record_commitments::{
+    update_record_commitments, IncrementalRecordCommitmentsState,
+};
use crate::archiver::record_shards::RecordShards;
use crate::utils::GF_16_ELEMENT_BYTES;
use alloc::collections::VecDeque;
@@ -242,6 +246,8 @@ pub struct Archiver {
/// Buffer containing blocks and other buffered items that are pending to be included into the
/// next segment
buffer: VecDeque<SegmentItem>,
+/// Intermediate record commitments that are built incrementally as the above buffer fills up.
+incremental_record_commitments: IncrementalRecordCommitmentsState,
/// Configuration parameter defining the size of one record (data in one piece excluding witness
/// size)
record_size: u32,
@@ -305,6 +311,9 @@ impl Archiver {

Ok(Self {
buffer: VecDeque::default(),
+incremental_record_commitments: IncrementalRecordCommitmentsState::with_capacity(
+    data_shards as usize,
+),
record_size,
data_shards,
parity_shards,
@@ -427,6 +436,13 @@ impl Archiver {
let segment_item = match self.buffer.pop_front() {
Some(segment_item) => segment_item,
None => {
+update_record_commitments(
+    &mut self.incremental_record_commitments,
+    &segment,
+    false,
+    self.record_size as usize,
+);

let Segment::V0 { items } = segment;
// Push all of the items back into the buffer, we don't have enough data yet
for segment_item in items.into_iter().rev() {
@@ -640,6 +656,13 @@ impl Archiver {

self.last_archived_block = last_archived_block;

+update_record_commitments(
+    &mut self.incremental_record_commitments,
+    &segment,
+    true,
+    self.record_size as usize,
+);

Some(segment)
}

@@ -724,20 +747,28 @@ impl Archiver {
let mut pieces = FlatPieces::new(record_shards_slices.len());
drop(record_shards_slices);

-let record_shards_hashes = record_shards
-    .as_bytes()
-    .as_ref()
-    .chunks_exact(self.record_size as usize)
-    .map(blake2b_256_254_hash)
+// We take hashes of source records computed incrementally
+let record_commitments = self
+    .incremental_record_commitments
+    .drain()
+    .chain(
+        // TODO: Parity hashes will be erasure coded instead in the future
+        record_shards
+            .as_bytes()
+            .as_ref()
+            .chunks_exact(self.record_size as usize)
+            .skip(self.data_shards as usize)
+            .map(blake2b_256_254_hash),
+    )
    .collect::<Vec<_>>();

let data = {
let mut data = Vec::with_capacity(
(self.data_shards + self.parity_shards) as usize * BLAKE2B_256_HASH_SIZE,
);

-for shard in &record_shards_hashes {
-    // TODO: Eventually we need to commit to data itself, not hashes
-    data.extend_from_slice(shard);
+for shard_commitment in record_commitments {
+    data.extend_from_slice(&shard_commitment);
}

data
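For context: `blake2b_256_254_hash`, used above for the parity records, lives in `subspace-core-primitives` and is not part of this diff. Judging from the truncation logic in `create_commitment` below, it amounts to a BLAKE2b-256 digest with the two most significant bits cleared — a minimal sketch for illustration, not the actual implementation:

```rust
use blake2::digest::typenum::U32;
use blake2::digest::{FixedOutput, Update};
use blake2::Blake2b;

/// Sketch of Blake2b-256-254: a BLAKE2b-256 digest truncated to 254 bits by
/// clearing the two most significant bits (the hash is interpreted as a
/// little-endian number, so those bits live in byte 31).
fn blake2b_256_254_hash_sketch(data: &[u8]) -> [u8; 32] {
    let mut hasher = Blake2b::<U32>::default();
    hasher.update(data);
    let mut hash: [u8; 32] = hasher.finalize_fixed().into();
    hash[31] &= 0b0011_1111;
    hash
}
```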
168 changes: 168 additions & 0 deletions crates/subspace-archiving/src/archiver/incremental_record_commitments.rs
@@ -0,0 +1,168 @@
extern crate alloc;

use crate::archiver::Segment;
use alloc::collections::VecDeque;
use blake2::digest::typenum::U32;
use blake2::digest::{FixedOutput, Update};
use blake2::Blake2b;
use core::mem;
use parity_scale_codec::{Encode, Output};
use subspace_core_primitives::Blake2b256Hash;

/// State of incremental record commitments, encapsulated to hide implementation details and
/// tricky logic
#[derive(Debug, Default, Clone)]
pub(super) struct IncrementalRecordCommitmentsState {
/// State contains record commitments.
///
/// NOTE: Until the full segment is processed, this will not contain the commitment to the first
/// record, since it is not ready yet. This in turn means all commitments will be at a `-1` offset.
state: VecDeque<Blake2b256Hash>,
}

impl IncrementalRecordCommitmentsState {
/// Creates an empty state with space for at least `capacity` records.
pub(super) fn with_capacity(capacity: usize) -> Self {
Self {
state: VecDeque::with_capacity(capacity),
}
}

/// Drains all record commitments computed so far, leaving the state empty
pub(super) fn drain(&mut self) -> impl Iterator<Item = Blake2b256Hash> + '_ {
self.state.drain(..)
}
}

/// Update internal record commitments state based on a (full or partial) segment.
pub(super) fn update_record_commitments(
incremental_record_commitments: &mut IncrementalRecordCommitmentsState,
segment: &Segment,
full: bool,
record_size: usize,
) {
segment.encode_to(&mut IncrementalRecordCommitmentsProcessor::new(
incremental_record_commitments,
record_size,
full,
));
}
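
// NOTE (illustration, not part of this PR): the point of encoding straight into
// `IncrementalRecordCommitmentsProcessor` is that SCALE streams the encoding
// through `Output::write` chunk by chunk, so the full segment encoding is never
// materialized in memory. The same pattern with a hypothetical `ByteCounter`:
//
//     use parity_scale_codec::{Encode, Output};
//
//     /// Counts encoded bytes without buffering them
//     #[derive(Default)]
//     struct ByteCounter(usize);
//
//     impl Output for ByteCounter {
//         fn write(&mut self, bytes: &[u8]) {
//             self.0 += bytes.len();
//         }
//     }
//
//     let mut counter = ByteCounter::default();
//     // Streams the SCALE encoding into `counter` without allocating it
//     (42u64, vec![1u8, 2, 3]).encode_to(&mut counter);
//     assert_eq!(counter.0, (42u64, vec![1u8, 2, 3]).encode().len());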

/// Processor is hidden so as not to expose unnecessary implementation details (like the `Output`
/// trait implementation)
struct IncrementalRecordCommitmentsProcessor<'a> {
/// Processed bytes in the segment so far
processed_bytes: usize,
/// Record commitments already created
incremental_record_commitments: &'a mut IncrementalRecordCommitmentsState,
/// Record size
record_size: usize,
/// Whether segment is full or partial
full: bool,
/// Intermediate hashing state that computes Blake2-256-254.
///
/// See [`subspace_core_primitives::crypto::blake2b_256_254_hash`] for details.
hashing_state: Blake2b<U32>,
}

impl<'a> Drop for IncrementalRecordCommitmentsProcessor<'a> {
fn drop(&mut self) {
if self.full {
let record_offset = self.processed_bytes % self.record_size;
if record_offset > 0 {
// This is fine since we'll have at most a few iterations and allocation is less
// desirable than a loop here
for _ in 0..(self.record_size - record_offset) {
self.update_commitment_state(&[0]);
}
self.create_commitment();
}
}
}
}

impl<'a> Output for IncrementalRecordCommitmentsProcessor<'a> {
fn write(&mut self, mut bytes: &[u8]) {
// Try to finish last partial record if possible

let record_offset = self.processed_bytes % self.record_size;
let bytes_left_in_record = self.record_size - record_offset;
if bytes_left_in_record > 0 {
let remaining_record_bytes;
(remaining_record_bytes, bytes) =
bytes.split_at(if bytes.len() >= bytes_left_in_record {
bytes_left_in_record
} else {
bytes.len()
});

self.update_commitment_state(remaining_record_bytes);

if remaining_record_bytes.len() == bytes_left_in_record {
self.create_commitment();
}
}

// Continue processing records (full and partial) from the remaining data. At this point we
// are at a record boundary, so we can simply chunk the remaining bytes into record-sized
// pieces
bytes.chunks(self.record_size).for_each(|record| {
self.update_commitment_state(record);

// Store hashes of full records
if record.len() == self.record_size {
self.create_commitment();
}
});
}
}
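
// Illustrative trace (not part of this PR) with `record_size = 4` and a fresh
// processor, showing where commitments get created:
//
//     write(&[1, 2, 3])       // 3 bytes hashed; record 0 still partial
//     write(&[4, 5, 6, 7, 8]) // `4` completes record 0 -> first commitment;
//                             // `5..=8` fill record 1 -> second commitment
//     write(&[9])             // record 2 started; it is only committed on
//                             // drop (zero-padded) when `full` is true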

impl<'a> IncrementalRecordCommitmentsProcessor<'a> {
fn new(
incremental_record_commitments: &'a mut IncrementalRecordCommitmentsState,
record_size: usize,
full: bool,
) -> Self {
Self {
processed_bytes: 0,
incremental_record_commitments,
record_size,
full,
hashing_state: Blake2b::<U32>::default(),
}
}

/// Whether a commitment for the current record still needs to be created
fn should_commit_to_record(&self, record_position: usize) -> bool {
self.incremental_record_commitments
.state
.get(record_position)
.is_none()
}

/// If a commitment is necessary for the currently processed record, the internal hashing state
/// will be updated with the provided bytes.
///
/// NOTE: This method is called with bytes that either cover part of the record or stop at the
/// edge of the record.
fn update_commitment_state(&mut self, bytes: &[u8]) {
if self.should_commit_to_record(self.processed_bytes / self.record_size) {
self.hashing_state.update(bytes);
}
self.processed_bytes += bytes.len();
}

/// If a commitment is necessary for the currently processed record, the internal hashing state
/// will be finalized and the commitment stored in the shared state.
fn create_commitment(&mut self) {
if self.should_commit_to_record(self.processed_bytes / self.record_size - 1) {
let hashing_state = mem::take(&mut self.hashing_state);

let mut hash = Blake2b256Hash::from(hashing_state.finalize_fixed());
// Erase the top 2 bits to effectively truncate the hash to 254 bits (the number is
// interpreted as little-endian, so byte 31 holds the most significant bits)
hash[31] &= 0b00111111;

self.incremental_record_commitments.state.push_back(hash);
}
}
}
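
Putting the pieces together, the intended lifecycle on the archiver side looks roughly like this — an illustrative sketch reusing only items from this diff; `segment`, `record_size`, and `data_shards` stand in for real archiver state:

```rust
use subspace_core_primitives::Blake2b256Hash;

fn collect_source_record_commitments(
    segment: &Segment,
    record_size: usize,
    data_shards: usize,
) -> Vec<Blake2b256Hash> {
    let mut state = IncrementalRecordCommitmentsState::with_capacity(data_shards);

    // While the buffer is still filling up: hash whatever data is available,
    // leaving the trailing partial record uncommitted
    update_record_commitments(&mut state, segment, false, record_size);

    // Once the segment is complete: records committed earlier are skipped, and
    // the trailing record is zero-padded and committed when the processor drops
    update_record_commitments(&mut state, segment, true, record_size);

    // Source-record commitments in order; parity-record hashes are appended
    // separately in `archiver.rs`
    state.drain().collect()
}
```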