Merge branch 'apache:trunk' into YARN-11225

slfan1989 authored Dec 21, 2022
2 parents ff8c2bf + b63b777 commit 3f33b10

Showing 7 changed files with 148 additions and 47 deletions.
@@ -402,7 +402,8 @@ public void setSymlink(final Path p) {
}

/**
* Compare this FileStatus to another FileStatus
* Compare this FileStatus to another FileStatus based on lexicographical
* order of path.
* @param o the FileStatus to be compared.
* @return a negative integer, zero, or a positive integer as this object
* is less than, equal to, or greater than the specified object.
@@ -412,7 +413,8 @@ public int compareTo(FileStatus o) {
}

/**
* Compare this FileStatus to another FileStatus.
* Compare this FileStatus to another FileStatus based on lexicographical
* order of path.
* This method was added back by HADOOP-14683 to keep binary compatibility.
*
* @param o the FileStatus to be compared.
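Since `FileStatus` orders by the lexicographical order of its path, sorting a directory listing yields path-sorted output. A minimal sketch, assuming a default `FileSystem` and a hypothetical `/tmp` directory:

```java
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SortedListing {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus[] statuses = fs.listStatus(new Path("/tmp"));
    // FileStatus.compareTo delegates to Path, so this sorts by path name.
    Arrays.sort(statuses);
    for (FileStatus status : statuses) {
      System.out.println(status.getPath());
    }
  }
}
```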
@@ -389,6 +389,6 @@ public DiskBalancerDataNode getNodeByIPAddress(String ipAddresss) {
* @return DiskBalancerDataNode.
*/
public DiskBalancerDataNode getNodeByName(String hostName) {
return hostNames.get(hostName);
return hostNames.get(hostName.toLowerCase(Locale.US));
}
}
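The lookup now lower-cases the host name with `Locale.US`, matching how the `hostNames` map is presumably keyed, so lookups become case-insensitive. A self-contained sketch of the pattern, with a hypothetical `Node` type parameter:

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hedged sketch of a case-insensitive host name index; Node is a stand-in type. */
public class HostNameIndex<Node> {
  private final Map<String, Node> hostNames = new HashMap<>();

  public void add(String hostName, Node node) {
    // Normalize on insert...
    hostNames.put(hostName.toLowerCase(Locale.US), node);
  }

  public Node getNodeByName(String hostName) {
    // ...and normalize the same way on lookup, so "DataNode-1" matches "datanode-1".
    return hostNames.get(hostName.toLowerCase(Locale.US));
  }
}
```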
@@ -3044,12 +3044,12 @@ LocatedBlock getAdditionalBlock(

LocatedBlock[] onRetryBlock = new LocatedBlock[1];
FSDirWriteFileOp.ValidateAddBlockResult r;
checkOperation(OperationCategory.READ);
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
FSPermissionChecker.setOperationType(operationName);
readLock();
try {
checkOperation(OperationCategory.READ);
checkOperation(OperationCategory.WRITE);
r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
previous, onRetryBlock);
} finally {
@@ -3095,12 +3095,15 @@ LocatedBlock getAdditionalDatanode(String src, long fileId,
final byte storagePolicyID;
final List<DatanodeStorageInfo> chosen;
final BlockType blockType;
checkOperation(OperationCategory.READ);
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
FSPermissionChecker.setOperationType(null);
readLock();
try {
checkOperation(OperationCategory.READ);
// Changing this operation category to WRITE instead of making getAdditionalDatanode a
// read method is aimed at letting the Active NameNode handle this RPC, because the Active
// NameNode contains a more complete DN selection context than the Observer NameNode.
checkOperation(OperationCategory.WRITE);
//check safe mode
checkNameNodeSafeMode("Cannot add datanode; src=" + src + ", blk=" + blk);
final INodesInPath iip = dir.resolvePath(pc, src, fileId);
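As the hunks above show, the operation category is checked twice: once before taking the namesystem lock as a cheap fail-fast, and again under the lock in case the HA state changed in between; on an Observer, a WRITE-category check is rejected so the client retries against the Active. A simplified, self-contained sketch of that pattern (the enum and lock stand in for the real NameNode types, not the actual FSNamesystem code):

```java
import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Simplified sketch of the "check, lock, re-check" pattern above.
 * OperationCategory and HaState are stand-ins for the real NameNode types.
 */
public class OperationCheckSketch {
  enum OperationCategory { READ, WRITE }
  enum HaState { ACTIVE, OBSERVER }

  private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock();
  private volatile HaState state = HaState.ACTIVE;

  private void checkOperation(OperationCategory op) throws IOException {
    // An Observer only serves READ; a WRITE-category call must go to the Active.
    if (state == HaState.OBSERVER && op == OperationCategory.WRITE) {
      throw new IOException("Operation category WRITE is not supported in state OBSERVER");
    }
  }

  public void addBlockLikeOperation() throws IOException {
    checkOperation(OperationCategory.WRITE);   // cheap fail-fast, no lock held
    fsLock.readLock().lock();
    try {
      checkOperation(OperationCategory.WRITE); // re-check after a possible HA transition
      // ... validate the file state and allocate the new block ...
    } finally {
      fsLock.readLock().unlock();
    }
  }
}
```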
@@ -29,26 +29,32 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Test;
import org.junit.jupiter.api.Timeout;

@@ -158,6 +164,43 @@ public void testNamenodeRpcClientIpProxyWithFailBack() throws Exception {
}
}

@Test
@Timeout(30000)
public void testObserverHandleAddBlock() throws Exception {
String baseDir = GenericTestUtils.getRandomizedTempPath();
Configuration conf = new HdfsConfiguration();
MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf).setNumNameNodes(3);
builder.getDfsBuilder().numDataNodes(3);
try (MiniQJMHACluster qjmhaCluster = builder.baseDir(baseDir).build()) {
MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();
dfsCluster.waitActive();
dfsCluster.transitionToActive(0);
dfsCluster.transitionToObserver(2);

NameNode activeNN = dfsCluster.getNameNode(0);
NameNode observerNN = dfsCluster.getNameNode(2);

// Stop the editLogTailer of Observer NameNode
observerNN.getNamesystem().getEditLogTailer().stop();
DistributedFileSystem dfs = dfsCluster.getFileSystem(0);

Path testPath = new Path("/testObserverHandleAddBlock/file.txt");
try (FSDataOutputStream ignore = dfs.create(testPath)) {
HdfsFileStatus fileStatus = activeNN.getRpcServer().getFileInfo(testPath.toUri().getPath());
assertNotNull(fileStatus);
assertNull(observerNN.getRpcServer().getFileInfo(testPath.toUri().getPath()));

LambdaTestUtils.intercept(ObserverRetryOnActiveException.class, () -> {
observerNN.getRpcServer().addBlock(testPath.toUri().getPath(),
dfs.getClient().getClientName(), null, null,
fileStatus.getFileId(), null, EnumSet.noneOf(AddBlockFlag.class));
});
} finally {
dfs.delete(testPath, true);
}
}
}

/**
* A test to make sure that if an authorized user adds "clientIp:" to their
* caller context, it will be used to make locality decisions on the NN.
77 changes: 50 additions & 27 deletions hadoop-project/src/site/markdown/index.md.vm
@@ -23,11 +23,29 @@ Overview of Changes
Users are encouraged to read the full set of release notes.
This page provides an overview of the major changes.

Azure ABFS: Critical Stream Prefetch Fix
---------------------------------------------

The ABFS connector has a critical bug fix
[HADOOP-18546](https://issues.apache.org/jira/browse/HADOOP-18546).
*ABFS. Disable purging list of in-progress reads in abfs stream close().*

All users of the abfs connector in hadoop releases 3.3.2+ MUST either upgrade
or disable prefetching by setting `fs.azure.readaheadqueue.depth` to `0`.

Consult the parent JIRA [HADOOP-18521](https://issues.apache.org/jira/browse/HADOOP-18521)
*ABFS ReadBufferManager buffer sharing across concurrent HTTP requests*
for root cause analysis, details on what is affected, and mitigations.
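For clusters that cannot upgrade immediately, a minimal sketch of the prefetch mitigation described above (the `abfs://` URI is a placeholder; setting the property in `core-site.xml` works equally well):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class DisableAbfsReadahead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Mitigation for HADOOP-18521 / HADOOP-18546: turn off read-ahead prefetching.
    conf.setInt("fs.azure.readaheadqueue.depth", 0);
    try (FileSystem fs = FileSystem.get(
        URI.create("abfs://container@account.dfs.core.windows.net/"), conf)) {
      System.out.println("Opened " + fs.getUri() + " with prefetching disabled");
    }
  }
}
```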


Vectored IO API
---------------

[HADOOP-18103](https://issues.apache.org/jira/browse/HADOOP-18103).
*High performance vectored read API in Hadoop*

The `PositionedReadable` interface has now added an operation for
Vectored (also known as Scatter/Gather IO):
Vectored IO (also known as Scatter/Gather IO):

```java
void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)
@@ -38,25 +56,25 @@ possibly in parallel, with results potentially coming in out-of-order.

1. The default implementation uses a series of `readFully()` calls, so delivers
equivalent performance.
2. The local filesystem uses java native IO calls for higher performance reads than `readFully()`
2. The local filesystem uses java native IO calls for higher performance reads than `readFully()`.
3. The S3A filesystem issues parallel HTTP GET requests in different threads.

Benchmarking of (modified) ORC and Parquet clients through `file://` and `s3a://`
show tangible improvements in query times.
Benchmarking of enhanced Apache ORC and Apache Parquet clients through `file://` and `s3a://`
show significant improvements in query performance.

Further Reading: [FsDataInputStream](./hadoop-project-dist/hadoop-common/filesystem/fsdatainputstream.html).
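A minimal usage sketch, assuming the `FileRange.createFileRange()` factory and `FileRange.getData()` future shipped with this API, and a hypothetical `/data/file.orc` input:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Two ranges: the first 4 KiB, and 8 KiB starting at a 1 MiB offset.
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 4096),
        FileRange.createFileRange(1024 * 1024, 8192));
    try (FSDataInputStream in = fs.open(new Path("/data/file.orc"))) {
      // Ranges may be fetched in parallel and complete out of order.
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get(); // blocks until this range arrives
        System.out.println("Read " + data.remaining() + " bytes at offset " + range.getOffset());
      }
    }
  }
}
```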

Manifest Committer for Azure ABFS and google GCS performance
------------------------------------------------------------
Mapreduce: Manifest Committer for Azure ABFS and google GCS
----------------------------------------------------------

A new "intermediate manifest committer" uses a manifest file
The new _Intermediate Manifest Committer_ uses a manifest file
to commit the work of successful task attempts, rather than
renaming directories.
Job commit is a matter of reading all the manifests, creating the
destination directories (parallelized) and renaming the files,
again in parallel.

This is fast and correct on Azure Storage and Google GCS,
This is both fast and correct on Azure Storage and Google GCS,
and should be used there instead of the classic v1/v2 file
output committers.

@@ -69,24 +87,6 @@ More details are available in the
[manifest committer](./hadoop-mapreduce-client/hadoop-mapreduce-client-core/manifest_committer.html)
documentation.
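A hedged configuration sketch: the factory property and committer class names below are assumptions based on the manifest committer documentation linked above, and should be verified there rather than treated as authoritative.

```java
import org.apache.hadoop.conf.Configuration;

public class ManifestCommitterBinding {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Bind abfs:// job output to the manifest committer (assumed property/class
    // names; see the manifest_committer.html page for the definitive values).
    conf.set("mapreduce.outputcommitter.factory.scheme.abfs",
        "org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory");
    System.out.println(conf.get("mapreduce.outputcommitter.factory.scheme.abfs"));
  }
}
```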

Transitive CVE fixes
--------------------

A lot of dependencies have been upgraded to address recent CVEs.
Many of the CVEs were not actually exploitable through Hadoop,
so much of this work is just due diligence.
However, applications which have these libraries on their class path may
be vulnerable, and the upgrades should also reduce the number of false
positives security scanners report.

We have not been able to upgrade every single dependency to the latest
version available; some of those upgrades would simply be incompatible.
If you have concerns about the state of a specific library, consult the Apache JIRA
issue tracker to see what discussions have taken place about the library in question.

As an open source project, contributions in this area are always welcome,
especially in testing the active branches, testing applications downstream of
those branches, and checking whether updated dependencies trigger regressions.

HDFS: Router Based Federation
-----------------------------
@@ -96,7 +96,6 @@ A lot of effort has been invested into stabilizing/improving the HDFS Router Bas
1. HDFS-13522, HDFS-16767 & Related Jiras: Allow Observer Reads in HDFS Router Based Federation.
2. HDFS-13248: RBF supports Client Locality


HDFS: Dynamic Datanode Reconfiguration
--------------------------------------

@@ -109,6 +108,29 @@ cluster-wide Datanode Restarts.
See [DataNode.java](https://github.com/apache/hadoop/blob/branch-3.3.5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java#L346-L361)
for the list of dynamically reconfigurable attributes.
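A small sketch of triggering a live reconfiguration, equivalent to `hdfs dfsadmin -reconfig datanode <host:ipc_port> start` (the host and port below are placeholders):

```java
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.util.ToolRunner;

public class ReconfigureDatanode {
  public static void main(String[] args) throws Exception {
    // Ask the datanode to re-read its reconfigurable properties without a restart.
    int rc = ToolRunner.run(new HdfsConfiguration(), new DFSAdmin(),
        new String[] {"-reconfig", "datanode", "dn1.example.com:9867", "start"});
    System.exit(rc);
  }
}
```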


Transitive CVE fixes
--------------------

A lot of dependencies have been upgraded to address recent CVEs.
Many of the CVEs were not actually exploitable through Hadoop,
so much of this work is just due diligence.
However, applications which have these libraries on their class path may
be vulnerable, and the upgrades should also reduce the number of false
positives security scanners report.

We have not been able to upgrade every single dependency to the latest
version available; some of those upgrades would simply be incompatible.
If you have concerns about the state of a specific library, consult the Apache JIRA
issue tracker to see whether an issue has already been filed, what discussions have
taken place about the library in question, and whether a fix is already in the pipeline.
*Please don't file new JIRAs about dependency-X.Y.Z having a CVE without
searching for any existing issue first.*

As an open source project, contributions in this area are always welcome,
especially in testing the active branches, testing applications downstream of
those branches, and checking whether updated dependencies trigger regressions.

Getting Started
===============

@@ -119,3 +141,4 @@ which shows you how to set up a single-node Hadoop installation.
Then move on to the
[Cluster Setup](./hadoop-project-dist/hadoop-common/ClusterSetup.html)
to learn how to set up a multi-node Hadoop installation.

@@ -72,6 +72,7 @@
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;

import static org.apache.hadoop.fs.aliyun.oss.Constants.*;

@@ -203,31 +204,29 @@ public void deleteObjects(List<String> keysToDelete) throws IOException {

int retry = 10;
int tries = 0;
List<String> deleteFailed = keysToDelete;
while(CollectionUtils.isNotEmpty(deleteFailed)) {
while (CollectionUtils.isNotEmpty(keysToDelete)) {
DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucketName);
deleteRequest.setKeys(deleteFailed);
deleteRequest.setKeys(keysToDelete);
// There are two modes to do batch delete:
// 1. detail mode: DeleteObjectsResult.getDeletedObjects returns objects
// which were deleted successfully.
// 2. simple mode: DeleteObjectsResult.getDeletedObjects returns objects
// which were deleted unsuccessfully.
// Here, we choose the simple mode to do batch delete.
deleteRequest.setQuiet(true);
// 1. verbose mode: A list of all deleted objects is returned.
// 2. quiet mode: No message body is returned.
// Here, we choose the verbose mode to do batch delete.
deleteRequest.setQuiet(false);
DeleteObjectsResult result = ossClient.deleteObjects(deleteRequest);
statistics.incrementWriteOps(1);
deleteFailed = result.getDeletedObjects();
final List<String> deletedObjects = result.getDeletedObjects();
keysToDelete = keysToDelete.stream().filter(item -> !deletedObjects.contains(item))
.collect(Collectors.toList());
tries++;
if (tries == retry) {
break;
}
}

if (tries == retry && CollectionUtils.isNotEmpty(deleteFailed)) {
if (tries == retry && CollectionUtils.isNotEmpty(keysToDelete)) {
// Most of the time, it is impossible to need 10 tries, except when there are
// Aliyun OSS service problems.
throw new IOException("Failed to delete Aliyun OSS objects for " +
tries + " times.");
throw new IOException("Failed to delete Aliyun OSS objects for " + tries + " times.");
}
}

@@ -18,9 +18,12 @@

package org.apache.hadoop.fs.aliyun.oss;

import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -36,7 +39,10 @@
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.fs.aliyun.oss.Constants.MAX_PAGING_KEYS_DEFAULT;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -128,4 +134,29 @@ public void testLargeUpload()
writeRenameReadCompare(new Path("/test/xlarge"),
Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
}

@Test
public void testDeleteObjects() throws IOException, NoSuchAlgorithmException {
// generate test files
final int files = 10;
final long size = 5 * 1024 * 1024;
final String prefix = "dir";
for (int i = 0; i < files; i++) {
Path path = new Path(String.format("/%s/testFile-%d.txt", prefix, i));
ContractTestUtils.generateTestFile(this.fs, path, size, 256, 255);
}
OSSListRequest listRequest =
store.createListObjectsRequest(prefix, MAX_PAGING_KEYS_DEFAULT, null, null, true);
List<String> keysToDelete = new ArrayList<>();
OSSListResult objects = store.listObjects(listRequest);
assertEquals(files, objects.getObjectSummaries().size());

// test delete files
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
keysToDelete.add(objectSummary.getKey());
}
store.deleteObjects(keysToDelete);
objects = store.listObjects(listRequest);
assertEquals(0, objects.getObjectSummaries().size());
}
}
