diff --git a/.github/services/webhdfs/webhdfs_with_user_name/action.yml b/.github/services/webhdfs/webhdfs_with_user_name/action.yml new file mode 100644 index 000000000000..842ec48cb1fa --- /dev/null +++ b/.github/services/webhdfs/webhdfs_with_user_name/action.yml @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: webhdfs +description: "Behavior test for webhdfs with user name specified" + +runs: + using: "composite" + steps: + - name: Setup webhdfs + shell: bash + working-directory: fixtures/webhdfs + run: | + docker compose -f docker-compose-webhdfs.yml up -d --wait + - name: Setup + shell: bash + run: | + cat << EOF >> $GITHUB_ENV + OPENDAL_WEBHDFS_ROOT=/ + OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870 + OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/ + OPENDAL_WEBHDFS_USER_NAME=root + EOF diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index 75d2240b1e5c..afd165afd045 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -101,6 +101,16 @@ impl WebhdfsBuilder { self } + /// Set the username of this backend, + /// used for authentication + /// + pub fn user_name(mut self, user_name: &str) -> Self { + if !user_name.is_empty() { + self.config.user_name = Some(user_name.to_string()); + } + self + } + /// Set the delegation token of this backend, /// used for authentication /// @@ -179,6 +189,7 @@ impl Builder for WebhdfsBuilder { let backend = WebhdfsBackend { root, endpoint, + user_name: self.config.user_name, auth, client, root_checker: OnceCell::new(), @@ -195,6 +206,7 @@ impl Builder for WebhdfsBuilder { pub struct WebhdfsBackend { root: String, endpoint: String, + user_name: Option, auth: Option, root_checker: OnceCell<()>, @@ -212,6 +224,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } @@ -220,6 +235,7 @@ impl WebhdfsBackend { req.body(Buffer::new()).map_err(new_request_build_error) } + /// create object pub async fn webhdfs_create_object_request( &self, @@ -235,6 +251,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } @@ -277,6 +296,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += &format!("&{auth}"); } @@ -311,7 +333,9 @@ impl WebhdfsBackend { percent_encode_path(&from), percent_encode_path(&to) ); - + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += &format!("&{auth}"); } @@ -330,7 +354,9 @@ impl WebhdfsBackend { body: Buffer, ) -> Result> { let mut url = location.to_string(); - + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += &format!("&{auth}"); } @@ -362,7 +388,9 @@ impl WebhdfsBackend { percent_encode_path(&p), percent_encode_path(&sources), ); - + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += &format!("&{auth}"); } @@ -379,6 +407,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += &format!("&{auth}"); } @@ -404,6 +435,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } @@ -429,6 +463,9 @@ impl WebhdfsBackend { if !start_after.is_empty() { url += format!("&startAfter={}", start_after).as_str(); } + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } @@ -455,7 +492,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); - + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } @@ -474,6 +513,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } diff --git a/core/src/services/webhdfs/config.rs b/core/src/services/webhdfs/config.rs index 168ea7d0f082..03dadfcaf609 100644 --- a/core/src/services/webhdfs/config.rs +++ b/core/src/services/webhdfs/config.rs @@ -30,6 +30,8 @@ pub struct WebhdfsConfig { pub root: Option, /// Endpoint for webhdfs. pub endpoint: Option, + /// Name of the user for webhdfs. + pub user_name: Option, /// Delegation token for webhdfs. pub delegation: Option, /// Disable batch listing @@ -43,6 +45,7 @@ impl Debug for WebhdfsConfig { f.debug_struct("WebhdfsConfig") .field("root", &self.root) .field("endpoint", &self.endpoint) + .field("user_name", &self.user_name) .field("atomic_write_dir", &self.atomic_write_dir) .finish_non_exhaustive() }