diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 485df8d86..cb28637a9 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -43,7 +43,6 @@ jobs: - run: cargo test --features cloudflare_zlib --no-default-features if: matrix.build != 'mingw' - run: cargo test --features miniz-sys --no-default-features - - run: cargo test --features tokio rustfmt: name: Rustfmt diff --git a/Cargo.toml b/Cargo.toml index 08ee25b60..4b253afb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,8 +25,6 @@ cfg-if = "1.0.0" miniz-sys = { path = "miniz-sys", version = "0.1.11", optional = true } libz-sys = { version = "1.1.0", optional = true, default-features = false } cloudflare-zlib-sys = { version = "0.3.0", optional = true } -tokio-io = { version = "0.1.11", optional = true } -futures = { version = "0.1.25", optional = true } miniz_oxide = { version = "0.4.0", optional = true, default-features = false } crc32fast = "1.2.0" @@ -36,10 +34,6 @@ miniz_oxide = { version = "0.4.0", default-features = false } [dev-dependencies] rand = "0.7" quickcheck = { version = "0.9", default-features = false } -tokio-io = "0.1.11" -tokio-tcp = "0.1.3" -tokio-threadpool = "0.1.10" -futures = "0.1" [features] default = ["rust_backend"] @@ -48,4 +42,3 @@ zlib = ["any_zlib", "libz-sys"] zlib-ng-compat = ["zlib", "libz-sys/zlib-ng"] cloudflare_zlib = ["any_zlib", "cloudflare-zlib-sys"] rust_backend = ["miniz_oxide"] -tokio = ["tokio-io", "futures"] diff --git a/src/deflate/bufread.rs b/src/deflate/bufread.rs index 98aee70b0..f0b29e0b4 100644 --- a/src/deflate/bufread.rs +++ b/src/deflate/bufread.rs @@ -2,11 +2,6 @@ use std::io; use std::io::prelude::*; use std::mem; -#[cfg(feature = "tokio")] -use futures::Poll; -#[cfg(feature = "tokio")] -use tokio_io::{AsyncRead, AsyncWrite}; - use crate::zio; use crate::{Compress, Decompress}; @@ -116,9 +111,6 @@ impl Read for DeflateEncoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for DeflateEncoder {} - impl Write for DeflateEncoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -129,13 +121,6 @@ impl Write for DeflateEncoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for DeflateEncoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} - /// A DEFLATE decoder, or decompressor. /// /// This structure consumes a [`BufRead`] interface, reading compressed data @@ -247,9 +232,6 @@ impl Read for DeflateDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for DeflateDecoder {} - impl Write for DeflateDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -259,10 +241,3 @@ impl Write for DeflateDecoder { self.get_mut().flush() } } - -#[cfg(feature = "tokio")] -impl AsyncWrite for DeflateDecoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} diff --git a/src/deflate/read.rs b/src/deflate/read.rs index 21748e72c..fd17a894a 100644 --- a/src/deflate/read.rs +++ b/src/deflate/read.rs @@ -1,11 +1,6 @@ use std::io; use std::io::prelude::*; -#[cfg(feature = "tokio")] -use futures::Poll; -#[cfg(feature = "tokio")] -use tokio_io::{AsyncRead, AsyncWrite}; - use super::bufread; use crate::bufreader::BufReader; @@ -113,9 +108,6 @@ impl Read for DeflateEncoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for DeflateEncoder {} - impl Write for DeflateEncoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -126,13 +118,6 @@ impl Write for DeflateEncoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for DeflateEncoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} - /// A DEFLATE decoder, or decompressor. /// /// This structure implements a [`Read`] interface and takes a stream of @@ -245,9 +230,6 @@ impl Read for DeflateDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for DeflateDecoder {} - impl Write for DeflateDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -257,10 +239,3 @@ impl Write for DeflateDecoder { self.get_mut().flush() } } - -#[cfg(feature = "tokio")] -impl AsyncWrite for DeflateDecoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} diff --git a/src/deflate/write.rs b/src/deflate/write.rs index 3931f525f..2c44556ac 100644 --- a/src/deflate/write.rs +++ b/src/deflate/write.rs @@ -1,11 +1,6 @@ use std::io; use std::io::prelude::*; -#[cfg(feature = "tokio")] -use futures::Poll; -#[cfg(feature = "tokio")] -use tokio_io::{AsyncRead, AsyncWrite}; - use crate::zio; use crate::{Compress, Decompress}; @@ -166,23 +161,12 @@ impl Write for DeflateEncoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for DeflateEncoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.inner.finish()?; - self.inner.get_mut().shutdown() - } -} - impl Read for DeflateEncoder { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.get_mut().read(buf) } } -#[cfg(feature = "tokio")] -impl AsyncRead for DeflateEncoder {} - /// A DEFLATE decoder, or decompressor. /// /// This structure implements a [`Write`] and will emit a stream of decompressed @@ -331,19 +315,8 @@ impl Write for DeflateDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for DeflateDecoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.inner.finish()?; - self.inner.get_mut().shutdown() - } -} - impl Read for DeflateDecoder { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.get_mut().read(buf) } } - -#[cfg(feature = "tokio")] -impl AsyncRead for DeflateDecoder {} diff --git a/src/gz/bufread.rs b/src/gz/bufread.rs index 6fac718ab..6be144d0c 100644 --- a/src/gz/bufread.rs +++ b/src/gz/bufread.rs @@ -3,11 +3,6 @@ use std::io; use std::io::prelude::*; use std::mem; -#[cfg(feature = "tokio")] -use futures::Poll; -#[cfg(feature = "tokio")] -use tokio_io::{AsyncRead, AsyncWrite}; - use super::{GzBuilder, GzHeader}; use super::{FCOMMENT, FEXTRA, FHCRC, FNAME}; use crate::crc::{Crc, CrcReader}; @@ -610,9 +605,6 @@ impl Read for GzDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for GzDecoder {} - impl Write for GzDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -623,13 +615,6 @@ impl Write for GzDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for GzDecoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} - /// A gzip streaming decoder that decodes all members of a multistream /// /// A gzip member consists of a header, compressed data and a trailer. The [gzip @@ -713,26 +698,6 @@ impl Read for MultiGzDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for MultiGzDecoder {} - -impl Write for MultiGzDecoder { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.get_mut().write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - self.get_mut().flush() - } -} - -#[cfg(feature = "tokio")] -impl AsyncWrite for MultiGzDecoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} - #[cfg(test)] pub mod tests { use crate::gz::bufread::*; diff --git a/src/gz/read.rs b/src/gz/read.rs index 79fdb4174..dbbe63282 100644 --- a/src/gz/read.rs +++ b/src/gz/read.rs @@ -1,11 +1,6 @@ use std::io; use std::io::prelude::*; -#[cfg(feature = "tokio")] -use futures::Poll; -#[cfg(feature = "tokio")] -use tokio_io::{AsyncRead, AsyncWrite}; - use super::bufread; use super::{GzBuilder, GzHeader}; use crate::bufreader::BufReader; @@ -175,9 +170,6 @@ impl Read for GzDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for GzDecoder {} - impl Write for GzDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -188,13 +180,6 @@ impl Write for GzDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for GzDecoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} - /// A gzip streaming decoder that decodes all members of a multistream /// /// A gzip member consists of a header, compressed data and a trailer. The [gzip @@ -282,9 +267,6 @@ impl Read for MultiGzDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for MultiGzDecoder {} - impl Write for MultiGzDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -294,10 +276,3 @@ impl Write for MultiGzDecoder { self.get_mut().flush() } } - -#[cfg(feature = "tokio")] -impl AsyncWrite for MultiGzDecoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} diff --git a/src/gz/write.rs b/src/gz/write.rs index 48c76e2fa..7cf1a7cd4 100644 --- a/src/gz/write.rs +++ b/src/gz/write.rs @@ -2,11 +2,6 @@ use std::cmp; use std::io; use std::io::prelude::*; -#[cfg(feature = "tokio")] -use futures::Poll; -#[cfg(feature = "tokio")] -use tokio_io::{AsyncRead, AsyncWrite}; - use super::bufread::{corrupt, read_gz_header}; use super::{GzBuilder, GzHeader}; use crate::crc::{Crc, CrcWriter}; @@ -158,23 +153,12 @@ impl Write for GzEncoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for GzEncoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.try_finish()?; - self.get_mut().shutdown() - } -} - impl Read for GzEncoder { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.get_mut().read(buf) } } -#[cfg(feature = "tokio")] -impl AsyncRead for GzEncoder {} - impl Drop for GzEncoder { fn drop(&mut self) { if self.inner.is_present() { @@ -383,23 +367,12 @@ impl Write for GzDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for GzDecoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.try_finish()?; - self.inner.get_mut().get_mut().shutdown() - } -} - impl Read for GzDecoder { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.get_mut().get_mut().read(buf) } } -#[cfg(feature = "tokio")] -impl AsyncRead for GzDecoder {} - #[cfg(test)] mod tests { use super::*; diff --git a/src/lib.rs b/src/lib.rs index 178803f4d..b5cfa5c16 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -77,30 +77,6 @@ //! [read]: https://doc.rust-lang.org/std/io/trait.Read.html //! [write]: https://doc.rust-lang.org/std/io/trait.Write.html //! [bufread]: https://doc.rust-lang.org/std/io/trait.BufRead.html -//! -//! # Async I/O -//! -//! This crate optionally can support async I/O streams with the [Tokio stack] via -//! the `tokio` feature of this crate: -//! -//! [Tokio stack]: https://tokio.rs/ -//! -//! ```toml -//! flate2 = { version = "0.2", features = ["tokio"] } -//! ``` -//! -//! All methods are internally capable of working with streams that may return -//! [`ErrorKind::WouldBlock`] when they're not ready to perform the particular -//! operation. -//! -//! [`ErrorKind::WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html -//! -//! Note that care needs to be taken when using these objects, however. The -//! Tokio runtime, in particular, requires that data is fully flushed before -//! dropping streams. For compatibility with blocking streams all streams are -//! flushed/written when they are dropped, and this is not always a suitable -//! time to perform I/O. If I/O streams are flushed before drop, however, then -//! these operations will be a noop. #![doc(html_root_url = "https://docs.rs/flate2/0.2")] #![deny(missing_docs)] #![deny(missing_debug_implementations)] diff --git a/src/zlib/bufread.rs b/src/zlib/bufread.rs index 182f0647d..f1d323165 100644 --- a/src/zlib/bufread.rs +++ b/src/zlib/bufread.rs @@ -2,11 +2,6 @@ use std::io; use std::io::prelude::*; use std::mem; -#[cfg(feature = "tokio")] -use futures::Poll; -#[cfg(feature = "tokio")] -use tokio_io::{AsyncRead, AsyncWrite}; - use crate::zio; use crate::{Compress, Decompress}; @@ -112,9 +107,6 @@ impl Read for ZlibEncoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for ZlibEncoder {} - impl Write for ZlibEncoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -125,13 +117,6 @@ impl Write for ZlibEncoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for ZlibEncoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} - /// A ZLIB decoder, or decompressor. /// /// This structure consumes a [`BufRead`] interface, reading compressed data @@ -237,9 +222,6 @@ impl Read for ZlibDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for ZlibDecoder {} - impl Write for ZlibDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -249,10 +231,3 @@ impl Write for ZlibDecoder { self.get_mut().flush() } } - -#[cfg(feature = "tokio")] -impl AsyncWrite for ZlibDecoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} diff --git a/src/zlib/read.rs b/src/zlib/read.rs index 43f350246..509493166 100644 --- a/src/zlib/read.rs +++ b/src/zlib/read.rs @@ -1,11 +1,6 @@ use std::io; use std::io::prelude::*; -#[cfg(feature = "tokio")] -use futures::Poll; -#[cfg(feature = "tokio")] -use tokio_io::{AsyncRead, AsyncWrite}; - use super::bufread; use crate::bufreader::BufReader; @@ -110,9 +105,6 @@ impl Read for ZlibEncoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for ZlibEncoder {} - impl Write for ZlibEncoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -123,13 +115,6 @@ impl Write for ZlibEncoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for ZlibEncoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} - /// A ZLIB decoder, or decompressor. /// /// This structure implements a [`Read`] interface and takes a stream of @@ -244,9 +229,6 @@ impl Read for ZlibDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncRead for ZlibDecoder {} - impl Write for ZlibDecoder { fn write(&mut self, buf: &[u8]) -> io::Result { self.get_mut().write(buf) @@ -256,10 +238,3 @@ impl Write for ZlibDecoder { self.get_mut().flush() } } - -#[cfg(feature = "tokio")] -impl AsyncWrite for ZlibDecoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.get_mut().shutdown() - } -} diff --git a/src/zlib/write.rs b/src/zlib/write.rs index f6840336f..c67181402 100644 --- a/src/zlib/write.rs +++ b/src/zlib/write.rs @@ -1,11 +1,6 @@ use std::io; use std::io::prelude::*; -#[cfg(feature = "tokio")] -use futures::Poll; -#[cfg(feature = "tokio")] -use tokio_io::{AsyncRead, AsyncWrite}; - use crate::zio; use crate::{Compress, Decompress}; @@ -166,23 +161,12 @@ impl Write for ZlibEncoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for ZlibEncoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.try_finish()?; - self.get_mut().shutdown() - } -} - impl Read for ZlibEncoder { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.get_mut().read(buf) } } -#[cfg(feature = "tokio")] -impl AsyncRead for ZlibEncoder {} - /// A ZLIB decoder, or decompressor. /// /// This structure implements a [`Write`] and will emit a stream of decompressed @@ -330,19 +314,8 @@ impl Write for ZlibDecoder { } } -#[cfg(feature = "tokio")] -impl AsyncWrite for ZlibDecoder { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.inner.finish()?; - self.inner.get_mut().shutdown() - } -} - impl Read for ZlibDecoder { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.get_mut().read(buf) } } - -#[cfg(feature = "tokio")] -impl AsyncRead for ZlibDecoder {} diff --git a/tests/async-reader.rs b/tests/async-reader.rs deleted file mode 100644 index 16dae65f9..000000000 --- a/tests/async-reader.rs +++ /dev/null @@ -1,96 +0,0 @@ -extern crate flate2; -extern crate futures; -extern crate tokio_io; - -use flate2::read::{GzDecoder, MultiGzDecoder}; -use futures::prelude::*; -use futures::task; -use std::cmp; -use std::fs::File; -use std::io::{self, Read}; -use tokio_io::io::read_to_end; -use tokio_io::AsyncRead; - -struct BadReader { - reader: T, - x: bool, -} - -impl BadReader { - fn new(reader: T) -> BadReader { - BadReader { reader, x: true } - } -} - -impl Read for BadReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - if self.x { - self.x = false; - let len = cmp::min(buf.len(), 1); - self.reader.read(&mut buf[..len]) - } else { - self.x = true; - Err(io::ErrorKind::WouldBlock.into()) - } - } -} - -struct AssertAsync(T); - -impl Read for AssertAsync { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.0.read(buf) - } -} - -impl AsyncRead for AssertAsync {} - -struct AlwaysNotify(T); - -impl Future for AlwaysNotify { - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll { - let ret = self.0.poll(); - if let Ok(Async::NotReady) = &ret { - task::current().notify(); - } - ret - } -} - -#[test] -fn test_gz_asyncread() { - let f = File::open("tests/good-file.gz").unwrap(); - - let fut = read_to_end(AssertAsync(GzDecoder::new(BadReader::new(f))), Vec::new()); - let (_, content) = AlwaysNotify(fut).wait().unwrap(); - - let mut expected = Vec::new(); - File::open("tests/good-file.txt") - .unwrap() - .read_to_end(&mut expected) - .unwrap(); - - assert_eq!(content, expected); -} - -#[test] -fn test_multi_gz_asyncread() { - let f = File::open("tests/multi.gz").unwrap(); - - let fut = read_to_end( - AssertAsync(MultiGzDecoder::new(BadReader::new(f))), - Vec::new(), - ); - let (_, content) = AlwaysNotify(fut).wait().unwrap(); - - let mut expected = Vec::new(); - File::open("tests/multi.txt") - .unwrap() - .read_to_end(&mut expected) - .unwrap(); - - assert_eq!(content, expected); -} diff --git a/tests/tokio.rs b/tests/tokio.rs deleted file mode 100644 index 0f7364640..000000000 --- a/tests/tokio.rs +++ /dev/null @@ -1,133 +0,0 @@ -#![cfg(feature = "tokio")] - -extern crate flate2; -extern crate futures; -extern crate rand; -extern crate tokio_io; -extern crate tokio_tcp; -extern crate tokio_threadpool; - -use std::io::{Read, Write}; -use std::iter; -use std::net::{Shutdown, TcpListener}; -use std::thread; - -use flate2::read; -use flate2::write; -use flate2::Compression; -use futures::Future; -use rand::{thread_rng, Rng}; -use tokio_io::io::{copy, shutdown}; -use tokio_io::AsyncRead; -use tokio_tcp::TcpStream; - -#[test] -fn tcp_stream_echo_pattern() { - const N: u8 = 16; - const M: usize = 16 * 1024; - - let listener = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = listener.local_addr().unwrap(); - let t = thread::spawn(move || { - let a = listener.accept().unwrap().0; - let b = a.try_clone().unwrap(); - - let t = thread::spawn(move || { - let mut b = read::DeflateDecoder::new(b); - let mut buf = [0; M]; - for i in 0..N { - b.read_exact(&mut buf).unwrap(); - for byte in buf.iter() { - assert_eq!(*byte, i); - } - } - - assert_eq!(b.read(&mut buf).unwrap(), 0); - }); - - let mut a = write::ZlibEncoder::new(a, Compression::default()); - for i in 0..N { - let buf = [i; M]; - a.write_all(&buf).unwrap(); - } - a.finish().unwrap().shutdown(Shutdown::Write).unwrap(); - - t.join().unwrap(); - }); - - let stream = TcpStream::connect(&addr); - let copy = stream - .and_then(|s| { - let (a, b) = s.split(); - let a = read::ZlibDecoder::new(a); - let b = write::DeflateEncoder::new(b, Compression::default()); - copy(a, b) - }) - .then(|result| { - let (amt, _a, b) = result.unwrap(); - assert_eq!(amt, (N as u64) * (M as u64)); - shutdown(b).map(|_| ()) - }) - .map_err(|err| panic!("{}", err)); - - let threadpool = tokio_threadpool::Builder::new().build(); - threadpool.spawn(copy); - threadpool.shutdown().wait().unwrap(); - t.join().unwrap(); -} - -#[test] -fn echo_random() { - let v = iter::repeat(()) - .take(1024 * 1024) - .map(|()| thread_rng().gen::()) - .collect::>(); - let listener = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = listener.local_addr().unwrap(); - let v2 = v.clone(); - let t = thread::spawn(move || { - let a = listener.accept().unwrap().0; - let b = a.try_clone().unwrap(); - - let mut v3 = v2.clone(); - let t = thread::spawn(move || { - let mut b = read::DeflateDecoder::new(b); - let mut buf = [0; 1024]; - while v3.len() > 0 { - let n = b.read(&mut buf).unwrap(); - for (actual, expected) in buf[..n].iter().zip(&v3) { - assert_eq!(*actual, *expected); - } - v3.drain(..n); - } - - assert_eq!(b.read(&mut buf).unwrap(), 0); - }); - - let mut a = write::ZlibEncoder::new(a, Compression::default()); - a.write_all(&v2).unwrap(); - a.finish().unwrap().shutdown(Shutdown::Write).unwrap(); - - t.join().unwrap(); - }); - - let stream = TcpStream::connect(&addr); - let copy = stream - .and_then(|s| { - let (a, b) = s.split(); - let a = read::ZlibDecoder::new(a); - let b = write::DeflateEncoder::new(b, Compression::default()); - copy(a, b) - }) - .then(move |result| { - let (amt, _a, b) = result.unwrap(); - assert_eq!(amt, v.len() as u64); - shutdown(b).map(|_| ()) - }) - .map_err(|err| panic!("{}", err)); - - let threadpool = tokio_threadpool::Builder::new().build(); - threadpool.spawn(copy); - threadpool.shutdown().wait().unwrap(); - t.join().unwrap(); -}