Skip to content

Commit

Permalink
Defragment incoming stream chunks (fixes #431)
Browse files Browse the repository at this point in the history
  • Loading branch information
djc committed Jan 14, 2020
1 parent 8f31992 commit a5da6db
Showing 1 changed file with 42 additions and 2 deletions.
44 changes: 42 additions & 2 deletions quinn-proto/src/assembler.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use std::{cmp::Ordering, collections::BinaryHeap};
use std::{cmp::Ordering, collections::BinaryHeap, mem};

use bytes::{Buf, Bytes};
use bytes::{Buf, Bytes, BytesMut};

/// Helper to assemble unordered stream frames into an ordered stream
#[derive(Debug)]
pub(crate) struct Assembler {
offset: u64,
data: BinaryHeap<Chunk>,
defragmented: usize,
}

impl Assembler {
pub fn new() -> Self {
Self {
offset: 0,
data: BinaryHeap::new(),
defragmented: 0,
}
}

Expand All @@ -22,6 +24,7 @@ impl Assembler {
loop {
if self.consume(buf, &mut read) {
self.data.pop();
self.defragmented = self.defragmented.saturating_sub(1);
} else {
break;
}
Expand Down Expand Up @@ -71,6 +74,28 @@ impl Assembler {
}
}

fn defragment(&mut self) {
// Set up a single-shot allocated byte buffer to back all chunks
let buffered = self.data.iter().map(|c| c.bytes.len()).sum::<usize>();
let mut buffer = BytesMut::with_capacity(buffered);
let mut offset = self.offset;

let new = BinaryHeap::with_capacity(self.data.len());
let old = mem::replace(&mut self.data, new);
for chunk in old.into_sorted_vec().into_iter().rev() {
if offset + (buffer.len() as u64) < chunk.offset {
let bytes = buffer.split().freeze();
self.data.push(Chunk { offset, bytes });
offset = chunk.offset;
}
buffer.extend_from_slice(&chunk.bytes);
}

let bytes = buffer.split().freeze();
self.data.push(Chunk { offset, bytes });
self.defragmented = self.data.len();
}

#[cfg(test)]
fn next(&mut self, size: usize) -> Option<Box<[u8]>> {
let mut buf = vec![0; size];
Expand All @@ -89,6 +114,9 @@ impl Assembler {

pub(crate) fn insert(&mut self, offset: u64, bytes: Bytes) {
self.data.push(Chunk { offset, bytes });
if self.data.len() - self.defragmented > 32 {
self.defragment()
}
}

/// Current position in the stream
Expand Down Expand Up @@ -216,4 +244,16 @@ mod test {
x.insert(0, Bytes::from_static(b"1234"));
assert_matches!(x.next(32), None);
}

#[test]
fn compact() {
let mut x = Assembler::new();
x.insert(0, Bytes::from_static(b"abc"));
x.insert(3, Bytes::from_static(b"def"));
x.insert(9, Bytes::from_static(b"jkl"));
x.insert(12, Bytes::from_static(b"mno"));
x.defragment();
assert_eq!(x.pop(), Some((0, Bytes::from_static(b"abcdef"))));
assert_eq!(x.pop(), Some((9, Bytes::from_static(b"jklmno"))));
}
}

0 comments on commit a5da6db

Please sign in to comment.