Skip to content

Commit

Permalink
Refactor use of futures crate
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Nov 11, 2023
1 parent 38c05ba commit 2faae6d
Show file tree
Hide file tree
Showing 29 changed files with 56 additions and 77 deletions.
1 change: 0 additions & 1 deletion examples/tonic-key-value-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ bytes = "1"
hyper = { version = "0.14.4", features = ["full"] }
prost = "0.11"
tokio = { version = "1.2.0", features = ["full"] }
futures = "0.3"
tokio-stream = { version = "0.1", features = ["sync", "net"] }
tonic = "0.9"
tower = { version = "0.4.5", features = ["full"] }
Expand Down
5 changes: 2 additions & 3 deletions examples/tonic-key-value-store/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use bytes::Bytes;
use clap::Parser;
use futures::StreamExt;
use hyper::{
body::HttpBody,
header::{self, HeaderValue},
Expand All @@ -24,7 +23,7 @@ use tokio::{
};
use tokio_stream::{
wrappers::{BroadcastStream, TcpListenerStream},
Stream,
Stream, StreamExt,
};
use tonic::{async_trait, body::BoxBody, transport::Channel, Code, Request, Response, Status};
use tower::{BoxError, Service, ServiceBuilder};
Expand Down Expand Up @@ -245,7 +244,7 @@ impl key_value_store_server::KeyValueStore for ServerImpl {

let rx = self.tx.subscribe();
let stream = BroadcastStream::new(rx)
.filter_map(|item| async move {
.filter_map(|item| {
// ignore receive errors
item.ok()
})
Expand Down
28 changes: 13 additions & 15 deletions tower-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ rust-version = "1.66"
[dependencies]
bitflags = "2.0.2"
bytes = "1"
futures-core = "0.3"
futures-util = { version = "0.3.14", default_features = false, features = [] }
http = "0.2.7"
http-body = "0.4.5"
pin-project-lite = "0.2.7"
Expand All @@ -26,6 +24,7 @@ tower-service = "0.3"
# optional dependencies
async-compression = { version = "0.4", optional = true, features = ["tokio"] }
base64 = { version = "0.21", optional = true }
futures-util = { version = "0.3.14", optional = true, default_features = false }
http-range-header = { version = "0.4.0", optional = true }
iri-string = { version = "0.7.0", optional = true }
mime = { version = "0.3.17", optional = true, default_features = false }
Expand All @@ -42,7 +41,6 @@ uuid = { version = "1.0", features = ["v4"], optional = true }
bytes = "1"
flate2 = "1.0"
brotli = "3"
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
once_cell = "1"
tokio = { version = "1", features = ["full"] }
Expand Down Expand Up @@ -81,11 +79,11 @@ full = [
]

add-extension = []
auth = ["base64", "validate-request"]
auth = ["base64", "futures-util", "validate-request"]
catch-panic = ["tracing", "futures-util/std"]
cors = []
follow-redirect = ["iri-string", "tower/util"]
fs = ["tokio/fs", "tokio-util/io", "tokio/io-util", "dep:http-range-header", "mime_guess", "mime", "percent-encoding", "httpdate", "set-status", "futures-util/alloc", "tracing"]
follow-redirect = ["futures-util", "iri-string", "tower/util"]
fs = ["futures-util", "tokio/fs", "tokio-util/io", "tokio/io-util", "dep:http-range-header", "mime_guess", "mime", "percent-encoding", "httpdate", "set-status", "futures-util/alloc", "tracing"]
limit = []
map-request-body = []
map-response-body = []
Expand All @@ -98,21 +96,21 @@ sensitive-headers = []
set-header = []
set-status = []
timeout = ["tokio/time"]
trace = ["tracing"]
trace = ["futures-util", "tracing"]
util = ["tower"]
validate-request = ["mime"]

compression-br = ["async-compression/brotli", "tokio-util", "tokio"]
compression-deflate = ["async-compression/zlib", "tokio-util", "tokio"]
compression-br = ["async-compression/brotli", "futures-util", "tokio-util", "tokio"]
compression-deflate = ["async-compression/zlib", "futures-util", "tokio-util", "tokio"]
compression-full = ["compression-br", "compression-deflate", "compression-gzip", "compression-zstd"]
compression-gzip = ["async-compression/gzip", "tokio-util", "tokio"]
compression-zstd = ["async-compression/zstd", "tokio-util", "tokio"]
compression-gzip = ["async-compression/gzip", "futures-util", "tokio-util", "tokio"]
compression-zstd = ["async-compression/zstd", "futures-util", "tokio-util", "tokio"]

decompression-br = ["async-compression/brotli", "tokio-util", "tokio"]
decompression-deflate = ["async-compression/zlib", "tokio-util", "tokio"]
decompression-br = ["async-compression/brotli", "futures-util", "tokio-util", "tokio"]
decompression-deflate = ["async-compression/zlib", "futures-util", "tokio-util", "tokio"]
decompression-full = ["decompression-br", "decompression-deflate", "decompression-gzip", "decompression-zstd"]
decompression-gzip = ["async-compression/gzip", "tokio-util", "tokio"]
decompression-zstd = ["async-compression/zstd", "tokio-util", "tokio"]
decompression-gzip = ["async-compression/gzip", "futures-util", "tokio-util", "tokio"]
decompression-zstd = ["async-compression/zstd", "futures-util", "tokio-util", "tokio"]

[package.metadata.docs.rs]
all-features = true
Expand Down
3 changes: 1 addition & 2 deletions tower-http/src/auth/async_require_authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,12 @@
//! # }
//! ```
use futures_core::ready;
use http::{Request, Response};
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_layer::Layer;
use tower_service::Service;
Expand Down
3 changes: 1 addition & 2 deletions tower-http/src/catch_panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
//! ```
use bytes::Bytes;
use futures_core::ready;
use futures_util::future::{CatchUnwind, FutureExt};
use http::{HeaderValue, Request, Response, StatusCode};
use http_body::{combinators::UnsyncBoxBody, Body, Full};
Expand All @@ -93,7 +92,7 @@ use std::{
future::Future,
panic::AssertUnwindSafe,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_layer::Layer;
use tower_service::Service;
Expand Down
6 changes: 3 additions & 3 deletions tower-http/src/classify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ mod usable_for_retries {
Request<ReqB>: Clone,
E: std::error::Error + 'static,
{
type Future = futures::future::Ready<RetryBasedOnClassification<C>>;
type Future = std::future::Ready<RetryBasedOnClassification<C>>;

fn retry(
&self,
Expand All @@ -410,7 +410,7 @@ mod usable_for_retries {
self.classifier.clone().classify_response(res)
{
if class.err()?.is_retryable() {
return Some(futures::future::ready(self.clone()));
return Some(std::future::ready(self.clone()));
}
}

Expand All @@ -421,7 +421,7 @@ mod usable_for_retries {
.clone()
.classify_error(err)
.is_retryable()
.then(|| futures::future::ready(self.clone())),
.then(|| std::future::ready(self.clone())),
}
}

Expand Down
3 changes: 1 addition & 2 deletions tower-http/src/compression/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ use async_compression::tokio::bufread::ZlibEncoder;
use async_compression::tokio::bufread::ZstdEncoder;

use bytes::{Buf, Bytes};
use futures_util::ready;
use http::HeaderMap;
use http_body::Body;
use pin_project_lite::pin_project;
use std::{
io,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tokio_util::io::StreamReader;

Expand Down
3 changes: 1 addition & 2 deletions tower-http/src/compression/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ use crate::compression::predicate::Predicate;
use crate::compression::CompressionLevel;
use crate::compression_utils::WrapBody;
use crate::content_encoding::Encoding;
use futures_util::ready;
use http::{header, HeaderMap, HeaderValue, Response};
use http_body::Body;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};

pin_project! {
Expand Down
2 changes: 1 addition & 1 deletion tower-http/src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ mod tests {
let compressed_with_level = {
use async_compression::tokio::bufread::BrotliEncoder;

let stream = Box::pin(futures::stream::once(async move {
let stream = Box::pin(futures_util::stream::once(async move {
Ok::<_, std::io::Error>(DATA.as_bytes())
}));
let reader = StreamReader::new(stream);
Expand Down
5 changes: 2 additions & 3 deletions tower-http/src/compression_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
use crate::{content_encoding::SupportedEncodings, BoxError};
use bytes::{Bytes, BytesMut};
use futures_core::Stream;
use futures_util::ready;
use futures_util::Stream;
use http::HeaderValue;
use http_body::Body;
use pin_project_lite::pin_project;
use std::{
io,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tokio::io::AsyncRead;
use tokio_util::io::{poll_read_buf, StreamReader};
Expand Down
3 changes: 1 addition & 2 deletions tower-http/src/cors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
#![allow(clippy::enum_variant_names)]

use bytes::{BufMut, BytesMut};
use futures_core::ready;
use http::{
header::{self, HeaderName},
HeaderMap, HeaderValue, Method, Request, Response,
Expand All @@ -60,7 +59,7 @@ use std::{
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_layer::Layer;
use tower_service::Service;
Expand Down
8 changes: 6 additions & 2 deletions tower-http/src/decompression/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ use async_compression::tokio::bufread::ZlibDecoder;
#[cfg(feature = "decompression-zstd")]
use async_compression::tokio::bufread::ZstdDecoder;
use bytes::{Buf, Bytes};
use futures_util::ready;
use http::HeaderMap;
use http_body::Body;
use pin_project_lite::pin_project;
use std::task::Context;
use std::{io, marker::PhantomData, pin::Pin, task::Poll};
use std::{
io,
marker::PhantomData,
pin::Pin,
task::{ready, Poll},
};
use tokio_util::io::StreamReader;

pin_project! {
Expand Down
3 changes: 1 addition & 2 deletions tower-http/src/decompression/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
use super::{body::BodyInner, DecompressionBody};
use crate::compression_utils::{AcceptEncoding, CompressionLevel, WrapBody};
use crate::content_encoding::SupportedEncodings;
use futures_util::ready;
use http::{header, Response};
use http_body::Body;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};

pin_project! {
Expand Down
3 changes: 1 addition & 2 deletions tower-http/src/follow_redirect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
pub mod policy;

use self::policy::{Action, Attempt, Policy, Standard};
use futures_core::ready;
use futures_util::future::Either;
use http::{
header::LOCATION, HeaderMap, HeaderValue, Method, Request, Response, StatusCode, Uri, Version,
Expand All @@ -107,7 +106,7 @@ use std::{
mem,
pin::Pin,
str,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower::util::Oneshot;
use tower_layer::Layer;
Expand Down
3 changes: 1 addition & 2 deletions tower-http/src/limit/future.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use super::body::create_error_response;
use super::ResponseBody;
use futures_core::ready;
use http::Response;
use http_body::Body;
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

pin_project! {
/// Response future for [`RequestBodyLimit`].
Expand Down
3 changes: 1 addition & 2 deletions tower-http/src/map_request_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
//! use http::{Request, Response};
//! use hyper::Body;
//! use std::convert::Infallible;
//! use std::{pin::Pin, task::{Context, Poll}};
//! use std::{pin::Pin, task::{ready, Context, Poll}};
//! use tower::{ServiceBuilder, service_fn, ServiceExt, Service};
//! use tower_http::map_request_body::MapRequestBodyLayer;
//! use futures::ready;
//!
//! // A wrapper for a `hyper::Body` that prints the size of data chunks
//! struct PrintChunkSizesBody {
Expand Down
6 changes: 2 additions & 4 deletions tower-http/src/map_response_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
//! use http::{Request, Response};
//! use hyper::Body;
//! use std::convert::Infallible;
//! use std::{pin::Pin, task::{Context, Poll}};
//! use std::{pin::Pin, task::{ready, Context, Poll}};
//! use tower::{ServiceBuilder, service_fn, ServiceExt, Service};
//! use tower_http::map_response_body::MapResponseBodyLayer;
//! use futures::ready;
//!
//! // A wrapper for a `hyper::Body` that prints the size of data chunks
//! struct PrintChunkSizesBody {
Expand Down Expand Up @@ -75,14 +74,13 @@
//! # }
//! ```
use futures_core::ready;
use http::{Request, Response};
use pin_project_lite::pin_project;
use std::future::Future;
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_layer::Layer;
use tower_service::Service;
Expand Down
5 changes: 2 additions & 3 deletions tower-http/src/metrics/in_flight_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
//! # }
//! ```
use futures_util::ready;
use http::{Request, Response};
use http_body::Body;
use pin_project_lite::pin_project;
Expand All @@ -61,7 +60,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
task::{ready, Context, Poll},
time::Duration,
};
use tower_layer::Layer;
Expand Down Expand Up @@ -311,7 +310,7 @@ mod tests {
assert_eq!(counter.get(), 0);

// driving service to ready shouldn't increment the counter
futures::future::poll_fn(|cx| service.poll_ready(cx))
std::future::poll_fn(|cx| service.poll_ready(cx))
.await
.unwrap();
assert_eq!(counter.get(), 0);
Expand Down
3 changes: 1 addition & 2 deletions tower-http/src/propagate_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@
//! # }
//! ```
use futures_util::ready;
use http::{header::HeaderName, HeaderValue, Request, Response};
use pin_project_lite::pin_project;
use std::future::Future;
use std::{
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tower_layer::Layer;
use tower_service::Service;
Expand Down
Loading

0 comments on commit 2faae6d

Please sign in to comment.