Skip to content

Commit

Permalink
address review
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Mar 15, 2024
1 parent 2867280 commit e154864
Showing 1 changed file with 32 additions and 32 deletions.
64 changes: 32 additions & 32 deletions yamux/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {

// Privilege pending `Pong` and `GoAway` `Frame`s
// over `Frame`s from the receivers.
if let Some(frame) = self.pending_read_frame.take() {
self.socket.start_send_unpin(frame)?;
continue;
}

if let Some(frame) = self.pending_write_frame.take() {
if let Some(frame) = self
.pending_read_frame
.take()
.or_else(|| self.pending_write_frame.take())
{
self.socket.start_send_unpin(frame)?;
continue;
}
Expand All @@ -417,32 +416,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
Poll::Pending => {}
}

if self.pending_read_frame.is_none() {
match self.socket.poll_next_unpin(cx) {
Poll::Ready(Some(frame)) => {
match self.on_frame(frame?)? {
Action::None => {}
Action::New(stream) => {
log::trace!("{}: new inbound {} of {}", self.id, stream, self);
return Poll::Ready(Ok(stream));
}
Action::Ping(f) => {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_read_frame.replace(f.into());
}
Action::Terminate(f) => {
log::trace!("{}: sending term", self.id);
self.pending_read_frame.replace(f.into());
}
}
continue;
}
Poll::Ready(None) => {
return Poll::Ready(Err(ConnectionError::Closed));
}
Poll::Pending => {}
}
}
if self.pending_write_frame.is_none() {
match self.stream_receivers.poll_next_unpin(cx) {
Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
Expand Down Expand Up @@ -475,6 +448,33 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}
}

if self.pending_read_frame.is_none() {
match self.socket.poll_next_unpin(cx) {
Poll::Ready(Some(frame)) => {
match self.on_frame(frame?)? {
Action::None => {}
Action::New(stream) => {
log::trace!("{}: new inbound {} of {}", self.id, stream, self);
return Poll::Ready(Ok(stream));
}
Action::Ping(f) => {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_read_frame.replace(f.into());
}
Action::Terminate(f) => {
log::trace!("{}: sending term", self.id);
self.pending_read_frame.replace(f.into());
}
}
continue;
}
Poll::Ready(None) => {
return Poll::Ready(Err(ConnectionError::Closed));
}
Poll::Pending => {}
}
}

// If we make it this far, at least one of the above must have registered a waker.
return Poll::Pending;
}
Expand Down

0 comments on commit e154864

Please sign in to comment.