Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: ClickHouse/clickhouse-rs
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v0.6.0
Choose a base ref
...
head repository: ClickHouse/clickhouse-rs
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v0.6.1
Choose a head ref
  • 4 commits
  • 3 files changed
  • 2 contributors

Commits on Apr 9, 2021

  1. Copy the full SHA
    da1c503 View commit details
  2. Copy the full SHA
    25c6cd6 View commit details
  3. Merge pull request #7 from ods/decompress_error

    fix(compression): decompress error response (#6)
    loyd authored Apr 9, 2021
    Copy the full SHA
    dd7516b View commit details
  4. chore: bump version

    loyd committed Apr 9, 2021
    Copy the full SHA
    efa7190 View commit details
Showing with 66 additions and 32 deletions.
  1. +4 −0 CHANGELOG.md
  2. +1 −1 Cargo.toml
  3. +61 −31 src/response.rs
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.6.1]
### Fixed
- compression: decompress error messages (@ods)

## [0.6.0]
### Changed
- Use tokio v1, hyper v0.14, bytes v1
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "clickhouse"
version = "0.6.0"
version = "0.6.1"
description = "a typed client for ClickHouse"
keywords = ["clickhouse", "database", "driver", "tokio"]
authors = ["Paul Loyd <pavelko95@gmail.com>"]
92 changes: 61 additions & 31 deletions src/response.rs
Original file line number Diff line number Diff line change
@@ -10,7 +10,10 @@ use async_compression::stream::GzipDecoder;
#[cfg(feature = "zlib")]
use async_compression::stream::ZlibDecoder;
use bytes::Bytes;
use futures::stream::Stream;
use futures::{
future,
stream::{self, Stream, TryStreamExt},
};
use hyper::{body, client::ResponseFuture, Body, StatusCode};

#[cfg(feature = "lz4")]
@@ -22,40 +25,40 @@ use crate::{

pub enum Response {
Waiting(ResponseFuture, Compression),
Loading(Chunks),
Loading(Chunks<Body>),
}

impl Response {
pub fn new(future: ResponseFuture, compression: Compression) -> Self {
Self::Waiting(future, compression)
}

pub async fn resolve(&mut self) -> Result<&mut Chunks> {
pub async fn resolve(&mut self) -> Result<&mut Chunks<Body>> {
if let Self::Waiting(response, compression) = self {
let response = response.await?;
let status = response.status();
let body = response.into_body();

if response.status() != StatusCode::OK {
let bytes = body::to_bytes(response.into_body()).await?;
let reason = String::from_utf8_lossy(&bytes).trim().into();
if status != StatusCode::OK {
// TODO(loyd): test decompression of error response
let bytes = body::to_bytes(body).await?;

let mut chunks = decompress_stream(
stream::once(future::ready(Result::<_>::Ok(bytes.clone()))),
*compression,
);
let bytes = match chunks.try_next().await {
Ok(chunk) => chunk.unwrap_or_default(),
// Original response is more useful than decompression error
Err(_) => bytes,
};

let reason = String::from_utf8_lossy(&bytes).trim().into();
return Err(Error::BadResponse(reason));
}

let body = response.into_body();
let chunks = match compression {
Compression::None => Inner::Plain(body),
#[cfg(feature = "lz4")]
Compression::Lz4 => Inner::Lz4(Lz4Decoder::new(body)),
#[cfg(feature = "gzip")]
Compression::Gzip => Inner::Gzip(Box::new(GzipDecoder::new(BodyAdapter(body)))),
#[cfg(feature = "zlib")]
Compression::Zlib => Inner::Zlib(Box::new(ZlibDecoder::new(BodyAdapter(body)))),
#[cfg(feature = "brotli")]
Compression::Brotli => {
Inner::Brotli(Box::new(BrotliDecoder::new(BodyAdapter(body))))
}
};
*self = Self::Loading(Chunks(chunks));
let chunks = decompress_stream(body, *compression);
*self = Self::Loading(chunks);
}

match self {
@@ -65,22 +68,44 @@ impl Response {
}
}

pub struct Chunks(Inner);
fn decompress_stream<S, E>(stream: S, compression: Compression) -> Chunks<S>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
Error: From<E>,
{
Chunks(match compression {
Compression::None => Inner::Plain(stream),
#[cfg(feature = "lz4")]
Compression::Lz4 => Inner::Lz4(Lz4Decoder::new(stream)),
#[cfg(feature = "gzip")]
Compression::Gzip => Inner::Gzip(Box::new(GzipDecoder::new(BodyAdapter(stream)))),
#[cfg(feature = "zlib")]
Compression::Zlib => Inner::Zlib(Box::new(ZlibDecoder::new(BodyAdapter(stream)))),
#[cfg(feature = "brotli")]
Compression::Brotli => Inner::Brotli(Box::new(BrotliDecoder::new(BodyAdapter(stream)))),
})
}

pub struct Chunks<S>(Inner<S>);

enum Inner {
Plain(Body),
enum Inner<S> {
Plain(S),
#[cfg(feature = "lz4")]
Lz4(Lz4Decoder<Body>),
Lz4(Lz4Decoder<S>),
#[cfg(feature = "gzip")]
Gzip(Box<GzipDecoder<BodyAdapter>>),
Gzip(Box<GzipDecoder<BodyAdapter<S>>>),
#[cfg(feature = "zlib")]
Zlib(Box<ZlibDecoder<BodyAdapter>>),
Zlib(Box<ZlibDecoder<BodyAdapter<S>>>),
#[cfg(feature = "brotli")]
Brotli(Box<BrotliDecoder<BodyAdapter>>),
Brotli(Box<BrotliDecoder<BodyAdapter<S>>>),
Empty,
}

impl Stream for Chunks {
impl<S, E> Stream for Chunks<S>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
Error: From<E>,
{
type Item = Result<Bytes>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -122,11 +147,16 @@ impl Stream for Chunks {
}
}

// async-compression depends on bytes-0.5, therefore adapter to convert
#[cfg(any(feature = "gzip", feature = "zlib", feature = "brotli"))]
struct BodyAdapter(Body);
struct BodyAdapter<S>(S);

#[cfg(any(feature = "gzip", feature = "zlib", feature = "brotli"))]
impl Stream for BodyAdapter {
impl<S, E> Stream for BodyAdapter<S>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
Error: From<E>,
{
type Item = std::io::Result<bytes_05::Bytes>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {