diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index b1bfed8aeafe..4adbc6ab9e93 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -201,7 +201,7 @@ impl WebhdfsBackend { &self, path: &str, size: Option, - content_type: Option<&str>, + args: &OpWrite, body: AsyncBody, ) -> Result> { let p = build_abs_path(&self.root, path); @@ -230,7 +230,7 @@ impl WebhdfsBackend { if let Some(size) = size { req = req.header(CONTENT_LENGTH, size.to_string()); } - if let Some(content_type) = content_type { + if let Some(content_type) = args.content_type() { req = req.header(CONTENT_TYPE, content_type); } @@ -296,12 +296,12 @@ impl WebhdfsBackend { pub(super) fn webhdfs_list_status_batch_request( &self, path: &str, - start_after: &Option, + args: &OpList, ) -> Result> { let p = build_abs_path(&self.root, path); // if it's not the first time to call LISTSTATUS_BATCH, we will add &startAfter= - let start_after_param = match start_after { + let start_after_param = match args.start_after() { Some(sa) if sa.is_empty() => String::new(), Some(sa) => format!("&startAfter={}", sa), None => String::new(), @@ -430,7 +430,7 @@ impl Accessor for WebhdfsBackend { /// Create a file or directory async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { let req = self - .webhdfs_create_object_request(path, Some(0), None, AsyncBody::Empty) + .webhdfs_create_object_request(path, Some(0), &OpWrite::default(), AsyncBody::Empty) .await?; let resp = self.client.send(req).await?; @@ -535,7 +535,7 @@ impl Accessor for WebhdfsBackend { let path = path.trim_end_matches('/'); if !self.disable_list_batch { - let req = self.webhdfs_list_status_batch_request(path, &None)?; + let req = self.webhdfs_list_status_batch_request(path, &OpList::default())?; let resp = self.client.send(req).await?; match resp.status() { StatusCode::OK => { diff --git a/core/src/services/webhdfs/pager.rs b/core/src/services/webhdfs/pager.rs index e9f45df3f44e..c55252c3de39 100644 --- a/core/src/services/webhdfs/pager.rs +++ b/core/src/services/webhdfs/pager.rs @@ -63,9 +63,13 @@ impl oio::Page for WebhdfsPager { return match self.backend.disable_list_batch { true => self.webhdfs_get_next_list_statuses(), false => { + let args = OpList::with_start_after( + OpList::default(), + &self.batch_start_after.clone().unwrap(), + ); let req = self .backend - .webhdfs_list_status_batch_request(&self.path, &self.batch_start_after)?; + .webhdfs_list_status_batch_request(&self.path, &args)?; let resp = self.backend.client.send(req).await?; match resp.status() { diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index ddc8dd328959..495fa7fc27a7 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -48,7 +48,7 @@ impl oio::OneShotWrite for WebhdfsWriter { .webhdfs_create_object_request( &self.path, Some(bs.len()), - self.op.content_type(), + &self.op, AsyncBody::Bytes(bs), ) .await?;