Skip to content

Commit

Permalink
Merge pull request #411 from russelltg/stream_image_import
Browse files Browse the repository at this point in the history
#380: Add streaming image import
  • Loading branch information
fussybeaver authored Jun 17, 2024
2 parents 2cc8669 + 15ace25 commit 5abf647
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 91 deletions.
43 changes: 22 additions & 21 deletions src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::hash::Hash;
use std::pin::Pin;

use super::Docker;
use crate::docker::BodyType;
use crate::errors::Error;
use crate::models::*;
use crate::read::NewlineLogOutputDecoder;
Expand Down Expand Up @@ -1197,7 +1198,7 @@ impl Docker {
url,
Builder::new().method(Method::GET),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_value(req).await
Expand Down Expand Up @@ -1299,7 +1300,7 @@ impl Docker {
&url,
Builder::new().method(Method::POST),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_unit(req).await
Expand Down Expand Up @@ -1344,7 +1345,7 @@ impl Docker {
&url,
Builder::new().method(Method::POST),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_unit(req).await
Expand Down Expand Up @@ -1393,7 +1394,7 @@ impl Docker {
&url,
Builder::new().method(Method::DELETE),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_unit(req).await
Expand Down Expand Up @@ -1444,7 +1445,7 @@ impl Docker {
&url,
Builder::new().method(Method::POST),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_stream(req).map(|res| match res {
Expand Down Expand Up @@ -1518,7 +1519,7 @@ impl Docker {
.header(CONNECTION, "Upgrade")
.header(UPGRADE, "tcp"),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

let (read, write) = self.process_upgraded(req).await?;
Expand Down Expand Up @@ -1567,7 +1568,7 @@ impl Docker {
&url,
Builder::new().method(Method::POST),
Some(options),
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_unit(req).await
Expand Down Expand Up @@ -1613,7 +1614,7 @@ impl Docker {
&url,
Builder::new().method(Method::POST),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_unit(req).await
Expand Down Expand Up @@ -1658,7 +1659,7 @@ impl Docker {
&url,
Builder::new().method(Method::GET),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_value(req).await
Expand Down Expand Up @@ -1706,7 +1707,7 @@ impl Docker {
&url,
Builder::new().method(Method::GET),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_value(req).await
Expand Down Expand Up @@ -1759,7 +1760,7 @@ impl Docker {
&url,
Builder::new().method(Method::GET),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_stream_string(req)
Expand Down Expand Up @@ -1798,7 +1799,7 @@ impl Docker {
&url,
Builder::new().method(Method::GET),
None::<String>,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_value(req).await
Expand Down Expand Up @@ -1846,7 +1847,7 @@ impl Docker {
&url,
Builder::new().method(Method::GET),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_stream(req)
Expand Down Expand Up @@ -1895,7 +1896,7 @@ impl Docker {
&url,
Builder::new().method(Method::POST),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_unit(req).await
Expand Down Expand Up @@ -1996,7 +1997,7 @@ impl Docker {
&url,
Builder::new().method(Method::POST),
Some(options),
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_unit(req).await
Expand Down Expand Up @@ -2031,7 +2032,7 @@ impl Docker {
&url,
Builder::new().method(Method::POST),
None::<String>,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_unit(req).await
Expand Down Expand Up @@ -2066,7 +2067,7 @@ impl Docker {
&url,
Builder::new().method(Method::POST),
None::<String>,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_unit(req).await
Expand Down Expand Up @@ -2117,7 +2118,7 @@ impl Docker {
url,
Builder::new().method(Method::POST),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_value(req).await
Expand Down Expand Up @@ -2176,7 +2177,7 @@ impl Docker {
.method(Method::PUT)
.header(CONTENT_TYPE, "application/x-tar"),
options,
Ok(Full::new(tar)),
Ok(BodyType::Left(Full::new(tar))),
);

self.process_into_unit(req).await
Expand Down Expand Up @@ -2224,7 +2225,7 @@ impl Docker {
&url,
Builder::new().method(Method::GET),
options,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

self.process_into_body(req)
Expand Down Expand Up @@ -2254,7 +2255,7 @@ impl Docker {
.method(Method::GET)
.header(CONTENT_TYPE, "application/json"),
None::<String>,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_body(req)
}
Expand Down
62 changes: 39 additions & 23 deletions src/docker.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::convert::Infallible;
#[cfg(feature = "ssl")]
use std::fs;
use std::future::Future;
#[cfg(feature = "ssl")]
use std::io;
#[cfg(feature = "ssl")]
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand All @@ -15,10 +17,11 @@ use futures_core::Stream;
use futures_util::future::FutureExt;
use futures_util::future::TryFutureExt;
use futures_util::stream::TryStreamExt;
use futures_util::StreamExt;
use http::header::CONTENT_TYPE;
use http::request::Builder;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::{Frame, Incoming};
use hyper::{self, body::Bytes, Method, Request, Response, StatusCode};
#[cfg(feature = "ssl")]
use hyper_rustls::HttpsConnector;
Expand Down Expand Up @@ -94,23 +97,23 @@ pub(crate) enum ClientType {
/// with various Connect traits fulfilled.
pub(crate) enum Transport {
Http {
client: Client<HttpConnector, Full<Bytes>>,
client: Client<HttpConnector, BodyType>,
},
#[cfg(feature = "ssl")]
Https {
client: Client<HttpsConnector<HttpConnector>, Full<Bytes>>,
client: Client<HttpsConnector<HttpConnector>, BodyType>,
},
#[cfg(unix)]
Unix {
client: Client<UnixConnector, Full<Bytes>>,
client: Client<UnixConnector, BodyType>,
},
#[cfg(windows)]
NamedPipe {
client: Client<NamedPipeConnector, Full<Bytes>>,
client: Client<NamedPipeConnector, BodyType>,
},
#[cfg(test)]
Mock {
client: Client<yup_hyper_mock::HostToReplyConnector, Full<Bytes>>,
client: Client<yup_hyper_mock::HostToReplyConnector, BodyType>,
},
}

Expand Down Expand Up @@ -971,7 +974,7 @@ impl Docker {
impl Docker {
pub(crate) fn process_into_value<T>(
&self,
req: Result<Request<Full<Bytes>>, Error>,
req: Result<Request<BodyType>, Error>,
) -> impl Future<Output = Result<T, Error>>
where
T: DeserializeOwned,
Expand All @@ -982,7 +985,7 @@ impl Docker {

pub(crate) fn process_into_stream<T>(
&self,
req: Result<Request<Full<Bytes>>, Error>,
req: Result<Request<BodyType>, Error>,
) -> impl Stream<Item = Result<T, Error>> + Unpin
where
T: DeserializeOwned,
Expand All @@ -997,7 +1000,7 @@ impl Docker {

pub(crate) fn process_into_stream_string(
&self,
req: Result<Request<Full<Bytes>>, Error>,
req: Result<Request<BodyType>, Error>,
) -> impl Stream<Item = Result<LogOutput, Error>> + Unpin {
Box::pin(
self.process_request(req)
Expand All @@ -1008,7 +1011,7 @@ impl Docker {

pub(crate) fn process_into_unit(
&self,
req: Result<Request<Full<Bytes>>, Error>,
req: Result<Request<BodyType>, Error>,
) -> impl Future<Output = Result<(), Error>> {
let fut = self.process_request(req);
async move {
Expand All @@ -1019,7 +1022,7 @@ impl Docker {

pub(crate) fn process_into_body(
&self,
req: Result<Request<Full<Bytes>>, Error>,
req: Result<Request<BodyType>, Error>,
) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
Box::pin(
self.process_request(req)
Expand All @@ -1031,7 +1034,7 @@ impl Docker {

pub(crate) fn process_into_string(
&self,
req: Result<Request<Full<Bytes>>, Error>,
req: Result<Request<BodyType>, Error>,
) -> impl Future<Output = Result<String, Error>> {
let fut = self.process_request(req);
async move {
Expand All @@ -1042,7 +1045,7 @@ impl Docker {

pub(crate) async fn process_upgraded(
&self,
req: Result<Request<Full<Bytes>>, Error>,
req: Result<Request<BodyType>, Error>,
) -> Result<(impl AsyncRead, impl AsyncWrite), Error> {
let res = self.process_request(req).await?;
let upgraded = hyper::upgrade::on(res).await?;
Expand All @@ -1051,7 +1054,7 @@ impl Docker {
Ok(split(tokio_upgraded))
}

pub(crate) fn serialize_payload<S>(body: Option<S>) -> Result<Full<Bytes>, Error>
pub(crate) fn serialize_payload<S>(body: Option<S>) -> Result<BodyType, Error>
where
S: Serialize,
{
Expand All @@ -1063,8 +1066,8 @@ impl Docker {
.map(|payload| {
debug!("{}", payload.clone().unwrap_or_default());
payload
.map(|content| Full::new(content.into()))
.unwrap_or(Full::new(Bytes::new()))
.map(|content| BodyType::Left(Full::new(content.into())))
.unwrap_or(BodyType::Left(Full::new(Bytes::new())))
})
}

Expand All @@ -1091,7 +1094,7 @@ impl Docker {
"/version",
Builder::new().method(Method::GET),
None::<String>,
Ok(Full::new(Bytes::new())),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);

let res = self
Expand Down Expand Up @@ -1123,12 +1126,16 @@ impl Docker {

pub(crate) fn process_request(
&self,
request: Result<Request<Full<Bytes>>, Error>,
request: Result<Request<BodyType>, Error>,
) -> impl Future<Output = Result<Response<Incoming>, Error>> {
let transport = self.transport.clone();
let timeout = self.client_timeout;

trace!("request: {:?}", request.as_ref());
match request.as_ref().map(|b| b.body()) {
Ok(http_body_util::Either::Left(bytes)) => trace!("request: {:?}", bytes),
Ok(http_body_util::Either::Right(_)) => trace!("request: (stream)"),
Err(e) => trace!("request: Err({e:?}"),
};

async move {
let request = request?;
Expand Down Expand Up @@ -1165,8 +1172,8 @@ impl Docker {
path: &str,
builder: Builder,
query: Option<O>,
payload: Result<Full<Bytes>, Error>,
) -> Result<Request<Full<Bytes>>, Error>
payload: Result<BodyType, Error>,
) -> Result<Request<BodyType>, Error>
where
O: Serialize,
{
Expand All @@ -1187,7 +1194,7 @@ impl Docker {

async fn execute_request(
transport: Arc<Transport>,
req: Request<Full<Bytes>>,
req: Request<BodyType>,
timeout: u64,
) -> Result<Response<Incoming>, Error> {
// This is where we determine to which transport we issue the request.
Expand Down Expand Up @@ -1254,3 +1261,12 @@ impl Docker {
})
}
}

pub(crate) type BodyType = http_body_util::Either<
Full<Bytes>,
StreamBody<Pin<Box<dyn Stream<Item = Result<Frame<Bytes>, Infallible>> + Send>>>,
>;

pub(crate) fn body_stream(body: impl Stream<Item = Bytes> + Send + 'static) -> BodyType {
BodyType::Right(StreamBody::new(Box::pin(body.map(|a| Ok(Frame::data(a))))))
}
Loading

0 comments on commit 5abf647

Please sign in to comment.