Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry: Simplify ReplayBody::poll_data for readability #1346

Merged
merged 4 commits into from
Nov 1, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 48 additions & 47 deletions linkerd/http-retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,21 +180,21 @@ where
buf.has_remaining = state.buf.has_remaining(),
body.is_completed = state.is_completed,
body.max_bytes_remaining = state.max_bytes,
"Replay::poll_data"
"ReplayBody::poll_data"
);

// If we haven't replayed the buffer yet, and its not empty, return the
// buffered data first.
if this.replay_body {
if state.buf.has_remaining() {
tracing::trace!("replaying body");
tracing::trace!("Replaying body");
// Don't return the buffered data again on the next poll.
this.replay_body = false;
return Poll::Ready(Some(Ok(Data::Replay(state.buf.clone()))));
}

if state.is_capped() {
tracing::trace!("cannot replay buffered body, maximum buffer length reached");
tracing::trace!("Cannot replay buffered body, maximum buffer length reached");
return Poll::Ready(Some(Err(Capped.into())));
}
}
Expand All @@ -209,50 +209,50 @@ where
return Poll::Ready(None);
}

// If there's more data in the initial body, poll that...
if let Some(rest) = state.rest.as_mut() {
// Poll the inner body for more data. If the body has ended, remember
// that so that future clones will not try polling it again (as
// described above).
let mut data = {
// Get access to the initial body. If we don't have access to the
// inner body, there's no more work to do.
let rest = match state.rest.as_mut() {
Some(rest) => rest,
None => return Poll::Ready(None),
};

tracing::trace!("Polling initial body");
let opt = futures::ready!(Pin::new(rest).poll_data(cx));

// If the body has ended, remember that so that future clones will
// not try polling it again --- some `Body` types will panic if they
// are polled after returning `None`.
if opt.is_none() {
tracing::trace!("Initial body completed");
state.is_completed = true;
match futures::ready!(Pin::new(rest).poll_data(cx)) {
Some(Ok(data)) => data,
Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
None => {
tracing::trace!("Initial body completed");
state.is_completed = true;
return Poll::Ready(None);
}
}
return Poll::Ready(opt.map(|ok| {
ok.map(|mut data| {
// If we have buffered the maximum number of bytes, allow
// *this* body to continue, but don't buffer any more.
let length = data.remaining();
state.max_bytes = state.max_bytes.saturating_sub(length);
if state.is_capped() {
// If there's data in the buffer, discard it now, since
// we won't allow any clones to have a complete body.
if state.buf.has_remaining() {
tracing::debug!(
buf.size = state.buf.remaining(),
"buffered maximum capacity, discarding buffer"
);
state.buf = Default::default();
}
return Data::Initial(data.copy_to_bytes(length));
}

if state.is_capped() {
return Data::Initial(data.copy_to_bytes(length));
}
Comment on lines -230 to -245
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT the second block is redundant?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, huh, yeah, that looks like a copy-paste error or something...

};

// Buffer and return the bytes
Data::Initial(state.buf.push_chunk(data))
})
.map_err(Into::into)
}));
}
// If we have buffered the maximum number of bytes, allow *this* body to
// continue, but don't buffer any more.
let length = data.remaining();
state.max_bytes = state.max_bytes.saturating_sub(length);
let chunk = if state.is_capped() {
// If there's data in the buffer, discard it now, since we won't
// allow any clones to have a complete body.
if state.buf.has_remaining() {
tracing::debug!(
buf.size = state.buf.remaining(),
"Buffered maximum capacity, discarding buffer"
);
state.buf = Default::default();
}
data.copy_to_bytes(length)
} else {
// Buffer and return the bytes.
state.buf.push_chunk(data)
};

// Otherwise, guess we're done!
Poll::Ready(None)
Poll::Ready(Some(Ok(Data::Initial(chunk))))
}

fn poll_trailers(
Expand Down Expand Up @@ -319,16 +319,17 @@ where
None => return self.shared.orig_size_hint.clone(),
};

// Otherwise, if we're holding the state but have dropped the inner body, the entire body is
// buffered so we know the exact size hint.
// Otherwise, if we're holding the state but have dropped the inner
// body, the entire body is buffered so we know the exact size hint.
let buffered = state.buf.remaining() as u64;
let rest_hint = match state.rest.as_ref() {
Some(rest) => rest.size_hint(),
None => return SizeHint::with_exact(buffered),
};

// Otherwise, add the inner body's size hint to the amount of buffered data. An upper limit
// is only set if the inner body has an upper limit.
// Otherwise, add the inner body's size hint to the amount of buffered
// data. An upper limit is only set if the inner body has an upper
// limit.
let mut hint = SizeHint::default();
hint.set_lower(buffered + rest_hint.lower());
if let Some(rest_upper) = rest_hint.upper() {
Expand Down