Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Make sure OpenDAL works with http2 on GCS #2956

Merged
merged 2 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::fmt::Formatter;
use std::sync::Arc;

use async_trait::async_trait;
use http::header::HOST;
use http::StatusCode;
use log::debug;
use reqsign::GoogleCredentialLoader;
Expand Down Expand Up @@ -586,14 +585,7 @@ impl Accessor for GcsBackend {
self.core.sign_query(&mut req, args.expire()).await?;

// We don't need this request anymore, consume it directly.
let (mut parts, _) = req.into_parts();
// Always remove host header, let users' client to set it based on HTTP
// version.
//
// As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>,
// google server could send RST_STREAM of PROTOCOL_ERROR if our request
// contains host header.
parts.headers.remove(HOST);
let (parts, _) = req.into_parts();

Ok(RpPresign::new(PresignedRequest::new(
parts.method,
Expand Down
27 changes: 25 additions & 2 deletions core/src/services/gcs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use bytes::Bytes;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_RANGE;
use http::header::CONTENT_TYPE;
use http::header::HOST;
use http::header::IF_MATCH;
use http::header::IF_NONE_MATCH;
use http::Request;
Expand Down Expand Up @@ -107,15 +108,37 @@ impl GcsCore {
pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
let cred = self.load_token().await?;

self.signer.sign(req, &cred).map_err(new_request_sign_error)
self.signer
.sign(req, &cred)
.map_err(new_request_sign_error)?;

// Always remove host header, let users' client to set it based on HTTP
// version.
//
// As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>,
// google server could send RST_STREAM of PROTOCOL_ERROR if our request
// contains host header.
req.headers_mut().remove(HOST);

Ok(())
}

pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
let cred = self.load_credential()?;

self.signer
.sign_query(req, duration, &cred)
.map_err(new_request_sign_error)
.map_err(new_request_sign_error)?;

// Always remove host header, let users' client to set it based on HTTP
// version.
//
// As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>,
// google server could send RST_STREAM of PROTOCOL_ERROR if our request
// contains host header.
req.headers_mut().remove(HOST);

Ok(())
}

#[inline]
Expand Down
27 changes: 25 additions & 2 deletions core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use http::header::CACHE_CONTROL;
use http::header::CONTENT_DISPOSITION;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::header::HOST;
use http::header::IF_MATCH;
use http::header::IF_NONE_MATCH;
use http::HeaderValue;
Expand Down Expand Up @@ -127,7 +128,19 @@ impl S3Core {
return Ok(());
};

self.signer.sign(req, &cred).map_err(new_request_sign_error)
self.signer
.sign(req, &cred)
.map_err(new_request_sign_error)?;

// Always remove host header, let users' client to set it based on HTTP
// version.
//
// As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>,
// google server could send RST_STREAM of PROTOCOL_ERROR if our request
// contains host header.
req.headers_mut().remove(HOST);

Ok(())
}

pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
Expand All @@ -139,7 +152,17 @@ impl S3Core {

self.signer
.sign_query(req, duration, &cred)
.map_err(new_request_sign_error)
.map_err(new_request_sign_error)?;

// Always remove host header, let users' client to set it based on HTTP
// version.
//
// As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>,
// google server could send RST_STREAM of PROTOCOL_ERROR if our request
// contains host header.
req.headers_mut().remove(HOST);

Ok(())
}

#[inline]
Expand Down
5 changes: 3 additions & 2 deletions core/src/types/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,12 @@ impl Iterator for BlockingLister {

#[cfg(test)]
mod tests {
use super::*;
use crate::services::Azblob;
use futures::future;
use futures::StreamExt;

use super::*;
use crate::services::Azblob;

/// Inspired by <https://gist.github.com/kyle-mccarthy/1e6ae89cc34495d731b91ebf5eb5a3d9>
///
/// Invalid lister should not panic nor endless loop.
Expand Down