Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry: Simplify ReplayBody::poll_data for readability #1346

Merged
merged 4 commits into from
Nov 1, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 39 additions & 45 deletions linkerd/http-retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,79 +180,73 @@ where
buf.has_remaining = state.buf.has_remaining(),
body.is_completed = state.is_completed,
body.max_bytes_remaining = state.max_bytes,
"Replay::poll_data"
"ReplayBody::poll_data"
);

// If we haven't replayed the buffer yet, and its not empty, return the
// buffered data first.
if this.replay_body {
if state.buf.has_remaining() {
tracing::trace!("replaying body");
tracing::trace!("Replaying body");
// Don't return the buffered data again on the next poll.
this.replay_body = false;
return Poll::Ready(Some(Ok(Data::Replay(state.buf.clone()))));
}

if state.is_capped() {
tracing::trace!("cannot replay buffered body, maximum buffer length reached");
tracing::trace!("Cannot replay buffered body, maximum buffer length reached");
return Poll::Ready(Some(Err(Capped.into())));
}
}

// Get access to the initial body. If we don't have access to the inner body, there's no
// more work to do.
let rest = match state.rest.as_mut() {
Some(rest) => rest,
None => return Poll::Ready(None),
};

// If the inner body has previously ended, don't poll it again.
//
// NOTE(eliza): we would expect the inner body to just happily return
// `None` multiple times here, but `hyper::Body::channel` (which we use
// in the tests) will panic if it is polled after returning `None`, so
// we have to special-case this. :/
// NOTE(eliza): we would expect the inner body to just happily return `None` multiple times
// here, but `hyper::Body::channel` (which we use in the tests) will panic if it is polled
// after returning `None`, so we have to special-case this. :/
if state.is_completed {
return Poll::Ready(None);
}

// If there's more data in the initial body, poll that...
if let Some(rest) = state.rest.as_mut() {
tracing::trace!("Polling initial body");
let opt = futures::ready!(Pin::new(rest).poll_data(cx));

// If the body has ended, remember that so that future clones will
// not try polling it again --- some `Body` types will panic if they
// are polled after returning `None`.
if opt.is_none() {
// Poll the inner body for more data. If the body has ended, remember that so that future
// clones will not try polling it again (as described above).
tracing::trace!("Polling initial body");
let mut data = match futures::ready!(Pin::new(rest).poll_data(cx)) {
Some(Ok(data)) => data,
Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
None => {
tracing::trace!("Initial body completed");
state.is_completed = true;
return Poll::Ready(None);
}
return Poll::Ready(opt.map(|ok| {
ok.map(|mut data| {
// If we have buffered the maximum number of bytes, allow
// *this* body to continue, but don't buffer any more.
let length = data.remaining();
state.max_bytes = state.max_bytes.saturating_sub(length);
if state.is_capped() {
// If there's data in the buffer, discard it now, since
// we won't allow any clones to have a complete body.
if state.buf.has_remaining() {
tracing::debug!(
buf.size = state.buf.remaining(),
"buffered maximum capacity, discarding buffer"
);
state.buf = Default::default();
}
return Data::Initial(data.copy_to_bytes(length));
}

if state.is_capped() {
return Data::Initial(data.copy_to_bytes(length));
}
Comment on lines -230 to -245
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT the second block is redundant?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, huh, yeah, that looks like a copy-paste error or something...

};

// Buffer and return the bytes
Data::Initial(state.buf.push_chunk(data))
})
.map_err(Into::into)
}));
// If we have buffered the maximum number of bytes, allow *this* body to continue, but
// don't buffer any more.
let length = data.remaining();
state.max_bytes = state.max_bytes.saturating_sub(length);
if state.is_capped() {
// If there's data in the buffer, discard it now, since we won't allow any clones to
// have a complete body.
if state.buf.has_remaining() {
tracing::debug!(
buf.size = state.buf.remaining(),
"Buffered maximum capacity, discarding buffer"
);
state.buf = Default::default();
}
return Poll::Ready(Some(Ok(Data::Initial(data.copy_to_bytes(length)))));
}

// Otherwise, guess we're done!
Poll::Ready(None)
// Buffer and return the bytes.
Poll::Ready(Some(Ok(Data::Initial(state.buf.push_chunk(data)))))
}

fn poll_trailers(
Expand Down