Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store buffered data size as usize (fixes #269) #542

Merged
merged 1 commit into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 7 additions & 23 deletions src/proto/streams/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl FlowControl {
);

// Ensure that the argument is correct
assert!(sz <= self.window_size);
assert!(self.window_size >= sz as usize);

// Update values
self.window_size -= sz;
Expand Down Expand Up @@ -206,38 +206,22 @@ impl Window {
}
}

impl PartialEq<WindowSize> for Window {
fn eq(&self, other: &WindowSize) -> bool {
impl PartialEq<usize> for Window {
fn eq(&self, other: &usize) -> bool {
if self.0 < 0 {
false
} else {
(self.0 as WindowSize).eq(other)
(self.0 as usize).eq(other)
}
}
}

impl PartialEq<Window> for WindowSize {
fn eq(&self, other: &Window) -> bool {
other.eq(self)
}
}

impl PartialOrd<WindowSize> for Window {
fn partial_cmp(&self, other: &WindowSize) -> Option<::std::cmp::Ordering> {
impl PartialOrd<usize> for Window {
fn partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering> {
if self.0 < 0 {
Some(::std::cmp::Ordering::Less)
} else {
(self.0 as WindowSize).partial_cmp(other)
}
}
}

impl PartialOrd<Window> for WindowSize {
fn partial_cmp(&self, other: &Window) -> Option<::std::cmp::Ordering> {
if other.0 < 0 {
Some(::std::cmp::Ordering::Greater)
} else {
self.partial_cmp(&(other.0 as WindowSize))
(self.0 as usize).partial_cmp(other)
}
}
}
Expand Down
36 changes: 19 additions & 17 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl Prioritize {
}

// Update the buffered data counter
stream.buffered_send_data += sz;
stream.buffered_send_data += sz as usize;

let span =
tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
Expand All @@ -167,9 +167,10 @@ impl Prioritize {

// Implicitly request more send capacity if not enough has been
// requested yet.
if stream.requested_send_capacity < stream.buffered_send_data {
if (stream.requested_send_capacity as usize) < stream.buffered_send_data {
// Update the target requested capacity
stream.requested_send_capacity = stream.buffered_send_data;
stream.requested_send_capacity =
cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;

self.try_assign_capacity(stream);
}
Expand Down Expand Up @@ -217,28 +218,28 @@ impl Prioritize {
"reserve_capacity",
?stream.id,
requested = capacity,
effective = capacity + stream.buffered_send_data,
effective = (capacity as usize) + stream.buffered_send_data,
curr = stream.requested_send_capacity
);
let _e = span.enter();

// Actual capacity is `capacity` + the current amount of buffered data.
// If it were less, then we could never send out the buffered data.
let capacity = capacity + stream.buffered_send_data;
let capacity = (capacity as usize) + stream.buffered_send_data;

if capacity == stream.requested_send_capacity {
if capacity == stream.requested_send_capacity as usize {
// Nothing to do
} else if capacity < stream.requested_send_capacity {
} else if capacity < stream.requested_send_capacity as usize {
// Update the target requested capacity
stream.requested_send_capacity = capacity;
stream.requested_send_capacity = capacity as WindowSize;

// Currently available capacity assigned to the stream
let available = stream.send_flow.available().as_size();

// If the stream has more assigned capacity than requested, reclaim
// some for the connection
if available > capacity {
let diff = available - capacity;
if available as usize > capacity {
let diff = available - capacity as WindowSize;

stream.send_flow.claim_capacity(diff);

Expand All @@ -252,7 +253,8 @@ impl Prioritize {
}

// Update the target requested capacity
stream.requested_send_capacity = capacity;
stream.requested_send_capacity =
cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;

// Try to assign additional capacity to the stream. If none is
// currently available, the stream will be queued to receive some
Expand Down Expand Up @@ -316,8 +318,8 @@ impl Prioritize {
/// it to the connection
pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
// only reclaim requested capacity that isn't already buffered
if stream.requested_send_capacity > stream.buffered_send_data {
let reserved = stream.requested_send_capacity - stream.buffered_send_data;
if stream.requested_send_capacity as usize > stream.buffered_send_data {
let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;

stream.send_flow.claim_capacity(reserved);
self.assign_connection_capacity(reserved, stream, counts);
Expand Down Expand Up @@ -377,7 +379,7 @@ impl Prioritize {

// Total requested should never go below actual assigned
// (Note: the window size can go lower than assigned)
debug_assert!(total_requested >= stream.send_flow.available());
debug_assert!(stream.send_flow.available() <= total_requested as usize);

// The amount of additional capacity that the stream requests.
// Don't assign more than the window has available!
Expand Down Expand Up @@ -435,7 +437,7 @@ impl Prioritize {
has_unavailable = %stream.send_flow.has_unavailable()
);

if stream.send_flow.available() < stream.requested_send_capacity
if stream.send_flow.available() < stream.requested_send_capacity as usize
&& stream.send_flow.has_unavailable()
{
// The stream requires additional capacity and the stream's
Expand Down Expand Up @@ -735,8 +737,8 @@ impl Prioritize {
stream.send_flow.send_data(len);

// Decrement the stream's buffered data counter
debug_assert!(stream.buffered_send_data >= len);
stream.buffered_send_data -= len;
debug_assert!(stream.buffered_send_data >= len as usize);
stream.buffered_send_data -= len as usize;
stream.requested_send_capacity -= len;

// Assign the capacity back to the connection that
Expand Down
4 changes: 2 additions & 2 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ impl Send {
let available = stream.send_flow.available().as_size();
let buffered = stream.buffered_send_data;

if available <= buffered {
if available as usize <= buffered {
0
} else {
available - buffered
available - buffered as WindowSize
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/proto/streams/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub(super) struct Stream {

/// Amount of data buffered at the prioritization layer.
/// TODO: Technically this could be greater than the window size...
pub buffered_send_data: WindowSize,
pub buffered_send_data: usize,

/// Task tracking additional send capacity (i.e. window updates).
send_task: Option<Waker>,
Expand Down