Skip to content

Commit

Permalink
HDFS-6874. Add GETFILEBLOCKLOCATIONS operation to HttpFS
Browse files Browse the repository at this point in the history
  • Loading branch information
Ashutosh Gupta committed Aug 16, 2022
1 parent f02ff1a commit b78a873
Show file tree
Hide file tree
Showing 12 changed files with 497 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hdfs;

import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
Expand Down Expand Up @@ -3898,4 +3899,36 @@ public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
return dfs.slowDatanodeReport();
}

/**
* Returns LocatedBlocks of the corresponding HDFS file p from offset start
* for length len.
* This is similar to {@link #getFileBlockLocations(Path, long, long)} except
* that it returns LocatedBlocks rather than BlockLocation array.
* @param p path representing the file of interest.
* @param start offset
* @param len length
* @return a LocatedBlocks object
* @throws IOException
*/
public LocatedBlocks getLocatedBlocks(Path p, long start, long len)
throws IOException {
final Path absF = fixRelativePart(p);
return new FileSystemLinkResolver<LocatedBlocks>() {
@Override
public LocatedBlocks doCall(final Path p) throws IOException {
return dfs.getLocatedBlocks(getPathName(p), start, len);
}
@Override
public LocatedBlocks next(final FileSystem fs, final Path p)
throws IOException {
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem myDfs = (DistributedFileSystem)fs;
return myDfs.getLocatedBlocks(p, start, len);
}
throw new UnsupportedOperationException("Cannot getLocatedBlocks " +
"through a symlink to a non-DistributedFileSystem: " + fs + " -> "+
p);
}
}.resolve(this, absF);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.fs.ContentSummary;
Expand Down Expand Up @@ -965,4 +968,53 @@ private static SnapshotStatus toSnapshotStatus(
SnapshotStatus.getParentPath(fullPath)));
return snapshotStatus;
}

@VisibleForTesting
public static BlockLocation[] toBlockLocationArray(Map<?, ?> json)
throws IOException {
final Map<?, ?> rootmap =
(Map<?, ?>) json.get(BlockLocation.class.getSimpleName() + "s");
final List<?> array =
JsonUtilClient.getList(rootmap, BlockLocation.class.getSimpleName());
Preconditions.checkNotNull(array);
final BlockLocation[] locations = new BlockLocation[array.size()];
int i = 0;
for (Object object : array) {
final Map<?, ?> m = (Map<?, ?>) object;
locations[i++] = JsonUtilClient.toBlockLocation(m);
}
return locations;
}

/** Convert a Json map to BlockLocation. **/
private static BlockLocation toBlockLocation(Map<?, ?> m) throws IOException {
if (m == null) {
return null;
}
long length = ((Number) m.get("length")).longValue();
long offset = ((Number) m.get("offset")).longValue();
boolean corrupt = Boolean.getBoolean(m.get("corrupt").toString());
String[] storageIds = toStringArray(getList(m, "storageIds"));
String[] cachedHosts = toStringArray(getList(m, "cachedHosts"));
String[] hosts = toStringArray(getList(m, "hosts"));
String[] names = toStringArray(getList(m, "names"));
String[] topologyPaths = toStringArray(getList(m, "topologyPaths"));
StorageType[] storageTypes = toStorageTypeArray(getList(m, "storageTypes"));
return new BlockLocation(names, hosts, cachedHosts, topologyPaths,
storageIds, storageTypes, offset, length, corrupt);
}

