From d8b93ab9e79ed5929cae033f1687c34eec4ca142 Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Fri, 14 Sep 2018 13:17:02 +0800 Subject: [PATCH] Support Erasure Coding on HDFS-3.x (#1924) --- .../java/org/smartdata/SmartConstants.java | 3 +- .../java/org/smartdata/model/FileInfo.java | 34 +++- .../org/smartdata/model/TestFileInfo.java | 24 ++- .../smartdata/hdfs/CompatibilityHelper2.java | 10 + smart-hadoop-support/smart-hadoop-3.1/pom.xml | 26 ++- .../smartdata/hdfs/CompatibilityHelper31.java | 23 +++ .../hdfs/action/CheckErasureCodingPolicy.java | 59 ++++++ .../hdfs/action/ErasureCodingAction.java | 178 ++++++++++++++++++ .../hdfs/action/ErasureCodingBase.java | 114 +++++++++++ .../hdfs/action/ListErasureCodingPolicy.java} | 32 +++- .../hdfs/action/UnErasureCodingAction.java | 114 +++++++++++ .../hdfs/action/TestErasureCodingAction.java | 71 +++++++ .../action/TestErasureCodingMiniCluster.java | 75 ++++++++ .../action/TestUnErasureCodingAction.java | 71 +++++++ .../smart-hadoop-common/pom.xml | 15 ++ .../smartdata/hdfs/CompatibilityHelper.java | 4 + .../java/org/smartdata/hdfs/HadoopUtil.java | 1 + .../org/smartdata/hdfs/action/HdfsAction.java | 0 .../smartdata/hdfs/MiniClusterHarness.java | 0 .../hdfs/MiniClusterWithStoragesHarness.java | 0 .../hdfs/action/HdfsActionFactory.java | 27 ++- .../smartdata/hdfs/action/MetaDataAction.java | 3 +- .../metric/fetcher/InotifyEventApplier.java | 31 ++- .../hdfs/metric/fetcher/NamespaceFetcher.java | 4 +- .../scheduler/ErasureCodingScheduler.java | 162 ++++++++++++++++ .../metric/fetcher/TestCachedListFetcher.java | 3 +- .../org/smartdata/metastore/MetaStore.java | 4 +- .../smartdata/metastore/dao/FileInfoDao.java | 4 +- .../smartdata/metastore/TestMetaStore.java | 12 +- .../dao/TestAccessCountTableManager.java | 2 + .../metastore/dao/TestFileInfoDao.java | 14 +- .../metastore/dao/TestTableAggregator.java | 1 + .../server/engine/rule/TestRuleManager.java | 2 +- 33 files changed, 1081 insertions(+), 42 deletions(-) create mode 100644 smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/CheckErasureCodingPolicy.java create mode 100644 smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java create mode 100644 smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingBase.java rename smart-hadoop-support/{smart-hadoop/src/main/java/org/smartdata/hdfs/action/StripErasureCodeFileAction.java => smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ListErasureCodingPolicy.java} (50%) create mode 100644 smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/UnErasureCodingAction.java create mode 100644 smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingAction.java create mode 100644 smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingMiniCluster.java create mode 100644 smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestUnErasureCodingAction.java rename smart-hadoop-support/{smart-hadoop => smart-hadoop-common}/src/main/java/org/smartdata/hdfs/HadoopUtil.java (99%) rename smart-hadoop-support/{smart-hadoop => smart-hadoop-common}/src/main/java/org/smartdata/hdfs/action/HdfsAction.java (100%) rename smart-hadoop-support/{smart-hadoop => smart-hadoop-common}/src/test/java/org/smartdata/hdfs/MiniClusterHarness.java (100%) rename smart-hadoop-support/{smart-hadoop => smart-hadoop-common}/src/test/java/org/smartdata/hdfs/MiniClusterWithStoragesHarness.java (100%) create mode 100644 smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java diff --git a/smart-common/src/main/java/org/smartdata/SmartConstants.java b/smart-common/src/main/java/org/smartdata/SmartConstants.java index 9206d43c1f8..70ff96e9ae7 100644 --- a/smart-common/src/main/java/org/smartdata/SmartConstants.java +++ b/smart-common/src/main/java/org/smartdata/SmartConstants.java @@ -28,7 +28,8 @@ public class SmartConstants { "org.smartdata.hdfs.scheduler.MoverScheduler, " + "org.smartdata.hdfs.scheduler.CopyScheduler, " + "org.smartdata.hdfs.scheduler.Copy2S3Scheduler," - + "org.smartdata.hdfs.scheduler.SmallFileScheduler"; + + "org.smartdata.hdfs.scheduler.SmallFileScheduler," + + "org.smartdata.hdfs.scheduler.ErasureCodingScheduler"; public static final String SMART_HADOOP_LAST_INOTIFY_TXID = "smart_hadoop_last_inotify_txid"; diff --git a/smart-common/src/main/java/org/smartdata/model/FileInfo.java b/smart-common/src/main/java/org/smartdata/model/FileInfo.java index 3e119cb600f..1cab069c27f 100644 --- a/smart-common/src/main/java/org/smartdata/model/FileInfo.java +++ b/smart-common/src/main/java/org/smartdata/model/FileInfo.java @@ -32,11 +32,12 @@ public class FileInfo { private String owner; private String group; private byte storagePolicy; + private byte erasureCodingPolicy; public FileInfo(String path, long fileId, long length, boolean isdir, short blockReplication, long blocksize, long modificationTime, long accessTime, short permission, String owner, String group, - byte storagePolicy) { + byte storagePolicy, byte erasureCodingPolicy) { this.path = path; this.fileId = fileId; this.length = length; @@ -49,6 +50,7 @@ public FileInfo(String path, long fileId, long length, boolean isdir, this.owner = owner; this.group = group; this.storagePolicy = storagePolicy; + this.erasureCodingPolicy = erasureCodingPolicy; } public String getPath() { @@ -147,6 +149,14 @@ public void setStoragePolicy(byte storagePolicy) { this.storagePolicy = storagePolicy; } + public byte getErasureCodingPolicy() { + return erasureCodingPolicy; + } + + public void setErasureCodingPolicy(byte erasureCodingPolicy) { + this.erasureCodingPolicy = erasureCodingPolicy; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -165,6 +175,7 @@ public boolean equals(Object o) { && accessTime == fileInfo.accessTime && permission == fileInfo.permission && storagePolicy == fileInfo.storagePolicy + && erasureCodingPolicy == fileInfo.erasureCodingPolicy && Objects.equals(path, fileInfo.path) && Objects.equals(owner, fileInfo.owner) && Objects.equals(group, fileInfo.group); @@ -184,7 +195,8 @@ public int hashCode() { permission, owner, group, - storagePolicy); + storagePolicy, + erasureCodingPolicy); } public static Builder newBuilder() { @@ -196,7 +208,7 @@ public String toString() { return String.format( "FileInfo{path=\'%s\', fileId=%s, length=%s, isdir=%s, blockReplication=%s, " + "blocksize=%s, modificationTime=%s, accessTime=%s, permission=%s, owner=\'%s\', " - + "group=\'%s\', storagePolicy=%s}", + + "group=\'%s\', storagePolicy=%s, erasureCodingPolicy=%s}", path, fileId, length, @@ -208,7 +220,8 @@ public String toString() { permission, owner, group, - storagePolicy); + storagePolicy, + erasureCodingPolicy); } public static class Builder { @@ -224,6 +237,7 @@ public static class Builder { private String owner; private String group; private byte storagePolicy; + private byte erasureCodingPolicy; public Builder setPath(String path) { this.path = path; @@ -285,10 +299,15 @@ public Builder setStoragePolicy(byte storagePolicy) { return this; } + public Builder setErasureCodingPolicy(byte erasureCodingPolicy) { + this.erasureCodingPolicy = erasureCodingPolicy; + return this; + } + public FileInfo build() { return new FileInfo(path, fileId, length, isdir, blockReplication, blocksize, modificationTime, accessTime, permission, owner, - group, storagePolicy); + group, storagePolicy, erasureCodingPolicy); } @Override @@ -296,7 +315,7 @@ public String toString() { return String.format( "Builder{path=\'%s\', fileId=%s, length=%s, isdir=%s, blockReplication=%s, " + "blocksize=%s, modificationTime=%s, accessTime=%s, permission=%s, owner=\'%s\', " - + "group=\'%s\', storagePolicy=\'%s\'}", + + "group=\'%s\', storagePolicy=%s, erasureCodingPolicy=%s}", path, fileId, length, @@ -308,7 +327,8 @@ public String toString() { permission, owner, group, - storagePolicy); + storagePolicy, + erasureCodingPolicy); } } } diff --git a/smart-common/src/test/java/org/smartdata/model/TestFileInfo.java b/smart-common/src/test/java/org/smartdata/model/TestFileInfo.java index cacaf219909..269b119603a 100644 --- a/smart-common/src/test/java/org/smartdata/model/TestFileInfo.java +++ b/smart-common/src/test/java/org/smartdata/model/TestFileInfo.java @@ -36,7 +36,8 @@ public ChildFileInfo( short permission, String owner, String group, - byte storagePolicy) { + byte storagePolicy, + byte erasureCodingPolicy) { super( path, fileId, @@ -49,7 +50,8 @@ public ChildFileInfo( permission, owner, group, - storagePolicy); + storagePolicy, + erasureCodingPolicy); } } @@ -57,37 +59,43 @@ public ChildFileInfo( public void testEquals() throws Exception { //Case 1: FileInfo fileInfo = - new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, 1, (short) 1, " ", " ", (byte) 1); + new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, + 1, (short) 1, " ", " ", (byte) 1, (byte) 0); Assert.assertEquals(true, fileInfo.equals(fileInfo)); //Case 2: FileInfo fileInfo1 = - new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, 1, (short) 1, " ", " ", (byte) 1); + new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, + 1, (short) 1, " ", " ", (byte) 1, (byte) 0); Assert.assertEquals(true, fileInfo.equals(fileInfo1)); //Case 3: FileInfo fileInfo2 = - new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, 1, (short) 1, null, " ", (byte) 1); + new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, + 1, (short) 1, null, " ", (byte) 1, (byte) 0); Assert.assertEquals(false, fileInfo.equals(fileInfo2)); Assert.assertEquals(false, fileInfo2.equals(fileInfo)); //Case 4: FileInfo fileInfo3 = - new FileInfo(null, 1, 1, true, (short) 1, 1, 1, 1, (short) 1, " ", " ", (byte) 1); + new FileInfo(null, 1, 1, true, (short) 1, 1, 1, + 1, (short) 1, " ", " ", (byte) 1, (byte) 0); Assert.assertEquals(false, fileInfo.equals(fileInfo3)); Assert.assertEquals(false, fileInfo3.equals(fileInfo)); //Case 5: FileInfo fileInfo4 = - new FileInfo(null, 1, 1, true, (short) 1, 1, 1, 1, (short) 1, " ", null, (byte) 1); + new FileInfo(null, 1, 1, true, (short) 1, 1, 1, + 1, (short) 1, " ", null, (byte) 1, (byte) 0); Assert.assertEquals(false, fileInfo.equals(fileInfo4)); Assert.assertEquals(false, fileInfo4.equals(fileInfo)); //Case 6: FileInfo fileInfo5 = - new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, 1, (short) 2, " ", " ", (byte) 1); + new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, + 1, (short) 2, " ", " ", (byte) 1, (byte) 0); Assert.assertEquals(false, fileInfo.equals(fileInfo5)); } } diff --git a/smart-hadoop-support/smart-hadoop-2/src/main/java/org/smartdata/hdfs/CompatibilityHelper2.java b/smart-hadoop-support/smart-hadoop-2/src/main/java/org/smartdata/hdfs/CompatibilityHelper2.java index 935321b452e..9c8d31fe66a 100644 --- a/smart-hadoop-support/smart-hadoop-2/src/main/java/org/smartdata/hdfs/CompatibilityHelper2.java +++ b/smart-hadoop-support/smart-hadoop-2/src/main/java/org/smartdata/hdfs/CompatibilityHelper2.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -62,4 +63,13 @@ public HdfsFileStatus createHdfsFileStatus( length, isdir, block_replication, blocksize, modification_time, access_time, permission, owner, group, symlink, path, fileId, childrenNum, feInfo, storagePolicy); } + + public byte getErasureCodingPolicy(HdfsFileStatus fileStatus) { + // for HDFS2.x, the erasure policy is always replication whose id is 0 in HDFS. + return (byte) 0; + } + + public byte getErasureCodingPolicyByName(DFSClient client, String ecPolicyName) throws IOException { + return (byte) 0; + } } \ No newline at end of file diff --git a/smart-hadoop-support/smart-hadoop-3.1/pom.xml b/smart-hadoop-support/smart-hadoop-3.1/pom.xml index a68ac82c0ae..b371795c186 100644 --- a/smart-hadoop-support/smart-hadoop-3.1/pom.xml +++ b/smart-hadoop-support/smart-hadoop-3.1/pom.xml @@ -70,6 +70,11 @@ smart-hadoop-common 1.5.0-SNAPSHOT + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + org.apache.hadoop hadoop-hdfs @@ -84,10 +89,27 @@ test test-jar + + org.mockito + mockito-all + test + org.apache.hadoop - hadoop-aws - ${hadoop.version} + hadoop-common + test + test-jar + + + org.apache.hadoop + hadoop-hdfs + test + test-jar + + + junit + junit + test diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/CompatibilityHelper31.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/CompatibilityHelper31.java index a1ce43f0205..c78723a864f 100644 --- a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/CompatibilityHelper31.java +++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/CompatibilityHelper31.java @@ -207,4 +207,27 @@ public HdfsFileStatus createHdfsFileStatus( .storagePolicy(storagePolicy) .build(); } + + @Override + public byte getErasureCodingPolicy(HdfsFileStatus fileStatus) { + ErasureCodingPolicy erasureCodingPolicy = fileStatus.getErasureCodingPolicy(); + // null means replication policy and its id is 0 in HDFS. + if (erasureCodingPolicy == null) { + return (byte) 0; + } + return fileStatus.getErasureCodingPolicy().getId(); + } + + @Override + public byte getErasureCodingPolicyByName(DFSClient client, String ecPolicyName) throws IOException { + if (ecPolicyName.equals(SystemErasureCodingPolicies.getReplicationPolicy().getName())) { + return (byte) 0; + } + for (ErasureCodingPolicyInfo policyInfo : client.getErasureCodingPolicies()) { + if (policyInfo.getPolicy().getName().equals(ecPolicyName)) { + return policyInfo.getPolicy().getId(); + } + } + return (byte) -1; + } } \ No newline at end of file diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/CheckErasureCodingPolicy.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/CheckErasureCodingPolicy.java new file mode 100644 index 00000000000..84074bc3e73 --- /dev/null +++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/CheckErasureCodingPolicy.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.hdfs.action; + +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.smartdata.action.annotation.ActionSignature; +import org.smartdata.conf.SmartConf; +import org.smartdata.hdfs.HadoopUtil; + +import java.util.Map; + +/** + * An action to check the EC policy for a file or dir. + */ +@ActionSignature( + actionId = "checkec", + displayName = "checkec", + usage = HdfsAction.FILE_PATH + " $src" +) +public class CheckErasureCodingPolicy extends HdfsAction { + public static final String RESULT_OF_NULL_EC_POLICY = + "The EC policy is replication."; + private SmartConf conf; + private String srcPath; + + @Override + public void init(Map args) { + super.init(args); + this.conf = getContext().getConf(); + this.srcPath = args.get(HdfsAction.FILE_PATH); + } + + @Override + public void execute() throws Exception { + this.setDfsClient(HadoopUtil.getDFSClient( + HadoopUtil.getNameNodeUri(conf), conf)); + ErasureCodingPolicy srcEcPolicy = dfsClient.getErasureCodingPolicy(srcPath); + if (srcEcPolicy == null) { + appendResult(RESULT_OF_NULL_EC_POLICY); + } else { + appendResult(srcEcPolicy.toString()); + } + } +} diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java new file mode 100644 index 00000000000..2b50d9a36ce --- /dev/null +++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.hdfs.action; + +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartdata.action.ActionException; +import org.smartdata.action.annotation.ActionSignature; +import org.smartdata.conf.SmartConf; +import org.smartdata.hdfs.HadoopUtil; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; + +/** + * An action to set an EC policy for a dir or convert a file to another one in an EC policy. + * Default values are used for arguments of policy & bufSize if their values are not given in this action. + */ +@ActionSignature( + actionId = "ec", + displayName = "ec", + usage = HdfsAction.FILE_PATH + " $src " + ErasureCodingAction.EC_POLICY_NAME + " $policy" + + ErasureCodingBase.BUF_SIZE + " $bufSize" +) +public class ErasureCodingAction extends ErasureCodingBase { + private static final Logger LOG = + LoggerFactory.getLogger(ErasureCodingAction.class); + public static final String EC_POLICY_NAME = "-policy"; + + private SmartConf conf; + private String ecPolicyName; + + @Override + public void init(Map args) { + super.init(args); + this.conf = getContext().getConf(); + this.srcPath = args.get(FILE_PATH); + if (args.containsKey(EC_TMP)) { + // this is a temp file kept for converting a file to another with other ec policy. + this.ecTmpPath = args.get(EC_TMP); + } + if (args.containsKey(EC_POLICY_NAME) && !args.get(EC_POLICY_NAME).isEmpty()) { + this.ecPolicyName = args.get(EC_POLICY_NAME); + } else { + String defaultEcPolicy = conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY, + DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT); + this.ecPolicyName = defaultEcPolicy; + } + if (args.containsKey(BUF_SIZE) && !args.get(BUF_SIZE).isEmpty()) { + this.bufferSize = Integer.valueOf(args.get(BUF_SIZE)); + } + this.progress = 0.0F; + } + + @Override + protected void execute() throws Exception { + final String MATCH_RESULT = + "The current EC policy is matched with the target one."; + final String DIR_RESULT = + "The EC policy is set successfully for the given directory."; + final String CONVERT_RESULT = + "The file is converted successfully with the given or default ec policy."; + + this.setDfsClient(HadoopUtil.getDFSClient( + HadoopUtil.getNameNodeUri(conf), conf)); + // keep attribute consistent + HdfsFileStatus fileStatus = dfsClient.getFileInfo(srcPath); + if (fileStatus == null) { + throw new ActionException("File doesn't exist!"); + } + validateEcPolicy(ecPolicyName); + ErasureCodingPolicy srcEcPolicy = fileStatus.getErasureCodingPolicy(); + // if the current ecPolicy is already the target one, no need to convert + if (srcEcPolicy != null) { + if (srcEcPolicy.getName().equals(ecPolicyName)) { + appendResult(MATCH_RESULT); + this.progress = 1.0F; + return; + } + } else { + // if ecPolicy is null, it means replication. + if (ecPolicyName.equals(REPLICATION_POLICY_NAME)) { + appendResult(MATCH_RESULT); + this.progress = 1.0F; + return; + } + } + if (fileStatus.isDir()) { + dfsClient.setErasureCodingPolicy(srcPath, ecPolicyName); + this.progress = 1.0F; + appendResult(DIR_RESULT); + return; + } + HdfsDataOutputStream outputStream = null; + try { + // a file only with replication policy can be appended. + if (srcEcPolicy == null) { + // append the file to acquire the lock to avoid modifying, no real appending occurs. + outputStream = + dfsClient.append(srcPath, bufferSize, EnumSet.of(CreateFlag.APPEND), null, null); + } + convert(conf, ecPolicyName); + /** + * The append operation will change the modification accordingly, + * so we use the filestatus obtained before append to set ecTmp file's most attributes + */ + setAttributes(srcPath, fileStatus, ecTmpPath); + dfsClient.rename(ecTmpPath, srcPath, Options.Rename.OVERWRITE); + appendResult(CONVERT_RESULT); + if (srcEcPolicy == null) { + appendResult("The previous EC policy is replication."); + } else { + appendResult(String.format("The previous EC policy is {}.", srcEcPolicy.getName())); + } + appendResult(String.format("The current EC policy is {}.", ecPolicyName)); + } catch (ActionException ex) { + try { + if (dfsClient.getFileInfo(ecTmpPath) != null) { + dfsClient.delete(ecTmpPath, false); + } + } catch (IOException e) { + LOG.error("Failed to delete tmp file created during the conversion!"); + } + throw new ActionException(ex); + } finally { + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException ex) { + // IOException may be reported for missing blocks. + } + } + } + } + + public void validateEcPolicy(String ecPolicyName) throws Exception { + Map ecPolicyNameToState = new HashMap<>(); + for (ErasureCodingPolicyInfo info : dfsClient.getErasureCodingPolicies()) { + ecPolicyNameToState.put(info.getPolicy().getName(), info.getState()); + } + if (!ecPolicyNameToState.keySet().contains(ecPolicyName) && !ecPolicyName.equals(REPLICATION_POLICY_NAME)) { + throw new ActionException("The EC policy " + ecPolicyName + " is not supported!"); + } else if (ecPolicyNameToState.get(ecPolicyName) == ErasureCodingPolicyState.DISABLED + || ecPolicyNameToState.get(ecPolicyName) == ErasureCodingPolicyState.REMOVED) { + throw new ActionException("The EC policy " + ecPolicyName + " is disabled or removed!"); + } + } + + @Override + public float getProgress() { + return progress; + } +} \ No newline at end of file diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingBase.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingBase.java new file mode 100644 index 00000000000..1476de4e44c --- /dev/null +++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingBase.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.hdfs.action; + +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartdata.action.ActionException; +import org.smartdata.conf.SmartConf; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; + +/** + * An abstract base class for ErasureCodingAction & UnErasureCodingAction. + */ +abstract public class ErasureCodingBase extends HdfsAction { + private static final Logger LOG = + LoggerFactory.getLogger(ErasureCodingBase.class); + public static final String BUF_SIZE = "-bufSize"; + protected String srcPath; + protected String ecTmpPath; + protected int bufferSize = 1024 * 1024; + protected float progress; + // The value for -ecTmp is assigned by ErasureCodingScheduler. + public static final String EC_TMP = "-ecTmp"; + public static final String REPLICATION_POLICY_NAME = + SystemErasureCodingPolicies.getReplicationPolicy().getName(); + + protected void convert(SmartConf conf, String ecPolicyName) throws ActionException { + DFSInputStream in = null; + DFSOutputStream out = null; + try { + long blockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); + in = dfsClient.open(srcPath, bufferSize, true); + HdfsFileStatus fileStatus = dfsClient.getFileInfo(srcPath); + // use the same FsPermission as srcPath + FsPermission permission = fileStatus.getPermission(); + out = dfsClient.create(ecTmpPath, permission, EnumSet.of(CreateFlag.CREATE), true, + (short) 1, blockSize, null, bufferSize, null, null, ecPolicyName); + long bytesRemaining = fileStatus.getLen(); + byte[] buf = new byte[bufferSize]; + while (bytesRemaining > 0L) { + int bytesToRead = + (int) (bytesRemaining < (long) buf.length ? bytesRemaining : + (long) buf.length); + int bytesRead = in.read(buf, 0, bytesToRead); + if (bytesRead == -1) { + break; + } + out.write(buf, 0, bytesRead); + bytesRemaining -= (long) bytesRead; + this.progress = (float) (fileStatus.getLen() - bytesRemaining) / fileStatus.getLen(); + } + } catch (Exception ex) { + throw new ActionException(ex); + } finally { + try { + if (in != null) { + in.close(); + } + if (out != null) { + out.close(); + } + } catch (IOException ex) { + LOG.error("IOException occurred when closing DFSInputStream or DFSOutputStream!"); + } + } + } + + // set attributes for dest to keep them consistent with their counterpart of src + protected void setAttributes(String src, HdfsFileStatus fileStatus, String dest) + throws IOException { + dfsClient.setOwner(dest, fileStatus.getOwner(), fileStatus.getGroup()); + dfsClient.setPermission(dest, fileStatus.getPermission()); + dfsClient.setStoragePolicy(dest, dfsClient.getStoragePolicy(src).getName()); + dfsClient.setTimes(dest, fileStatus.getModificationTime(), fileStatus.getAccessTime()); + boolean aclsEnabled = getContext().getConf().getBoolean( + DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, + DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT); + if (aclsEnabled) { + dfsClient.setAcl(dest, dfsClient.getAclStatus(src).getEntries()); + } + //TODO: check ec related record to avoid paradox + for (Map.Entry entry : dfsClient.getXAttrs(src).entrySet()) { + dfsClient.setXAttr(dest, entry.getKey(), entry.getValue(), + EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE)); + } + } +} \ No newline at end of file diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/StripErasureCodeFileAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ListErasureCodingPolicy.java similarity index 50% rename from smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/StripErasureCodeFileAction.java rename to smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ListErasureCodingPolicy.java index 2be287215ee..7ad70b77cf8 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/StripErasureCodeFileAction.java +++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ListErasureCodingPolicy.java @@ -17,16 +17,36 @@ */ package org.smartdata.hdfs.action; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.smartdata.action.annotation.ActionSignature; +import org.smartdata.conf.SmartConf; +import org.smartdata.hdfs.HadoopUtil; + +import java.util.Map; /** - * An action to do strip level erasure code a file. Only for Hadoop 3.x. + * An action to list the info for all EC policies in HDFS. */ -public class StripErasureCodeFileAction extends HdfsAction { - private static final Logger LOG = LoggerFactory.getLogger(StripErasureCodeFileAction.class); +@ActionSignature( + actionId = "listec", + displayName = "listec", + usage = "No args" +) +public class ListErasureCodingPolicy extends HdfsAction { + private SmartConf conf; + + @Override + public void init(Map args) { + super.init(args); + this.conf = getContext().getConf(); + } @Override - protected void execute() throws Exception { + public void execute() throws Exception { + this.setDfsClient(HadoopUtil.getDFSClient( + HadoopUtil.getNameNodeUri(conf), conf)); + for (ErasureCodingPolicyInfo policyInfo : dfsClient.getErasureCodingPolicies()) { + appendResult("{" + policyInfo.toString() + "}"); + } } } diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/UnErasureCodingAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/UnErasureCodingAction.java new file mode 100644 index 00000000000..8d05c2f1f44 --- /dev/null +++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/UnErasureCodingAction.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.hdfs.action; + +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartdata.action.ActionException; +import org.smartdata.action.annotation.ActionSignature; +import org.smartdata.conf.SmartConf; +import org.smartdata.hdfs.HadoopUtil; + +import java.io.IOException; +import java.util.Map; + +/** + * An action to set replication policy for a dir or convert a file to another one in replication policy. + * Default value is used for argument of bufSize if its value is not given in this action. + */ +@ActionSignature( + actionId = "unec", + displayName = "unec", + usage = HdfsAction.FILE_PATH + " $src " + ErasureCodingBase.BUF_SIZE + " $bufSize" +) +public class UnErasureCodingAction extends ErasureCodingBase { + private static final Logger LOG = + LoggerFactory.getLogger(UnErasureCodingAction.class); + private String ecPolicyName; + private SmartConf conf; + + @Override + public void init(Map args) { + super.init(args); + this.conf = getContext().getConf(); + this.srcPath = args.get(FILE_PATH); + this.ecPolicyName = REPLICATION_POLICY_NAME; + if (args.containsKey(EC_TMP)) { + this.ecTmpPath = args.get(EC_TMP); + } + if (args.containsKey(BUF_SIZE) && !args.get(BUF_SIZE).isEmpty()) { + this.bufferSize = Integer.valueOf(args.get(BUF_SIZE)); + } + this.progress = 0.0F; + } + + @Override + protected void execute() throws Exception { + final String MATCH_RESULT = + "The current EC policy is replication already."; + final String DIR_RESULT = + "The replication EC policy is set successfully for the given directory."; + final String CONVERT_RESULT = + "The file is converted successfully with replication EC policy."; + + this.setDfsClient(HadoopUtil.getDFSClient( + HadoopUtil.getNameNodeUri(conf), conf)); + HdfsFileStatus fileStatus = dfsClient.getFileInfo(srcPath); + if (fileStatus == null) { + throw new ActionException("File doesn't exist!"); + } + ErasureCodingPolicy srcEcPolicy = fileStatus.getErasureCodingPolicy(); + // if ecPolicy is null, it means replication. + if (srcEcPolicy == null) { + this.progress = 1.0F; + appendResult(MATCH_RESULT); + return; + } + if (fileStatus.isDir()) { + dfsClient.setErasureCodingPolicy(srcPath, ecPolicyName); + progress = 1.0F; + appendResult(DIR_RESULT); + return; + } + try { + convert(conf, ecPolicyName); + setAttributes(srcPath, fileStatus, ecTmpPath); + dfsClient.rename(ecTmpPath, srcPath, Options.Rename.OVERWRITE); + appendResult(CONVERT_RESULT); + appendResult(String.format("The previous EC policy is %s.", srcEcPolicy.getName())); + appendResult(String.format("The current EC policy is %s.", REPLICATION_POLICY_NAME)); + } catch (ActionException ex) { + try { + if (dfsClient.getFileInfo(ecTmpPath) != null) { + dfsClient.delete(ecTmpPath, false); + } + } catch (IOException e) { + LOG.error("Failed to delete tmp file created during the conversion!"); + } + throw new ActionException(ex); + } + } + + @Override + public float getProgress() { + return progress; + } +} \ No newline at end of file diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingAction.java new file mode 100644 index 00000000000..928f5e01ff5 --- /dev/null +++ b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingAction.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.hdfs.action; + +import org.apache.hadoop.fs.Path; +import static org.junit.Assert.*; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class TestErasureCodingAction extends TestErasureCodingMiniCluster { + + @Test + public void testEcActionForFile() throws Exception { + String srcPath = "/ec/test_file"; + createTestFile(srcPath, 1000); + // the file is stored in replication + assertEquals(null, dfsClient.getErasureCodingPolicy(srcPath)); + + ErasureCodingAction ecAction = new ErasureCodingAction(); + ecAction.setContext(smartContext); + String ecTmpPath = "/ssm/ec_tmp/tmp_file"; + Map args = new HashMap<>(); + args.put(HdfsAction.FILE_PATH, srcPath); + args.put(ErasureCodingBase.EC_TMP, ecTmpPath); + args.put(ErasureCodingAction.EC_POLICY_NAME, ecPolicy.getName()); + ecAction.init(args); + ecAction.run(); + assertTrue(ecAction.getExpectedAfterRun()); + // the file is stored in ec with default policy + assertEquals(dfsClient.getErasureCodingPolicy(srcPath), ecPolicy); + } + + @Test + public void testEcActionForDir() throws Exception { + String srcDirPath = "/test_dir/"; + dfs.mkdirs(new Path(srcDirPath)); + assertEquals(null, dfsClient.getErasureCodingPolicy(srcDirPath)); + + ErasureCodingAction ecAction = new ErasureCodingAction(); + ecAction.setContext(smartContext); + Map args = new HashMap<>(); + args.put(HdfsAction.FILE_PATH, srcDirPath); + args.put(ErasureCodingAction.EC_POLICY_NAME, ecPolicy.getName()); + ecAction.init(args); + ecAction.run(); + assertTrue(ecAction.getExpectedAfterRun()); + assertEquals(dfsClient.getErasureCodingPolicy(srcDirPath), ecPolicy); + + String srcFilePath = "/test_dir/test_file"; + createTestFile(srcFilePath, 1000); + // The newly created file should has the same EC policy as parent directory. + assertEquals(dfsClient.getErasureCodingPolicy(srcFilePath), ecPolicy); + } +} \ No newline at end of file diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingMiniCluster.java b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingMiniCluster.java new file mode 100644 index 00000000000..87daf8ee54a --- /dev/null +++ b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingMiniCluster.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.hdfs.action; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; +import org.junit.After; +import org.junit.Before; +import org.smartdata.SmartContext; +import org.smartdata.conf.SmartConf; +import org.smartdata.conf.SmartConfKeys; +import org.smartdata.hdfs.MiniClusterHarness; + +import java.io.IOException; + +public class TestErasureCodingMiniCluster { + protected ErasureCodingPolicy ecPolicy; + // use the default one, not the one in MiniClusterHarness + public static final long BLOCK_SIZE = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; + protected MiniDFSCluster cluster; + protected DistributedFileSystem dfs; + protected DFSClient dfsClient; + protected SmartContext smartContext; + + @Before + public void init() throws Exception { + SmartConf conf = new SmartConf(); +// super.initConf(conf); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT); + // use ErasureCodeConstants.XOR_2_1_SCHEMA + ecPolicy = SystemErasureCodingPolicies.getPolicies().get(3); + cluster = new MiniDFSCluster.Builder(conf). + numDataNodes(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()). + build(); + // Add namenode URL to smartContext + conf.set(SmartConfKeys.SMART_DFS_NAMENODE_RPCSERVER_KEY, + "hdfs://" + cluster.getNameNode().getNameNodeAddressHostPortString()); + smartContext = new SmartContext(conf); + cluster.waitActive(); + dfs = cluster.getFileSystem(); + dfsClient = dfs.getClient(); + dfsClient.enableErasureCodingPolicy(ecPolicy.getName()); + } + + public void createTestFile(String srcPath, long length) throws IOException { +// DFSTestUtil.createFile(dfs, new Path(srcPath), length, (short) 3, 0L); + DFSTestUtil.createFile(dfs, new Path(srcPath), 1024, length, BLOCK_SIZE, (short) 3, 0L); + } + + @After + public void shutdown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } +} diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestUnErasureCodingAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestUnErasureCodingAction.java new file mode 100644 index 00000000000..a0bb314a5a7 --- /dev/null +++ b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestUnErasureCodingAction.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.hdfs.action; + +import org.apache.hadoop.fs.Path; +import static org.junit.Assert.*; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class TestUnErasureCodingAction extends TestErasureCodingMiniCluster { + + @Test + public void testUnEcActionForFile() throws Exception { + String testDir = "/test_dir"; + dfs.mkdirs(new Path(testDir)); + dfs.setErasureCodingPolicy(new Path(testDir), ecPolicy.getName()); + // create test file, its EC policy should be consistent with parent dir, i.e., ecPolicy + String srcPath = testDir + "/ec_file"; + createTestFile(srcPath, 1000); + assertEquals(dfsClient.getErasureCodingPolicy(srcPath), ecPolicy); + + UnErasureCodingAction unEcAction = new UnErasureCodingAction(); + unEcAction.setContext(smartContext); + String ecTmpPath = "/ssm/ec_tmp/tmp_file"; + Map args = new HashMap<>(); + args.put(HdfsAction.FILE_PATH, srcPath); + args.put(ErasureCodingBase.EC_TMP, ecTmpPath); + unEcAction.init(args); + unEcAction.run(); + assertTrue(unEcAction.getExpectedAfterRun()); + assertNull(dfsClient.getErasureCodingPolicy(srcPath)); + } + + @Test + public void testUnEcActionForDir() throws Exception { + String testDir = "/test_dir"; + dfs.mkdirs(new Path(testDir)); + dfs.setErasureCodingPolicy(new Path(testDir), ecPolicy.getName()); + assertEquals(dfsClient.getErasureCodingPolicy(testDir), ecPolicy); + + UnErasureCodingAction unEcAction = new UnErasureCodingAction(); + unEcAction.setContext(smartContext); + Map args = new HashMap<>(); + args.put(HdfsAction.FILE_PATH, testDir); + unEcAction.init(args); + unEcAction.run(); + assertNull(dfs.getErasureCodingPolicy(new Path(testDir))); + + // create test file, its EC policy is expected to be replication + String srcPath = testDir + "/ec_file"; + createTestFile(srcPath, 1000); + assertNull(dfs.getErasureCodingPolicy(new Path(srcPath))); + } +} diff --git a/smart-hadoop-support/smart-hadoop-common/pom.xml b/smart-hadoop-support/smart-hadoop-common/pom.xml index 495999122b1..616896860d5 100644 --- a/smart-hadoop-support/smart-hadoop-common/pom.xml +++ b/smart-hadoop-support/smart-hadoop-common/pom.xml @@ -31,6 +31,16 @@ jar + + org.smartdata + smart-common + 1.5.0-SNAPSHOT + + + org.smartdata + smart-action + 1.5.0-SNAPSHOT + org.apache.hadoop hadoop-hdfs @@ -62,5 +72,10 @@ test test-jar + + junit + junit + test + diff --git a/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/CompatibilityHelper.java b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/CompatibilityHelper.java index aad542a1b20..6cb4e0d1b9a 100644 --- a/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/CompatibilityHelper.java +++ b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/CompatibilityHelper.java @@ -88,4 +88,8 @@ HdfsFileStatus createHdfsFileStatus( long length, boolean isdir, int block_replication, long blocksize, long modification_time, long access_time, FsPermission permission, String owner, String group, byte[] symlink, byte[] path, long fileId, int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy); + + byte getErasureCodingPolicy(HdfsFileStatus fileStatus); + + byte getErasureCodingPolicyByName(DFSClient client, String ecPolicyName) throws IOException; } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HadoopUtil.java b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/HadoopUtil.java similarity index 99% rename from smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HadoopUtil.java rename to smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/HadoopUtil.java index 6ccd040c925..febf610f025 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HadoopUtil.java +++ b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/HadoopUtil.java @@ -254,6 +254,7 @@ public static FileInfo convertFileStatus(HdfsFileStatus status, String path) { .setOwner(status.getOwner()) .setGroup(status.getGroup()) .setStoragePolicy(status.getStoragePolicy()) + .setErasureCodingPolicy(CompatibilityHelperLoader.getHelper().getErasureCodingPolicy(status)) .build(); } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsAction.java b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/action/HdfsAction.java similarity index 100% rename from smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsAction.java rename to smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/action/HdfsAction.java diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/MiniClusterHarness.java b/smart-hadoop-support/smart-hadoop-common/src/test/java/org/smartdata/hdfs/MiniClusterHarness.java similarity index 100% rename from smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/MiniClusterHarness.java rename to smart-hadoop-support/smart-hadoop-common/src/test/java/org/smartdata/hdfs/MiniClusterHarness.java diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/MiniClusterWithStoragesHarness.java b/smart-hadoop-support/smart-hadoop-common/src/test/java/org/smartdata/hdfs/MiniClusterWithStoragesHarness.java similarity index 100% rename from smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/MiniClusterWithStoragesHarness.java rename to smart-hadoop-support/smart-hadoop-common/src/test/java/org/smartdata/hdfs/MiniClusterWithStoragesHarness.java diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsActionFactory.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsActionFactory.java index 3d20ab0cd25..55a0ec5caa2 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsActionFactory.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsActionFactory.java @@ -17,12 +17,25 @@ */ package org.smartdata.hdfs.action; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.smartdata.action.AbstractActionFactory; +import org.smartdata.action.SmartAction; +import org.smartdata.hdfs.scheduler.ErasureCodingScheduler; + +import java.util.Arrays; +import java.util.List; /** * Built-in smart actions for HDFS system. */ public class HdfsActionFactory extends AbstractActionFactory { + private static final Logger LOG = LoggerFactory.getLogger(HdfsActionFactory.class); + public static final List HDFS3_ACTION_CLASSES = Arrays.asList( + "org.smartdata.hdfs.action.ListErasureCodingPolicy", + "org.smartdata.hdfs.action.CheckErasureCodingPolicy", + "org.smartdata.hdfs.action.ErasureCodingAction", + "org.smartdata.hdfs.action.UnErasureCodingAction"); static { addAction(AllSsdFileAction.class); @@ -37,7 +50,6 @@ public class HdfsActionFactory extends AbstractActionFactory { addAction(WriteFileAction.class); addAction(CheckStorageAction.class); addAction(SetXAttrAction.class); -// addAction("stripec", StripErasureCodeFileAction.class); // addAction("blockec", BlockErasureCodeFileAction.class); addAction(CopyFileAction.class); addAction(DeleteFileAction.class); @@ -57,5 +69,18 @@ public class HdfsActionFactory extends AbstractActionFactory { // addAction("diskbalance", DiskBalanceAction.class); // addAction("clusterbalance", ClusterBalanceAction.class); // addAction("setstoragepolicy", SetStoragePolicyAction.class); + if (ErasureCodingScheduler.isECSupported()) { + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + if (loader == null) { + loader = ClassLoader.getSystemClassLoader(); + } + try { + for (String classString : HDFS3_ACTION_CLASSES) { + addAction((Class) loader.loadClass(classString)); + } + } catch (ClassNotFoundException ex) { + LOG.error("Class not found!", ex); + } + } } } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java index a816b99dfa9..169acf66614 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java @@ -111,7 +111,8 @@ protected void execute() throws Exception { permission, ownerName, groupName, - (byte) 1); + (byte) 1, + (byte) 0); changeFileMetaData(srcPath, fileInfo); } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java index f12df882b01..c419d7ec981 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java @@ -21,8 +21,10 @@ import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.inotify.Event; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartdata.hdfs.CompatibilityHelperLoader; import org.smartdata.hdfs.HadoopUtil; import org.smartdata.metastore.DBType; import org.smartdata.metastore.MetaStore; @@ -32,6 +34,8 @@ import org.smartdata.model.FileDiffType; import org.smartdata.model.FileInfo; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -59,7 +63,7 @@ public void apply(List events) throws IOException, MetaStoreException { List statements = new ArrayList<>(); for (Event event : events) { List gen = getSqlStatement(event); - if (gen != null && !gen.isEmpty()){ + if (gen != null && !gen.isEmpty()) { for (String s : gen) { if (s != null && s.length() > 0) { statements.add(s); @@ -234,6 +238,12 @@ private List getRenameSql(Event.RenameEvent renameEvent) if (status == null) { LOG.debug("Get rename dest status failed, {} -> {}", src, dest); } + // The dest path which the src is renamed to should be checked in file table + // to avoid duplicated record for one same path. + FileInfo destInfo = metaStore.getFile(dest); + if (destInfo != null) { + metaStore.deleteFileByPath(dest); + } if (info == null) { if (status != null) { info = HadoopUtil.convertFileStatus(status, dest); @@ -335,6 +345,7 @@ private String getMetaDataUpdateSql(Event.MetadataUpdateEvent metadataUpdateEven "UPDATE file SET block_replication = %s WHERE path = '%s';", metadataUpdateEvent.getReplication(), metadataUpdateEvent.getPath()); case XATTRS: + final String EC_POLICY = "hdfs.erasurecoding.policy"; //Todo if (LOG.isDebugEnabled()) { String message = "\n"; @@ -343,6 +354,24 @@ private String getMetaDataUpdateSql(Event.MetadataUpdateEvent metadataUpdateEven } LOG.debug(message); } + // The following code should be executed merely on HDFS3.x. + for (XAttr xAttr : metadataUpdateEvent.getxAttrs()) { + if (xAttr.getName().equals(EC_POLICY)) { + try { + String ecPolicyName = WritableUtils.readString( + new DataInputStream(new ByteArrayInputStream(xAttr.getValue()))); + byte ecPolicyId = CompatibilityHelperLoader.getHelper(). + getErasureCodingPolicyByName(client, ecPolicyName); + if (ecPolicyId == (byte) -1) { + LOG.error("Unrecognized EC policy for updating!"); + } + return String.format("UPDATE file SET ec_policy_id = %s WHERE path = '%s'", + ecPolicyId, metadataUpdateEvent.getPath()); + } catch (IOException ex) { + LOG.error("Error occurred for updating ecPolicy!", ex); + } + } + } break; case ACLS: return ""; diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java index 7d194101f1c..79057040eb4 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import org.smartdata.conf.SmartConf; import org.smartdata.conf.SmartConfKeys; +import org.smartdata.hdfs.CompatibilityHelperLoader; import org.smartdata.metastore.MetaStoreException; import org.smartdata.model.FileInfo; import org.smartdata.metastore.MetaStore; @@ -307,7 +308,8 @@ private FileInfo convertToFileInfo(HdfsFileStatus status, String parent) { status.getPermission().toShort(), status.getOwner(), status.getGroup(), - status.getStoragePolicy()); + status.getStoragePolicy(), + CompatibilityHelperLoader.getHelper().getErasureCodingPolicy(status)); return fileInfo; } } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java new file mode 100644 index 00000000000..06d3b673471 --- /dev/null +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.hdfs.scheduler; + +import org.apache.hadoop.util.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartdata.SmartContext; +import org.smartdata.conf.SmartConf; +import org.smartdata.hdfs.action.*; +import org.smartdata.metastore.MetaStore; +import org.smartdata.metastore.MetaStoreException; +import org.smartdata.model.ActionInfo; +import org.smartdata.model.LaunchAction; +import org.smartdata.model.action.ScheduleResult; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ErasureCodingScheduler extends ActionSchedulerService { + public static final Logger LOG = LoggerFactory.getLogger(ErasureCodingScheduler.class); + public static final String ecActionID = "ec"; + public static final String unecActionID = "unec"; + public static final String checkecActionID = "checkec"; + public static final String listecActionID = "listec"; + public static final List actions = + Arrays.asList(ecActionID, unecActionID, checkecActionID, listecActionID); + public static final String EC_DIR = "/system/ssm/ec_tmp/"; + public static final String EC_TMP = "-ecTmp"; + private Set fileLock; + private SmartConf conf; + private MetaStore metaStore; + + public ErasureCodingScheduler(SmartContext context, MetaStore metaStore) { + super(context, metaStore); + this.conf = context.getConf(); + this.metaStore = metaStore; + } + + public List getSupportedActions() { + return actions; + } + + public void init() throws IOException { + fileLock = new HashSet<>(); + } + + @Override + public void start() throws IOException { + + } + + @Override + public void stop() throws IOException { + + } + + @Override + public boolean onSubmit(ActionInfo actionInfo) throws IOException { + if (!isECSupported()) { + throw new IOException(actionInfo.getActionName() + + " is not supported on " + VersionInfo.getVersion()); + } + if (!actionInfo.getActionName().equals(listecActionID)) { + if (actionInfo.getArgs().get(HdfsAction.FILE_PATH) == null) { + throw new IOException("No src path is given!"); + } + } + return true; + } + + public static boolean isECSupported() { + String[] parts = VersionInfo.getVersion().split("\\."); + return Integer.parseInt(parts[0]) == 3; + } + + @Override + public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) { + if (!actions.contains(action.getActionType())) { + return ScheduleResult.SUCCESS; + } + + if (actionInfo.getActionName().equals(listecActionID)) { + return ScheduleResult.SUCCESS; + } + + String srcPath = action.getArgs().get(HdfsAction.FILE_PATH); + if (srcPath == null) { + actionInfo.appendLog("No file is given in this action!"); + return ScheduleResult.FAIL; + } + + if (actionInfo.getActionName().equals(checkecActionID)) { + return ScheduleResult.SUCCESS; + } + + // check file lock merely for ec & unec action + if (fileLock.contains(srcPath)) { + return ScheduleResult.FAIL; + } + try { + if (!metaStore.getFile(srcPath).isdir()) { + // For ec or unec, add ecTmp argument + String tmpName = createTmpName(action); + action.getArgs().put(EC_TMP, EC_DIR + tmpName); + actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName); + } + } catch (MetaStoreException ex) { + LOG.error("Error occurred for getting file info", ex); + actionInfo.appendLog(ex.getMessage()); + return ScheduleResult.FAIL; + } + // lock the file only if ec or unec action is scheduled + fileLock.add(srcPath); + return ScheduleResult.SUCCESS; + } + + private String createTmpName(LaunchAction action) { + String path = action.getArgs().get(HdfsAction.FILE_PATH); + String fileName; + int index = path.lastIndexOf("/"); + if (index == path.length() - 1) { + index = path.substring(0, path.length() - 1).indexOf("/"); + fileName = path.substring(index + 1, path.length() - 1); + } else { + fileName = path.substring(index + 1, path.length()); + } + /** + * The dest tmp file is under EC_DIR and + * named by fileName, aidxxx and current time in millisecond with "_" separated + */ + String tmpName = fileName + "_" + "aid" + action.getActionId() + + "_" + System.currentTimeMillis(); + return tmpName; + } + + @Override + public void onActionFinished(ActionInfo actionInfo) { + if (actionInfo.getActionName().equals(ecActionID) || + actionInfo.getActionName().equals(unecActionID)) { + fileLock.remove(actionInfo.getArgs().get(HdfsAction.FILE_PATH)); + } + } +} diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java index b2285f36126..198c40c20ed 100644 --- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java @@ -105,10 +105,11 @@ private FileInfo createFileStatus(String pathString) { String group = "admin"; long fileId = fid; byte storagePolicy = 0; + byte erasureCodingPolicy = 0; fid++; return new FileInfo(pathString, fileId, length, isDir, (short)blockReplication, blockSize, modTime, accessTime, - (short) 1, owner, group, storagePolicy); + (short) 1, owner, group, storagePolicy, erasureCodingPolicy); } @Test diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java b/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java index 18a86717664..958bffaeb31 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java @@ -686,7 +686,7 @@ public List listFileActions(long rid, // Add a mock fileInfo fileInfo = new FileInfo(filePath, 0L, 0L, false, (short) 0, 0L, 0L, 0L, (short) 0, - "root", "root", (byte) 0); + "root", "root", (byte) 0, (byte) 0); } detailedFileAction.setFileLength(fileInfo.getLength()); detailedFileAction.setFilePath(filePath); @@ -723,7 +723,7 @@ public List listFileActions(long rid, long start, long offse // Add a mock fileInfo fileInfo = new FileInfo(filePath, 0L, 0L, false, (short) 0, 0L, 0L, 0L, (short) 0, - "root", "root", (byte) 0); + "root", "root", (byte) 0, (byte) 0); } detailedFileAction.setFileLength(fileInfo.getLength()); detailedFileAction.setFilePath(filePath); diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java index 852d1c4ddb9..276b89070b7 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java @@ -173,6 +173,7 @@ private Map toMap(FileInfo fileInfo) { parameters .put("owner_group", fileInfo.getGroup()); parameters.put("permission", fileInfo.getPermission()); + parameters.put("ec_policy_id", fileInfo.getErasureCodingPolicy()); return parameters; } @@ -191,7 +192,8 @@ public FileInfo mapRow(ResultSet resultSet, int i) resultSet.getShort("permission"), resultSet.getString("owner"), resultSet.getString("owner_group"), - resultSet.getByte("sid") + resultSet.getByte("sid"), + resultSet.getByte("ec_policy_id") ); return fileInfo; } diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java b/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java index a4d43195199..fa3bf8d44f0 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java @@ -173,6 +173,7 @@ public void testGetFiles() throws Exception { String group = "admin"; long fileId = 56L; byte storagePolicy = 0; + byte erasureCodingPolicy = 0; FileInfo fileInfo = new FileInfo( pathString, @@ -186,7 +187,8 @@ public void testGetFiles() throws Exception { (short) 1, owner, group, - storagePolicy); + storagePolicy, + erasureCodingPolicy); metaStore.insertFile(fileInfo); FileInfo dbFileInfo = metaStore.getFile(56); Assert.assertTrue(dbFileInfo.equals(fileInfo)); @@ -292,6 +294,7 @@ public void testMoveSyncRules() throws Exception { String group = "admin"; long fileId = 56L; byte storagePolicy = 0; + byte erasureCodingPolicy = 0; FileInfo fileInfo = new FileInfo( pathString, @@ -305,7 +308,8 @@ public void testMoveSyncRules() throws Exception { (short) 1, owner, group, - storagePolicy); + storagePolicy, + erasureCodingPolicy); metaStore.insertFile(fileInfo); Map args = new HashMap(); args.put("-file", "/src/1"); @@ -417,6 +421,7 @@ public void testInsetFiles() throws Exception { String group = "admin"; long fileId = 312321L; byte storagePolicy = 0; + byte erasureCodingPolicy = 0; FileInfo[] files = { new FileInfo( pathString, @@ -430,7 +435,8 @@ public void testInsetFiles() throws Exception { (short) 1, owner, group, - storagePolicy) + storagePolicy, + erasureCodingPolicy) }; metaStore.insertFiles(files); FileInfo dbFileInfo = metaStore.getFile("/tmp/testFile"); diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAccessCountTableManager.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAccessCountTableManager.java index ce643d69269..491ac35a747 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAccessCountTableManager.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAccessCountTableManager.java @@ -172,6 +172,7 @@ private void prepareFiles(MetaStore metaStore) throws MetaStoreException { (short) 1, "root", "admin", + (byte) 0, (byte) 0)); } metaStore.insertFiles(statusInternals.toArray(new FileInfo[0])); @@ -190,6 +191,7 @@ private void insertNewFile(MetaStore metaStore, String file, Long fid) (short) 1, "root", "admin", + (byte) 0, (byte) 0); metaStore.insertFile(finfo); } diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java index 694b5d73c56..2980939f7f4 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java @@ -57,15 +57,16 @@ public void testInsetGetDeleteFiles() throws Exception { String group = "admin"; long fileId = 312321L; byte storagePolicy = 0; - FileInfo fileInfo = new FileInfo(path, fileId, length, isDir, blockReplication, - blockSize, modTime, accessTime, permission, owner, group, storagePolicy); + byte erasureCodingPolicy = 0; + FileInfo fileInfo = new FileInfo(path, fileId, length, isDir, blockReplication, blockSize, + modTime, accessTime, permission, owner, group, storagePolicy, erasureCodingPolicy); fileInfoDao.insert(fileInfo); FileInfo file1 = fileInfoDao.getByPath("/testFile"); Assert.assertTrue(fileInfo.equals(file1)); FileInfo file2 = fileInfoDao.getById(fileId); Assert.assertTrue(fileInfo.equals(file2)); - FileInfo fileInfo1 = new FileInfo(path, fileId + 1, length, isDir, blockReplication, - blockSize, modTime, accessTime, permission, owner, group, storagePolicy); + FileInfo fileInfo1 = new FileInfo(path, fileId + 1, length, isDir, blockReplication, blockSize, + modTime, accessTime, permission, owner, group, storagePolicy, erasureCodingPolicy); fileInfoDao.insert(fileInfo1); List fileInfos = fileInfoDao.getFilesByPrefix("/testaaFile"); Assert.assertTrue(fileInfos.size() == 0); @@ -93,12 +94,13 @@ public void testInsertUpdateFiles() throws Exception { String group = "admin"; long fileId = 312321L; byte storagePolicy = 0; + byte erasureCodingPolicy = 0; Map mapOwnerIdName = new HashMap<>(); mapOwnerIdName.put(1, "root"); Map mapGroupIdName = new HashMap<>(); mapGroupIdName.put(1, "admin"); - FileInfo fileInfo = new FileInfo(path, fileId, length, isDir, blockReplication, - blockSize, modTime, accessTime, permission, owner, group, storagePolicy); + FileInfo fileInfo = new FileInfo(path, fileId, length, isDir, blockReplication, blockSize, + modTime, accessTime, permission, owner, group, storagePolicy, erasureCodingPolicy); fileInfoDao.insert(fileInfo); fileInfoDao.update(path, 10); FileInfo file = fileInfoDao.getById(fileId); diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableAggregator.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableAggregator.java index 9ea42113767..6e31aad6b7f 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableAggregator.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableAggregator.java @@ -109,6 +109,7 @@ private void prepareFiles(MetaStore metaStore) throws MetaStoreException { (short) 1, "root", "admin", + (byte) 0, (byte) 0)); } metaStore.insertFiles(statusInternals.toArray(new FileInfo[0])); diff --git a/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java b/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java index 1b8c14691ed..7df4ed80423 100644 --- a/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java +++ b/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java @@ -303,7 +303,7 @@ public void testMultiThreadChangeState() throws Exception { long length = 100; long fid = 10000; FileInfo[] files = { new FileInfo("/tmp/testfile", fid, length, false, (short) 3, - 1024, now, now, (short) 1, null, null, (byte) 3) }; + 1024, now, now, (short) 1, null, null, (byte) 3, (byte) 0) }; metaStore.insertFiles(files); long rid = ruleManager.submitRule(rule, RuleState.ACTIVE);