Skip to content

Commit

Permalink
Revert "feat: Change send to not block on waiting receipt (#298)"
Browse files Browse the repository at this point in the history
This reverts commit 5c245cd.
  • Loading branch information
mattisonchao committed May 2, 2024
1 parent 4f243f6 commit b7435fa
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 93 deletions.
91 changes: 35 additions & 56 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,22 +323,20 @@ impl<Exe: Executor> ConnectionSender<Exe> {
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub(crate) fn send(
pub(crate) async fn send(
&self,
producer_id: u64,
producer_name: String,
sequence_id: u64,
message: producer::ProducerMessage,
) -> Result<
impl Future<Output = Result<proto::CommandSendReceipt, ConnectionError>>,
ConnectionError,
> {
) -> Result<proto::CommandSendReceipt, ConnectionError> {
let key = RequestKey::ProducerSend {
producer_id,
sequence_id,
};
let msg = messages::send(producer_id, producer_name, sequence_id, message);
self.send_message_non_blocking(msg, key, |resp| resp.command.send_receipt)
self.send_message(msg, key, |resp| resp.command.send_receipt)
.await
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
Expand Down Expand Up @@ -625,31 +623,17 @@ impl<Exe: Executor> ConnectionSender<Exe> {
extract: F,
) -> Result<R, ConnectionError>
where
F: FnOnce(Message) -> Option<R> + 'static,
{
self.send_message_non_blocking(msg, key, extract)?.await
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn send_message_non_blocking<R: Debug, F>(
&self,
msg: Message,
key: RequestKey,
extract: F,
) -> Result<impl Future<Output = Result<R, ConnectionError>>, ConnectionError>
where
F: FnOnce(Message) -> Option<R> + 'static,
F: FnOnce(Message) -> Option<R>,
{
let (resolver, response) = oneshot::channel();
trace!("sending message(key = {:?}): {:?}", key, msg);

let k = key.clone();
let error = self.error.clone();
let response = async move {
let response = async {
response
.await
.map_err(|oneshot::Canceled| {
error.set(ConnectionError::Disconnected);
self.error.set(ConnectionError::Disconnected);
ConnectionError::Disconnected
})
.map(move |message: Message| {
Expand All @@ -664,41 +648,36 @@ impl<Exe: Executor> ConnectionSender<Exe> {
self.tx.unbounded_send(msg),
) {
(Ok(_), Ok(_)) => {
let connection_id = self.connection_id;
let error = self.error.clone();
let delay_f = self.executor.delay(self.operation_timeout);
let fut = async move {
pin_mut!(response);
pin_mut!(delay_f);
match select(response, delay_f).await {
Either::Left((res, _)) => {
// println!("recv msg: {:?}", res);
res
}
Either::Right(_) => {
warn!(
"connection {} timedout sending message to the Pulsar server",
connection_id
);
error.set(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
" connection {} timedout sending message to the Pulsar server",
connection_id
),
)));
Err(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
" connection {} timedout sending message to the Pulsar server",
connection_id
),
)))
}
}
};
pin_mut!(response);
pin_mut!(delay_f);

Ok(fut)
match select(response, delay_f).await {
Either::Left((res, _)) => {
// println!("recv msg: {:?}", res);
res
}
Either::Right(_) => {
warn!(
"connection {} timedout sending message to the Pulsar server",
self.connection_id
);
self.error.set(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
" connection {} timedout sending message to the Pulsar server",
self.connection_id
),
)));
Err(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
" connection {} timedout sending message to the Pulsar server",
self.connection_id
),
)))
}
}
}
_ => {
warn!(
Expand Down
58 changes: 21 additions & 37 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ impl<Exe: Executor> TopicProducer<Exe> {
};

trace!("sending a batched message of size {}", message_count);
let send_receipt = self.send_compress(message).await?.await.map_err(Arc::new);
let send_receipt = self.send_compress(message).await.map_err(Arc::new);
for resolver in receipts {
let _ = resolver.send(
send_receipt
Expand All @@ -557,13 +557,8 @@ impl<Exe: Executor> TopicProducer<Exe> {
let (tx, rx) = oneshot::channel();
match self.batch.as_ref() {
None => {
let fut = self.send_compress(message).await?;
self.client
.executor
.spawn(Box::pin(async move {
let _ = tx.send(fut.await);
}))
.map_err(|_| Error::Executor)?;
let receipt = self.send_compress(message).await?;
let _ = tx.send(Ok(receipt));
Ok(SendFuture(rx))
}
Some(batch) => {
Expand Down Expand Up @@ -591,18 +586,16 @@ impl<Exe: Executor> TopicProducer<Exe> {
..Default::default()
};

let send_receipt = self.send_compress(message).await.map_err(Arc::new);

trace!("sending a batched message of size {}", counter);
let receipt_fut = self.send_compress(message).await?;
self.client
.executor
.spawn(Box::pin(async move {
let res = receipt_fut.await.map_err(Arc::new);
for tx in receipts.drain(..) {
let _ = tx
.send(res.clone().map_err(|e| ProducerError::Batch(e).into()));
}
}))
.map_err(|_| Error::Executor)?;
for tx in receipts.drain(..) {
let _ = tx.send(
send_receipt
.clone()
.map_err(|e| ProducerError::Batch(e).into()),
);
}
}

Ok(SendFuture(rx))
Expand All @@ -614,7 +607,7 @@ impl<Exe: Executor> TopicProducer<Exe> {
async fn send_compress(
&mut self,
mut message: ProducerMessage,
) -> Result<impl Future<Output = Result<CommandSendReceipt, Error>>, Error> {
) -> Result<proto::CommandSendReceipt, Error> {
let compressed_message = match self.compression.clone() {
None | Some(Compression::None) => message,
#[cfg(feature = "lz4")]
Expand Down Expand Up @@ -681,25 +674,16 @@ impl<Exe: Executor> TopicProducer<Exe> {
async fn send_inner(
&mut self,
message: ProducerMessage,
) -> Result<impl Future<Output = Result<CommandSendReceipt, Error>>, Error> {
) -> Result<proto::CommandSendReceipt, Error> {
loop {
let msg = message.clone();
match self.connection.sender().send(
self.id,
self.name.clone(),
self.message_id.get(),
msg,
) {
Ok(fut) => {
let fut = async move {
let res = fut.await;
res.map_err(|e| {
error!("wait send receipt got error: {:?}", e);
Error::Producer(ProducerError::Connection(e))
})
};
return Ok(fut);
}
match self
.connection
.sender()
.send(self.id, self.name.clone(), self.message_id.get(), msg)
.await
{
Ok(receipt) => return Ok(receipt),
Err(ConnectionError::Disconnected) => {}
Err(ConnectionError::Io(e)) => {
if e.kind() != std::io::ErrorKind::TimedOut {
Expand Down

0 comments on commit b7435fa

Please sign in to comment.