@VisibleForTesting
static String[] toStringArray(List<?> list) {
if (list == null) {
return null;
} else {
final String[] array = new String[list.size()];
int i = 0;
for (Object object : list) {
array[i++] = object.toString();
}
return array;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ public class WebHdfsFileSystem extends FileSystem
private KeyProvider testProvider;
private boolean isTLSKrb;

private boolean isServerHCFSCompatible = true;

/**
* Return the protocol scheme for the FileSystem.
*
Expand Down Expand Up @@ -1882,18 +1884,59 @@ public BlockLocation[] getFileBlockLocations(final FileStatus status,
}

@Override
public BlockLocation[] getFileBlockLocations(final Path p,
final long offset, final long length) throws IOException {
public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
final long length) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
BlockLocation[] locations = null;
try {
if (isServerHCFSCompatible) {
locations =
getFileBlockLocations(GetOpParam.Op.GETFILEBLOCKLOCATIONS, p, offset, length);
} else {
locations = getFileBlockLocations(GetOpParam.Op.GET_BLOCK_LOCATIONS, p,
offset, length);
}
} catch (RemoteException e) {
// parsing the exception is needed only if the client thinks the service
// is compatible
if (isServerHCFSCompatible && isGetFileBlockLocationsException(e)) {
LOG.warn("Server does not appear to support GETFILEBLOCKLOCATIONS." +
"Fallback to the old GET_BLOCK_LOCATIONS. Exception: " +
e.getMessage());
isServerHCFSCompatible = false;
locations = getFileBlockLocations(GetOpParam.Op.GET_BLOCK_LOCATIONS, p,
offset, length);
} else {
throw e;
}
}
return locations;
}

final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
private boolean isGetFileBlockLocationsException(RemoteException e) {
return e.getMessage() != null
&& e.getMessage().contains("Invalid value for webhdfs parameter")
&& e.getMessage()
.contains(GetOpParam.Op.GETFILEBLOCKLOCATIONS.toString());
}

private BlockLocation[] getFileBlockLocations(GetOpParam.Op operation,
final Path p, final long offset, final long length) throws IOException {
final HttpOpParam.Op op = operation;
return new FsPathResponseRunner<BlockLocation[]>(op, p,
new OffsetParam(offset), new LengthParam(length)) {
@Override
BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
return DFSUtilClient.locatedBlocks2Locations(
JsonUtilClient.toLocatedBlocks(json));
BlockLocation[] decodeResponse(Map<?, ?> json) throws IOException {
switch (operation) {
case GETFILEBLOCKLOCATIONS:
return JsonUtilClient.toBlockLocationArray(json);
case GET_BLOCK_LOCATIONS:
return DFSUtilClient
.locatedBlocks2Locations(JsonUtilClient.toLocatedBlocks(json));
default:
throw new IOException("Unknown operation " + operation.name());
}
}
}.run();
}
Expand Down
10 changes: 10 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,16 @@
<artifactId>bcprov-jdk15on</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import java.util.List;

import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.DelegationTokenRenewer;
Expand Down Expand Up @@ -140,6 +144,8 @@ public class HttpFSFileSystem extends FileSystem
public static final String SNAPSHOT_DIFF_INDEX = "snapshotdiffindex";
public static final String FSACTION_MODE_PARAM = "fsaction";
public static final String EC_POLICY_NAME_PARAM = "ecpolicy";
public static final String OFFSET_PARAM = "offset";
public static final String LENGTH_PARAM = "length";

public static final Short DEFAULT_PERMISSION = 0755;
public static final String ACLSPEC_DEFAULT = "";
Expand Down Expand Up @@ -239,6 +245,7 @@ public static FILE_TYPE getType(FileStatus fileStatus) {

public static final String STORAGE_POLICIES_JSON = "BlockStoragePolicies";
public static final String STORAGE_POLICY_JSON = "BlockStoragePolicy";
public static final String BLOCK_LOCATIONS_JSON = "BlockLocations";

public static final int HTTP_TEMPORARY_REDIRECT = 307;

Expand Down Expand Up @@ -269,7 +276,8 @@ public enum Operation {
GETSNAPSHOTTABLEDIRECTORYLIST(HTTP_GET), GETSNAPSHOTLIST(HTTP_GET),
GETSERVERDEFAULTS(HTTP_GET),
CHECKACCESS(HTTP_GET), SETECPOLICY(HTTP_PUT), GETECPOLICY(HTTP_GET), UNSETECPOLICY(
HTTP_POST), SATISFYSTORAGEPOLICY(HTTP_PUT), GETSNAPSHOTDIFFLISTING(HTTP_GET);
HTTP_POST), SATISFYSTORAGEPOLICY(HTTP_PUT), GETSNAPSHOTDIFFLISTING(HTTP_GET),
GET_BLOCK_LOCATIONS(HTTP_GET);

private String httpMethod;

Expand Down Expand Up @@ -1710,4 +1718,42 @@ public void satisfyStoragePolicy(final Path path) throws IOException {
Operation.SATISFYSTORAGEPOLICY.getMethod(), params, path, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
}
}

@Override
public BlockLocation[] getFileBlockLocations(Path path, long start, long len)
throws IOException {
Map<String, String> params = new HashMap<>();
params.put(OP_PARAM, Operation.GETFILEBLOCKLOCATIONS.toString());
params.put(OFFSET_PARAM, Long.toString(start));
params.put(LENGTH_PARAM, Long.toString(len));
HttpURLConnection conn = getConnection(
Operation.GETFILEBLOCKLOCATIONS.getMethod(), params, path, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
return toBlockLocations(json);
}

@Override
public BlockLocation[] getFileBlockLocations(final FileStatus status,
final long offset, final long length) throws IOException {
if (status == null) {
return null;
}
return getFileBlockLocations(status.getPath(), offset, length);
}

@VisibleForTesting
static BlockLocation[] toBlockLocations(JSONObject json) throws IOException {
ObjectMapper mapper = new ObjectMapper();
MapType subType = mapper.getTypeFactory().constructMapType(Map.class,
String.class, BlockLocation[].class);
MapType rootType = mapper.getTypeFactory().constructMapType(Map.class,
mapper.constructType(String.class), mapper.constructType(subType));

Map<String, Map<String, BlockLocation[]>> jsonMap =
mapper.readValue(json.toJSONString(), rootType);
Map<String, BlockLocation[]> locationMap =
jsonMap.get(BLOCK_LOCATIONS_JSON);
return locationMap.get(BlockLocation.class.getSimpleName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
Expand All @@ -44,6 +45,7 @@
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
Expand Down Expand Up @@ -2192,4 +2194,78 @@ public Void execute(FileSystem fs) throws IOException {
return null;
}
}

/**
* Executor that performs a getFileBlockLocations operation.
*/

@InterfaceAudience.Private
@SuppressWarnings("rawtypes")
public static class FSFileBlockLocations
implements FileSystemAccess.FileSystemExecutor<Map> {
private Path path;
private long offsetValue;
private long lengthValue;

/**
* Creates a file-block-locations executor.
*
* @param path the path to retrieve the location
* @param offsetValue offset into the given file
* @param lengthValue length for which to get locations for
*/
public FSFileBlockLocations(String path, long offsetValue,
long lengthValue) {
this.path = new Path(path);
this.offsetValue = offsetValue;
this.lengthValue = lengthValue;
}

@Override
public Map execute(FileSystem fs) throws IOException {
BlockLocation[] locations = fs.getFileBlockLocations(this.path,
this.offsetValue, this.lengthValue);
return JsonUtil.toJsonMap(locations);
}
}

/**
* Executor that performs a getFileBlockLocations operation for legacy
* clients that supports only GET_BLOCK_LOCATIONS.
*/

@InterfaceAudience.Private
@SuppressWarnings("rawtypes")
public static class FSFileBlockLocationsLegacy
implements FileSystemAccess.FileSystemExecutor<Map> {
private Path path;
private long offsetValue;
private long lengthValue;

/**
* Creates a file-block-locations executor.
*
* @param path the path to retrieve the location
* @param offsetValue offset into the given file
* @param lengthValue length for which to get locations for
*/
public FSFileBlockLocationsLegacy(String path, long offsetValue,
long lengthValue) {
this.path = new Path(path);
this.offsetValue = offsetValue;
this.lengthValue = lengthValue;
}

@Override
public Map execute(FileSystem fs) throws IOException {
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem)fs;
LocatedBlocks locations = dfs.getLocatedBlocks(
this.path, this.offsetValue, this.lengthValue);
return JsonUtil.toJsonMap(locations);
}
throw new IOException("Unable to support FSFileBlockLocationsLegacy " +
"because the file system is not DistributedFileSystem.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public class HttpFSParametersProvider extends ParametersProvider {
PARAMS_DEF.put(Operation.GETECPOLICY, new Class[] {});
PARAMS_DEF.put(Operation.UNSETECPOLICY, new Class[] {});
PARAMS_DEF.put(Operation.SATISFYSTORAGEPOLICY, new Class[] {});
PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS,
new Class[] {OffsetParam.class, LenParam.class});
PARAMS_DEF.put(Operation.GET_BLOCK_LOCATIONS,
new Class[] {OffsetParam.class, LenParam.class});
}

public HttpFSParametersProvider() {
Expand Down
Loading

0 comments on commit b78a873

Please sign in to comment.