diff --git a/smart-common/src/main/java/org/smartdata/SmartConstants.java b/smart-common/src/main/java/org/smartdata/SmartConstants.java
index 9206d43c1f8..70ff96e9ae7 100644
--- a/smart-common/src/main/java/org/smartdata/SmartConstants.java
+++ b/smart-common/src/main/java/org/smartdata/SmartConstants.java
@@ -28,7 +28,8 @@ public class SmartConstants {
"org.smartdata.hdfs.scheduler.MoverScheduler, "
+ "org.smartdata.hdfs.scheduler.CopyScheduler, "
+ "org.smartdata.hdfs.scheduler.Copy2S3Scheduler,"
- + "org.smartdata.hdfs.scheduler.SmallFileScheduler";
+ + "org.smartdata.hdfs.scheduler.SmallFileScheduler,"
+ + "org.smartdata.hdfs.scheduler.ErasureCodingScheduler";
public static final String SMART_HADOOP_LAST_INOTIFY_TXID =
"smart_hadoop_last_inotify_txid";
diff --git a/smart-common/src/main/java/org/smartdata/model/FileInfo.java b/smart-common/src/main/java/org/smartdata/model/FileInfo.java
index 3e119cb600f..1cab069c27f 100644
--- a/smart-common/src/main/java/org/smartdata/model/FileInfo.java
+++ b/smart-common/src/main/java/org/smartdata/model/FileInfo.java
@@ -32,11 +32,12 @@ public class FileInfo {
private String owner;
private String group;
private byte storagePolicy;
+ private byte erasureCodingPolicy;
public FileInfo(String path, long fileId, long length, boolean isdir,
short blockReplication, long blocksize, long modificationTime,
long accessTime, short permission, String owner, String group,
- byte storagePolicy) {
+ byte storagePolicy, byte erasureCodingPolicy) {
this.path = path;
this.fileId = fileId;
this.length = length;
@@ -49,6 +50,7 @@ public FileInfo(String path, long fileId, long length, boolean isdir,
this.owner = owner;
this.group = group;
this.storagePolicy = storagePolicy;
+ this.erasureCodingPolicy = erasureCodingPolicy;
}
public String getPath() {
@@ -147,6 +149,14 @@ public void setStoragePolicy(byte storagePolicy) {
this.storagePolicy = storagePolicy;
}
+ public byte getErasureCodingPolicy() {
+ return erasureCodingPolicy;
+ }
+
+ public void setErasureCodingPolicy(byte erasureCodingPolicy) {
+ this.erasureCodingPolicy = erasureCodingPolicy;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -165,6 +175,7 @@ public boolean equals(Object o) {
&& accessTime == fileInfo.accessTime
&& permission == fileInfo.permission
&& storagePolicy == fileInfo.storagePolicy
+ && erasureCodingPolicy == fileInfo.erasureCodingPolicy
&& Objects.equals(path, fileInfo.path)
&& Objects.equals(owner, fileInfo.owner)
&& Objects.equals(group, fileInfo.group);
@@ -184,7 +195,8 @@ public int hashCode() {
permission,
owner,
group,
- storagePolicy);
+ storagePolicy,
+ erasureCodingPolicy);
}
public static Builder newBuilder() {
@@ -196,7 +208,7 @@ public String toString() {
return String.format(
"FileInfo{path=\'%s\', fileId=%s, length=%s, isdir=%s, blockReplication=%s, "
+ "blocksize=%s, modificationTime=%s, accessTime=%s, permission=%s, owner=\'%s\', "
- + "group=\'%s\', storagePolicy=%s}",
+ + "group=\'%s\', storagePolicy=%s, erasureCodingPolicy=%s}",
path,
fileId,
length,
@@ -208,7 +220,8 @@ public String toString() {
permission,
owner,
group,
- storagePolicy);
+ storagePolicy,
+ erasureCodingPolicy);
}
public static class Builder {
@@ -224,6 +237,7 @@ public static class Builder {
private String owner;
private String group;
private byte storagePolicy;
+ private byte erasureCodingPolicy;
public Builder setPath(String path) {
this.path = path;
@@ -285,10 +299,15 @@ public Builder setStoragePolicy(byte storagePolicy) {
return this;
}
+ public Builder setErasureCodingPolicy(byte erasureCodingPolicy) {
+ this.erasureCodingPolicy = erasureCodingPolicy;
+ return this;
+ }
+
public FileInfo build() {
return new FileInfo(path, fileId, length, isdir, blockReplication,
blocksize, modificationTime, accessTime, permission, owner,
- group, storagePolicy);
+ group, storagePolicy, erasureCodingPolicy);
}
@Override
@@ -296,7 +315,7 @@ public String toString() {
return String.format(
"Builder{path=\'%s\', fileId=%s, length=%s, isdir=%s, blockReplication=%s, "
+ "blocksize=%s, modificationTime=%s, accessTime=%s, permission=%s, owner=\'%s\', "
- + "group=\'%s\', storagePolicy=\'%s\'}",
+ + "group=\'%s\', storagePolicy=%s, erasureCodingPolicy=%s}",
path,
fileId,
length,
@@ -308,7 +327,8 @@ public String toString() {
permission,
owner,
group,
- storagePolicy);
+ storagePolicy,
+ erasureCodingPolicy);
}
}
}
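With the Builder extended above, callers can populate the new field fluently. A usage sketch, assuming the usual setters for the remaining fields; the values are illustrative, and id 1 corresponds to RS-6-3-1024k among stock HDFS 3.x system policies:

    FileInfo info = FileInfo.newBuilder()
        .setPath("/ec/test_file")
        .setStoragePolicy((byte) 0)
        .setErasureCodingPolicy((byte) 1) // e.g. RS-6-3-1024k
        .build();
    byte ecId = info.getErasureCodingPolicy();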
diff --git a/smart-common/src/test/java/org/smartdata/model/TestFileInfo.java b/smart-common/src/test/java/org/smartdata/model/TestFileInfo.java
index cacaf219909..269b119603a 100644
--- a/smart-common/src/test/java/org/smartdata/model/TestFileInfo.java
+++ b/smart-common/src/test/java/org/smartdata/model/TestFileInfo.java
@@ -36,7 +36,8 @@ public ChildFileInfo(
short permission,
String owner,
String group,
- byte storagePolicy) {
+ byte storagePolicy,
+ byte erasureCodingPolicy) {
super(
path,
fileId,
@@ -49,7 +50,8 @@ public ChildFileInfo(
permission,
owner,
group,
- storagePolicy);
+ storagePolicy,
+ erasureCodingPolicy);
}
}
@@ -57,37 +59,43 @@ public ChildFileInfo(
public void testEquals() throws Exception {
//Case 1:
FileInfo fileInfo =
- new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, 1, (short) 1, " ", " ", (byte) 1);
+ new FileInfo(" ", 1, 1, true, (short) 1, 1, 1,
+ 1, (short) 1, " ", " ", (byte) 1, (byte) 0);
Assert.assertEquals(true, fileInfo.equals(fileInfo));
//Case 2:
FileInfo fileInfo1 =
- new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, 1, (short) 1, " ", " ", (byte) 1);
+ new FileInfo(" ", 1, 1, true, (short) 1, 1, 1,
+ 1, (short) 1, " ", " ", (byte) 1, (byte) 0);
Assert.assertEquals(true, fileInfo.equals(fileInfo1));
//Case 3:
FileInfo fileInfo2 =
- new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, 1, (short) 1, null, " ", (byte) 1);
+ new FileInfo(" ", 1, 1, true, (short) 1, 1, 1,
+ 1, (short) 1, null, " ", (byte) 1, (byte) 0);
Assert.assertEquals(false, fileInfo.equals(fileInfo2));
Assert.assertEquals(false, fileInfo2.equals(fileInfo));
//Case 4:
FileInfo fileInfo3 =
- new FileInfo(null, 1, 1, true, (short) 1, 1, 1, 1, (short) 1, " ", " ", (byte) 1);
+ new FileInfo(null, 1, 1, true, (short) 1, 1, 1,
+ 1, (short) 1, " ", " ", (byte) 1, (byte) 0);
Assert.assertEquals(false, fileInfo.equals(fileInfo3));
Assert.assertEquals(false, fileInfo3.equals(fileInfo));
//Case 5:
FileInfo fileInfo4 =
- new FileInfo(null, 1, 1, true, (short) 1, 1, 1, 1, (short) 1, " ", null, (byte) 1);
+ new FileInfo(null, 1, 1, true, (short) 1, 1, 1,
+ 1, (short) 1, " ", null, (byte) 1, (byte) 0);
Assert.assertEquals(false, fileInfo.equals(fileInfo4));
Assert.assertEquals(false, fileInfo4.equals(fileInfo));
//Case 6:
FileInfo fileInfo5 =
- new FileInfo(" ", 1, 1, true, (short) 1, 1, 1, 1, (short) 2, " ", " ", (byte) 1);
+ new FileInfo(" ", 1, 1, true, (short) 1, 1, 1,
+ 1, (short) 2, " ", " ", (byte) 1, (byte) 0);
Assert.assertEquals(false, fileInfo.equals(fileInfo5));
}
}
diff --git a/smart-hadoop-support/smart-hadoop-2/src/main/java/org/smartdata/hdfs/CompatibilityHelper2.java b/smart-hadoop-support/smart-hadoop-2/src/main/java/org/smartdata/hdfs/CompatibilityHelper2.java
index 935321b452e..9c8d31fe66a 100644
--- a/smart-hadoop-support/smart-hadoop-2/src/main/java/org/smartdata/hdfs/CompatibilityHelper2.java
+++ b/smart-hadoop-support/smart-hadoop-2/src/main/java/org/smartdata/hdfs/CompatibilityHelper2.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -62,4 +63,13 @@ public HdfsFileStatus createHdfsFileStatus(
length, isdir, block_replication, blocksize, modification_time, access_time, permission,
owner, group, symlink, path, fileId, childrenNum, feInfo, storagePolicy);
}
+
+ public byte getErasureCodingPolicy(HdfsFileStatus fileStatus) {
+ // For HDFS 2.x, there is no erasure coding; SSM represents replication with id 0.
+ return (byte) 0;
+ }
+
+ public byte getErasureCodingPolicyByName(DFSClient client, String ecPolicyName) throws IOException {
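+ // HDFS 2.x has no named EC policies, so any name maps to the replication id (0) used by SSM.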
+ return (byte) 0;
+ }
}
\ No newline at end of file
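Callers are not expected to touch these helpers directly; they go through the compatibility loader so the same code runs on both Hadoop lines. This is the pattern HadoopUtil and NamespaceFetcher use later in this patch:

    // On Hadoop 2.x this always yields 0 (replication); on 3.1 it consults
    // HdfsFileStatus.getErasureCodingPolicy(). status is an HdfsFileStatus.
    byte ecId = CompatibilityHelperLoader.getHelper().getErasureCodingPolicy(status);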
diff --git a/smart-hadoop-support/smart-hadoop-3.1/pom.xml b/smart-hadoop-support/smart-hadoop-3.1/pom.xml
index a68ac82c0ae..b371795c186 100644
--- a/smart-hadoop-support/smart-hadoop-3.1/pom.xml
+++ b/smart-hadoop-support/smart-hadoop-3.1/pom.xml
@@ -70,6 +70,11 @@
<artifactId>smart-hadoop-common</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
@@ -84,10 +89,27 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-aws</artifactId>
- <version>${hadoop.version}</version>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/CompatibilityHelper31.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/CompatibilityHelper31.java
index a1ce43f0205..c78723a864f 100644
--- a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/CompatibilityHelper31.java
+++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/CompatibilityHelper31.java
@@ -207,4 +207,27 @@ public HdfsFileStatus createHdfsFileStatus(
.storagePolicy(storagePolicy)
.build();
}
+
+ @Override
+ public byte getErasureCodingPolicy(HdfsFileStatus fileStatus) {
+ ErasureCodingPolicy erasureCodingPolicy = fileStatus.getErasureCodingPolicy();
+ // null means the replication policy, which SSM represents with id 0.
+ if (erasureCodingPolicy == null) {
+ return (byte) 0;
+ }
+ return erasureCodingPolicy.getId();
+ }
+
+ @Override
+ public byte getErasureCodingPolicyByName(DFSClient client, String ecPolicyName) throws IOException {
+ if (ecPolicyName.equals(SystemErasureCodingPolicies.getReplicationPolicy().getName())) {
+ return (byte) 0;
+ }
+ for (ErasureCodingPolicyInfo policyInfo : client.getErasureCodingPolicies()) {
+ if (policyInfo.getPolicy().getName().equals(ecPolicyName)) {
+ return policyInfo.getPolicy().getId();
+ }
+ }
+ return (byte) -1;
+ }
}
\ No newline at end of file
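A quick way to inspect the name-to-id mapping this helper relies on; the ids shown follow stock HDFS 3.1 system policies (for example RS-6-3-1024k maps to 1 and XOR-2-1-1024k to 4), while SSM represents replication as 0 on its own:

    for (ErasureCodingPolicy p : SystemErasureCodingPolicies.getPolicies()) {
        System.out.println(p.getName() + " -> " + p.getId());
    }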
diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/CheckErasureCodingPolicy.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/CheckErasureCodingPolicy.java
new file mode 100644
index 00000000000..84074bc3e73
--- /dev/null
+++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/CheckErasureCodingPolicy.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.smartdata.hdfs.action;
+
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.smartdata.action.annotation.ActionSignature;
+import org.smartdata.conf.SmartConf;
+import org.smartdata.hdfs.HadoopUtil;
+
+import java.util.Map;
+
+/**
+ * An action to check the EC policy for a file or dir.
+ */
+@ActionSignature(
+ actionId = "checkec",
+ displayName = "checkec",
+ usage = HdfsAction.FILE_PATH + " $src"
+)
+public class CheckErasureCodingPolicy extends HdfsAction {
+ public static final String RESULT_OF_NULL_EC_POLICY =
+ "The EC policy is replication.";
+ private SmartConf conf;
+ private String srcPath;
+
+ @Override
+ public void init(Map<String, String> args) {
+ super.init(args);
+ this.conf = getContext().getConf();
+ this.srcPath = args.get(HdfsAction.FILE_PATH);
+ }
+
+ @Override
+ public void execute() throws Exception {
+ this.setDfsClient(HadoopUtil.getDFSClient(
+ HadoopUtil.getNameNodeUri(conf), conf));
+ ErasureCodingPolicy srcEcPolicy = dfsClient.getErasureCodingPolicy(srcPath);
+ if (srcEcPolicy == null) {
+ appendResult(RESULT_OF_NULL_EC_POLICY);
+ } else {
+ appendResult(srcEcPolicy.toString());
+ }
+ }
+}
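The action follows SSM's usual lifecycle: construct, setContext, init with an args map, then run. A sketch mirroring the tests later in this patch; smartContext and the path are assumed to exist:

    CheckErasureCodingPolicy checkEc = new CheckErasureCodingPolicy();
    checkEc.setContext(smartContext);
    Map<String, String> args = new HashMap<>();
    args.put(HdfsAction.FILE_PATH, "/ec/test_file");
    checkEc.init(args);
    checkEc.run();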
diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java
new file mode 100644
index 00000000000..2b50d9a36ce
--- /dev/null
+++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.smartdata.hdfs.action;
+
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartdata.action.ActionException;
+import org.smartdata.action.annotation.ActionSignature;
+import org.smartdata.conf.SmartConf;
+import org.smartdata.hdfs.HadoopUtil;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An action to set an EC policy for a dir or convert a file to the given EC policy.
+ * Default values are used for the policy and bufSize arguments if they are not given.
+ */
+@ActionSignature(
+ actionId = "ec",
+ displayName = "ec",
+ usage = HdfsAction.FILE_PATH + " $src " + ErasureCodingAction.EC_POLICY_NAME + " $policy " +
+ ErasureCodingBase.BUF_SIZE + " $bufSize"
+)
+public class ErasureCodingAction extends ErasureCodingBase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ErasureCodingAction.class);
+ public static final String EC_POLICY_NAME = "-policy";
+
+ private SmartConf conf;
+ private String ecPolicyName;
+
+ @Override
+ public void init(Map<String, String> args) {
+ super.init(args);
+ this.conf = getContext().getConf();
+ this.srcPath = args.get(FILE_PATH);
+ if (args.containsKey(EC_TMP)) {
+ // EC_TMP points to a temp file used while converting the file to another EC policy.
+ this.ecTmpPath = args.get(EC_TMP);
+ }
+ if (args.containsKey(EC_POLICY_NAME) && !args.get(EC_POLICY_NAME).isEmpty()) {
+ this.ecPolicyName = args.get(EC_POLICY_NAME);
+ } else {
+ String defaultEcPolicy = conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
+ DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);
+ this.ecPolicyName = defaultEcPolicy;
+ }
+ if (args.containsKey(BUF_SIZE) && !args.get(BUF_SIZE).isEmpty()) {
+ this.bufferSize = Integer.valueOf(args.get(BUF_SIZE));
+ }
+ this.progress = 0.0F;
+ }
+
+ @Override
+ protected void execute() throws Exception {
+ final String MATCH_RESULT =
+ "The current EC policy is matched with the target one.";
+ final String DIR_RESULT =
+ "The EC policy is set successfully for the given directory.";
+ final String CONVERT_RESULT =
+ "The file is converted successfully with the given or default ec policy.";
+
+ this.setDfsClient(HadoopUtil.getDFSClient(
+ HadoopUtil.getNameNodeUri(conf), conf));
+ // Obtain the file status first so attributes can be kept consistent after conversion.
+ HdfsFileStatus fileStatus = dfsClient.getFileInfo(srcPath);
+ if (fileStatus == null) {
+ throw new ActionException("File doesn't exist!");
+ }
+ validateEcPolicy(ecPolicyName);
+ ErasureCodingPolicy srcEcPolicy = fileStatus.getErasureCodingPolicy();
+ // if the current ecPolicy is already the target one, no need to convert
+ if (srcEcPolicy != null) {
+ if (srcEcPolicy.getName().equals(ecPolicyName)) {
+ appendResult(MATCH_RESULT);
+ this.progress = 1.0F;
+ return;
+ }
+ } else {
+ // if ecPolicy is null, it means replication.
+ if (ecPolicyName.equals(REPLICATION_POLICY_NAME)) {
+ appendResult(MATCH_RESULT);
+ this.progress = 1.0F;
+ return;
+ }
+ }
+ if (fileStatus.isDir()) {
+ dfsClient.setErasureCodingPolicy(srcPath, ecPolicyName);
+ this.progress = 1.0F;
+ appendResult(DIR_RESULT);
+ return;
+ }
+ HdfsDataOutputStream outputStream = null;
+ try {
+ // Only a file with the replication policy can be appended.
+ if (srcEcPolicy == null) {
+ // Append the file to acquire its lease so it cannot be modified; no data is actually appended.
+ outputStream =
+ dfsClient.append(srcPath, bufferSize, EnumSet.of(CreateFlag.APPEND), null, null);
+ }
+ convert(conf, ecPolicyName);
+ /**
+ * The append operation changes the modification time accordingly, so we use
+ * the file status obtained before the append to set most attributes of the ecTmp file.
+ */
+ setAttributes(srcPath, fileStatus, ecTmpPath);
+ dfsClient.rename(ecTmpPath, srcPath, Options.Rename.OVERWRITE);
+ appendResult(CONVERT_RESULT);
+ if (srcEcPolicy == null) {
+ appendResult("The previous EC policy is replication.");
+ } else {
+ appendResult(String.format("The previous EC policy is {}.", srcEcPolicy.getName()));
+ }
+ appendResult(String.format("The current EC policy is {}.", ecPolicyName));
+ } catch (ActionException ex) {
+ try {
+ if (dfsClient.getFileInfo(ecTmpPath) != null) {
+ dfsClient.delete(ecTmpPath, false);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to delete tmp file created during the conversion!");
+ }
+ throw new ActionException(ex);
+ } finally {
+ if (outputStream != null) {
+ try {
+ outputStream.close();
+ } catch (IOException ex) {
+ // IOException may be reported for missing blocks.
+ }
+ }
+ }
+ }
+
+ public void validateEcPolicy(String ecPolicyName) throws Exception {
+ Map<String, ErasureCodingPolicyState> ecPolicyNameToState = new HashMap<>();
+ for (ErasureCodingPolicyInfo info : dfsClient.getErasureCodingPolicies()) {
+ ecPolicyNameToState.put(info.getPolicy().getName(), info.getState());
+ }
+ if (!ecPolicyNameToState.containsKey(ecPolicyName) && !ecPolicyName.equals(REPLICATION_POLICY_NAME)) {
+ throw new ActionException("The EC policy " + ecPolicyName + " is not supported!");
+ } else if (ecPolicyNameToState.get(ecPolicyName) == ErasureCodingPolicyState.DISABLED
+ || ecPolicyNameToState.get(ecPolicyName) == ErasureCodingPolicyState.REMOVED) {
+ throw new ActionException("The EC policy " + ecPolicyName + " is disabled or removed!");
+ }
+ }
+
+ @Override
+ public float getProgress() {
+ return progress;
+ }
+}
\ No newline at end of file
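For a regular file the action relies on the scheduler having injected the -ecTmp argument. A sketch of a complete args map for one run; the paths, policy name and timestamp are illustrative, and the tmp path is normally generated by ErasureCodingScheduler (see below):

    Map<String, String> args = new HashMap<>();
    args.put(HdfsAction.FILE_PATH, "/ec/test_file");
    args.put(ErasureCodingAction.EC_POLICY_NAME, "RS-6-3-1024k");
    args.put(ErasureCodingBase.EC_TMP, "/system/ssm/ec_tmp/test_file_aid1_1548000000000");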
diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingBase.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingBase.java
new file mode 100644
index 00000000000..1476de4e44c
--- /dev/null
+++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingBase.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.smartdata.hdfs.action;
+
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartdata.action.ActionException;
+import org.smartdata.conf.SmartConf;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+
+/**
+ * An abstract base class for ErasureCodingAction & UnErasureCodingAction.
+ */
+public abstract class ErasureCodingBase extends HdfsAction {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ErasureCodingBase.class);
+ public static final String BUF_SIZE = "-bufSize";
+ protected String srcPath;
+ protected String ecTmpPath;
+ protected int bufferSize = 1024 * 1024;
+ protected float progress;
+ // The value for -ecTmp is assigned by ErasureCodingScheduler.
+ public static final String EC_TMP = "-ecTmp";
+ public static final String REPLICATION_POLICY_NAME =
+ SystemErasureCodingPolicies.getReplicationPolicy().getName();
+
+ protected void convert(SmartConf conf, String ecPolicyName) throws ActionException {
+ DFSInputStream in = null;
+ DFSOutputStream out = null;
+ try {
+ long blockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+ DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+ in = dfsClient.open(srcPath, bufferSize, true);
+ HdfsFileStatus fileStatus = dfsClient.getFileInfo(srcPath);
+ // use the same FsPermission as srcPath
+ FsPermission permission = fileStatus.getPermission();
+ out = dfsClient.create(ecTmpPath, permission, EnumSet.of(CreateFlag.CREATE), true,
+ (short) 1, blockSize, null, bufferSize, null, null, ecPolicyName);
+ long bytesRemaining = fileStatus.getLen();
+ byte[] buf = new byte[bufferSize];
+ while (bytesRemaining > 0L) {
+ int bytesToRead = (int) Math.min(bytesRemaining, (long) buf.length);
+ int bytesRead = in.read(buf, 0, bytesToRead);
+ if (bytesRead == -1) {
+ break;
+ }
+ out.write(buf, 0, bytesRead);
+ bytesRemaining -= (long) bytesRead;
+ this.progress = (float) (fileStatus.getLen() - bytesRemaining) / fileStatus.getLen();
+ }
+ } catch (Exception ex) {
+ throw new ActionException(ex);
+ } finally {
+ try {
+ if (in != null) {
+ in.close();
+ }
+ if (out != null) {
+ out.close();
+ }
+ } catch (IOException ex) {
+ LOG.error("IOException occurred when closing DFSInputStream or DFSOutputStream!");
+ }
+ }
+ }
+
+ // Set attributes for dest to keep them consistent with those of src.
+ protected void setAttributes(String src, HdfsFileStatus fileStatus, String dest)
+ throws IOException {
+ dfsClient.setOwner(dest, fileStatus.getOwner(), fileStatus.getGroup());
+ dfsClient.setPermission(dest, fileStatus.getPermission());
+ dfsClient.setStoragePolicy(dest, dfsClient.getStoragePolicy(src).getName());
+ dfsClient.setTimes(dest, fileStatus.getModificationTime(), fileStatus.getAccessTime());
+ boolean aclsEnabled = getContext().getConf().getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,
+ DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT);
+ if (aclsEnabled) {
+ dfsClient.setAcl(dest, dfsClient.getAclStatus(src).getEntries());
+ }
+ // TODO: check EC-related records to avoid inconsistency.
+ for (Map.Entry<String, byte[]> entry : dfsClient.getXAttrs(src).entrySet()) {
+ dfsClient.setXAttr(dest, entry.getKey(), entry.getValue(),
+ EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
+ }
+ }
+}
\ No newline at end of file
diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/StripErasureCodeFileAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ListErasureCodingPolicy.java
similarity index 50%
rename from smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/StripErasureCodeFileAction.java
rename to smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ListErasureCodingPolicy.java
index 2be287215ee..7ad70b77cf8 100644
--- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/StripErasureCodeFileAction.java
+++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ListErasureCodingPolicy.java
@@ -17,16 +17,36 @@
*/
package org.smartdata.hdfs.action;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import org.smartdata.action.annotation.ActionSignature;
+import org.smartdata.conf.SmartConf;
+import org.smartdata.hdfs.HadoopUtil;
+
+import java.util.Map;
/**
- * An action to do strip level erasure code a file. Only for Hadoop 3.x.
+ * An action to list the info for all EC policies in HDFS.
*/
-public class StripErasureCodeFileAction extends HdfsAction {
- private static final Logger LOG = LoggerFactory.getLogger(StripErasureCodeFileAction.class);
+@ActionSignature(
+ actionId = "listec",
+ displayName = "listec",
+ usage = "No args"
+)
+public class ListErasureCodingPolicy extends HdfsAction {
+ private SmartConf conf;
+
+ @Override
+ public void init(Map<String, String> args) {
+ super.init(args);
+ this.conf = getContext().getConf();
+ }
@Override
- protected void execute() throws Exception {
+ public void execute() throws Exception {
+ this.setDfsClient(HadoopUtil.getDFSClient(
+ HadoopUtil.getNameNodeUri(conf), conf));
+ for (ErasureCodingPolicyInfo policyInfo : dfsClient.getErasureCodingPolicies()) {
+ appendResult("{" + policyInfo.toString() + "}");
+ }
}
}
diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/UnErasureCodingAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/UnErasureCodingAction.java
new file mode 100644
index 00000000000..8d05c2f1f44
--- /dev/null
+++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/UnErasureCodingAction.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.smartdata.hdfs.action;
+
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartdata.action.ActionException;
+import org.smartdata.action.annotation.ActionSignature;
+import org.smartdata.conf.SmartConf;
+import org.smartdata.hdfs.HadoopUtil;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * An action to set the replication policy for a dir or convert an EC file back to replication.
+ * A default value is used for the bufSize argument if it is not given.
+ */
+@ActionSignature(
+ actionId = "unec",
+ displayName = "unec",
+ usage = HdfsAction.FILE_PATH + " $src " + ErasureCodingBase.BUF_SIZE + " $bufSize"
+)
+public class UnErasureCodingAction extends ErasureCodingBase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(UnErasureCodingAction.class);
+ private String ecPolicyName;
+ private SmartConf conf;
+
+ @Override
+ public void init(Map<String, String> args) {
+ super.init(args);
+ this.conf = getContext().getConf();
+ this.srcPath = args.get(FILE_PATH);
+ this.ecPolicyName = REPLICATION_POLICY_NAME;
+ if (args.containsKey(EC_TMP)) {
+ this.ecTmpPath = args.get(EC_TMP);
+ }
+ if (args.containsKey(BUF_SIZE) && !args.get(BUF_SIZE).isEmpty()) {
+ this.bufferSize = Integer.valueOf(args.get(BUF_SIZE));
+ }
+ this.progress = 0.0F;
+ }
+
+ @Override
+ protected void execute() throws Exception {
+ final String MATCH_RESULT =
+ "The current EC policy is replication already.";
+ final String DIR_RESULT =
+ "The replication EC policy is set successfully for the given directory.";
+ final String CONVERT_RESULT =
+ "The file is converted successfully with replication EC policy.";
+
+ this.setDfsClient(HadoopUtil.getDFSClient(
+ HadoopUtil.getNameNodeUri(conf), conf));
+ HdfsFileStatus fileStatus = dfsClient.getFileInfo(srcPath);
+ if (fileStatus == null) {
+ throw new ActionException("File doesn't exist!");
+ }
+ ErasureCodingPolicy srcEcPolicy = fileStatus.getErasureCodingPolicy();
+ // if ecPolicy is null, it means replication.
+ if (srcEcPolicy == null) {
+ this.progress = 1.0F;
+ appendResult(MATCH_RESULT);
+ return;
+ }
+ if (fileStatus.isDir()) {
+ dfsClient.setErasureCodingPolicy(srcPath, ecPolicyName);
+ progress = 1.0F;
+ appendResult(DIR_RESULT);
+ return;
+ }
+ try {
+ convert(conf, ecPolicyName);
+ setAttributes(srcPath, fileStatus, ecTmpPath);
+ dfsClient.rename(ecTmpPath, srcPath, Options.Rename.OVERWRITE);
+ appendResult(CONVERT_RESULT);
+ appendResult(String.format("The previous EC policy is %s.", srcEcPolicy.getName()));
+ appendResult(String.format("The current EC policy is %s.", REPLICATION_POLICY_NAME));
+ } catch (ActionException ex) {
+ try {
+ if (dfsClient.getFileInfo(ecTmpPath) != null) {
+ dfsClient.delete(ecTmpPath, false);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to delete tmp file created during the conversion!");
+ }
+ throw new ActionException(ex);
+ }
+ }
+
+ @Override
+ public float getProgress() {
+ return progress;
+ }
+}
\ No newline at end of file
diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingAction.java
new file mode 100644
index 00000000000..928f5e01ff5
--- /dev/null
+++ b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingAction.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.smartdata.hdfs.action;
+
+import org.apache.hadoop.fs.Path;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestErasureCodingAction extends TestErasureCodingMiniCluster {
+
+ @Test
+ public void testEcActionForFile() throws Exception {
+ String srcPath = "/ec/test_file";
+ createTestFile(srcPath, 1000);
+ // the file is stored in replication
+ assertNull(dfsClient.getErasureCodingPolicy(srcPath));
+
+ ErasureCodingAction ecAction = new ErasureCodingAction();
+ ecAction.setContext(smartContext);
+ String ecTmpPath = "/ssm/ec_tmp/tmp_file";
+ Map<String, String> args = new HashMap<>();
+ args.put(HdfsAction.FILE_PATH, srcPath);
+ args.put(ErasureCodingBase.EC_TMP, ecTmpPath);
+ args.put(ErasureCodingAction.EC_POLICY_NAME, ecPolicy.getName());
+ ecAction.init(args);
+ ecAction.run();
+ assertTrue(ecAction.getExpectedAfterRun());
+ // the file is stored in ec with default policy
+ assertEquals(dfsClient.getErasureCodingPolicy(srcPath), ecPolicy);
+ }
+
+ @Test
+ public void testEcActionForDir() throws Exception {
+ String srcDirPath = "/test_dir/";
+ dfs.mkdirs(new Path(srcDirPath));
+ assertNull(dfsClient.getErasureCodingPolicy(srcDirPath));
+
+ ErasureCodingAction ecAction = new ErasureCodingAction();
+ ecAction.setContext(smartContext);
+ Map<String, String> args = new HashMap<>();
+ args.put(HdfsAction.FILE_PATH, srcDirPath);
+ args.put(ErasureCodingAction.EC_POLICY_NAME, ecPolicy.getName());
+ ecAction.init(args);
+ ecAction.run();
+ assertTrue(ecAction.getExpectedAfterRun());
+ assertEquals(dfsClient.getErasureCodingPolicy(srcDirPath), ecPolicy);
+
+ String srcFilePath = "/test_dir/test_file";
+ createTestFile(srcFilePath, 1000);
+ // The newly created file should have the same EC policy as its parent directory.
+ assertEquals(dfsClient.getErasureCodingPolicy(srcFilePath), ecPolicy);
+ }
+}
\ No newline at end of file
diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingMiniCluster.java b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingMiniCluster.java
new file mode 100644
index 00000000000..87daf8ee54a
--- /dev/null
+++ b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestErasureCodingMiniCluster.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.smartdata.hdfs.action;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.junit.After;
+import org.junit.Before;
+import org.smartdata.SmartContext;
+import org.smartdata.conf.SmartConf;
+import org.smartdata.conf.SmartConfKeys;
+import org.smartdata.hdfs.MiniClusterHarness;
+
+import java.io.IOException;
+
+public class TestErasureCodingMiniCluster {
+ protected ErasureCodingPolicy ecPolicy;
+ // use the default one, not the one in MiniClusterHarness
+ public static final long BLOCK_SIZE = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+ protected MiniDFSCluster cluster;
+ protected DistributedFileSystem dfs;
+ protected DFSClient dfsClient;
+ protected SmartContext smartContext;
+
+ @Before
+ public void init() throws Exception {
+ SmartConf conf = new SmartConf();
+// super.initConf(conf);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
+ // use ErasureCodeConstants.XOR_2_1_SCHEMA, i.e., XOR-2-1-1024k at index 3 of the system policy list
+ ecPolicy = SystemErasureCodingPolicies.getPolicies().get(3);
+ cluster = new MiniDFSCluster.Builder(conf).
+ numDataNodes(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()).
+ build();
+ // Add namenode URL to smartContext
+ conf.set(SmartConfKeys.SMART_DFS_NAMENODE_RPCSERVER_KEY,
+ "hdfs://" + cluster.getNameNode().getNameNodeAddressHostPortString());
+ smartContext = new SmartContext(conf);
+ cluster.waitActive();
+ dfs = cluster.getFileSystem();
+ dfsClient = dfs.getClient();
+ dfsClient.enableErasureCodingPolicy(ecPolicy.getName());
+ }
+
+ public void createTestFile(String srcPath, long length) throws IOException {
+// DFSTestUtil.createFile(dfs, new Path(srcPath), length, (short) 3, 0L);
+ DFSTestUtil.createFile(dfs, new Path(srcPath), 1024, length, BLOCK_SIZE, (short) 3, 0L);
+ }
+
+ @After
+ public void shutdown() throws IOException {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+}
diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestUnErasureCodingAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestUnErasureCodingAction.java
new file mode 100644
index 00000000000..a0bb314a5a7
--- /dev/null
+++ b/smart-hadoop-support/smart-hadoop-3.1/src/test/java/org/smartdata/hdfs/action/TestUnErasureCodingAction.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.smartdata.hdfs.action;
+
+import org.apache.hadoop.fs.Path;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestUnErasureCodingAction extends TestErasureCodingMiniCluster {
+
+ @Test
+ public void testUnEcActionForFile() throws Exception {
+ String testDir = "/test_dir";
+ dfs.mkdirs(new Path(testDir));
+ dfs.setErasureCodingPolicy(new Path(testDir), ecPolicy.getName());
+ // create test file, its EC policy should be consistent with parent dir, i.e., ecPolicy
+ String srcPath = testDir + "/ec_file";
+ createTestFile(srcPath, 1000);
+ assertEquals(dfsClient.getErasureCodingPolicy(srcPath), ecPolicy);
+
+ UnErasureCodingAction unEcAction = new UnErasureCodingAction();
+ unEcAction.setContext(smartContext);
+ String ecTmpPath = "/ssm/ec_tmp/tmp_file";
+ Map<String, String> args = new HashMap<>();
+ args.put(HdfsAction.FILE_PATH, srcPath);
+ args.put(ErasureCodingBase.EC_TMP, ecTmpPath);
+ unEcAction.init(args);
+ unEcAction.run();
+ assertTrue(unEcAction.getExpectedAfterRun());
+ assertNull(dfsClient.getErasureCodingPolicy(srcPath));
+ }
+
+ @Test
+ public void testUnEcActionForDir() throws Exception {
+ String testDir = "/test_dir";
+ dfs.mkdirs(new Path(testDir));
+ dfs.setErasureCodingPolicy(new Path(testDir), ecPolicy.getName());
+ assertEquals(dfsClient.getErasureCodingPolicy(testDir), ecPolicy);
+
+ UnErasureCodingAction unEcAction = new UnErasureCodingAction();
+ unEcAction.setContext(smartContext);
+ Map<String, String> args = new HashMap<>();
+ args.put(HdfsAction.FILE_PATH, testDir);
+ unEcAction.init(args);
+ unEcAction.run();
+ assertNull(dfs.getErasureCodingPolicy(new Path(testDir)));
+
+ // create test file, its EC policy is expected to be replication
+ String srcPath = testDir + "/ec_file";
+ createTestFile(srcPath, 1000);
+ assertNull(dfs.getErasureCodingPolicy(new Path(srcPath)));
+ }
+}
diff --git a/smart-hadoop-support/smart-hadoop-common/pom.xml b/smart-hadoop-support/smart-hadoop-common/pom.xml
index 495999122b1..616896860d5 100644
--- a/smart-hadoop-support/smart-hadoop-common/pom.xml
+++ b/smart-hadoop-support/smart-hadoop-common/pom.xml
@@ -31,6 +31,16 @@
<packaging>jar</packaging>
<dependencies>
+ <dependency>
+ <groupId>org.smartdata</groupId>
+ <artifactId>smart-common</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.smartdata</groupId>
+ <artifactId>smart-action</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
@@ -62,5 +72,10 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
diff --git a/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/CompatibilityHelper.java b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/CompatibilityHelper.java
index aad542a1b20..6cb4e0d1b9a 100644
--- a/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/CompatibilityHelper.java
+++ b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/CompatibilityHelper.java
@@ -88,4 +88,8 @@ HdfsFileStatus createHdfsFileStatus(
long length, boolean isdir, int block_replication, long blocksize, long modification_time,
long access_time, FsPermission permission, String owner, String group, byte[] symlink, byte[] path,
long fileId, int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy);
+
+ byte getErasureCodingPolicy(HdfsFileStatus fileStatus);
+
+ byte getErasureCodingPolicyByName(DFSClient client, String ecPolicyName) throws IOException;
}
diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HadoopUtil.java b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/HadoopUtil.java
similarity index 99%
rename from smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HadoopUtil.java
rename to smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/HadoopUtil.java
index 6ccd040c925..febf610f025 100644
--- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HadoopUtil.java
+++ b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/HadoopUtil.java
@@ -254,6 +254,7 @@ public static FileInfo convertFileStatus(HdfsFileStatus status, String path) {
.setOwner(status.getOwner())
.setGroup(status.getGroup())
.setStoragePolicy(status.getStoragePolicy())
+ .setErasureCodingPolicy(CompatibilityHelperLoader.getHelper().getErasureCodingPolicy(status))
.build();
}
diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsAction.java b/smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/action/HdfsAction.java
similarity index 100%
rename from smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsAction.java
rename to smart-hadoop-support/smart-hadoop-common/src/main/java/org/smartdata/hdfs/action/HdfsAction.java
diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/MiniClusterHarness.java b/smart-hadoop-support/smart-hadoop-common/src/test/java/org/smartdata/hdfs/MiniClusterHarness.java
similarity index 100%
rename from smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/MiniClusterHarness.java
rename to smart-hadoop-support/smart-hadoop-common/src/test/java/org/smartdata/hdfs/MiniClusterHarness.java
diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/MiniClusterWithStoragesHarness.java b/smart-hadoop-support/smart-hadoop-common/src/test/java/org/smartdata/hdfs/MiniClusterWithStoragesHarness.java
similarity index 100%
rename from smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/MiniClusterWithStoragesHarness.java
rename to smart-hadoop-support/smart-hadoop-common/src/test/java/org/smartdata/hdfs/MiniClusterWithStoragesHarness.java
diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsActionFactory.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsActionFactory.java
index 3d20ab0cd25..55a0ec5caa2 100644
--- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsActionFactory.java
+++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/HdfsActionFactory.java
@@ -17,12 +17,25 @@
*/
package org.smartdata.hdfs.action;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.smartdata.action.AbstractActionFactory;
+import org.smartdata.action.SmartAction;
+import org.smartdata.hdfs.scheduler.ErasureCodingScheduler;
+
+import java.util.Arrays;
+import java.util.List;
/**
* Built-in smart actions for HDFS system.
*/
public class HdfsActionFactory extends AbstractActionFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsActionFactory.class);
+ public static final List<String> HDFS3_ACTION_CLASSES = Arrays.asList(
+ "org.smartdata.hdfs.action.ListErasureCodingPolicy",
+ "org.smartdata.hdfs.action.CheckErasureCodingPolicy",
+ "org.smartdata.hdfs.action.ErasureCodingAction",
+ "org.smartdata.hdfs.action.UnErasureCodingAction");
static {
addAction(AllSsdFileAction.class);
@@ -37,7 +50,6 @@ public class HdfsActionFactory extends AbstractActionFactory {
addAction(WriteFileAction.class);
addAction(CheckStorageAction.class);
addAction(SetXAttrAction.class);
-// addAction("stripec", StripErasureCodeFileAction.class);
// addAction("blockec", BlockErasureCodeFileAction.class);
addAction(CopyFileAction.class);
addAction(DeleteFileAction.class);
@@ -57,5 +69,18 @@ public class HdfsActionFactory extends AbstractActionFactory {
// addAction("diskbalance", DiskBalanceAction.class);
// addAction("clusterbalance", ClusterBalanceAction.class);
// addAction("setstoragepolicy", SetStoragePolicyAction.class);
+ if (ErasureCodingScheduler.isECSupported()) {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ if (loader == null) {
+ loader = ClassLoader.getSystemClassLoader();
+ }
+ try {
+ for (String classString : HDFS3_ACTION_CLASSES) {
+ addAction((Class<? extends SmartAction>) loader.loadClass(classString));
+ }
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Class not found!", ex);
+ }
+ }
}
}
diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java
index a816b99dfa9..169acf66614 100644
--- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java
+++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java
@@ -111,7 +111,8 @@ protected void execute() throws Exception {
permission,
ownerName,
groupName,
- (byte) 1);
+ (byte) 1,
+ (byte) 0);
changeFileMetaData(srcPath, fileInfo);
}
diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java
index f12df882b01..c419d7ec981 100644
--- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java
+++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java
@@ -21,8 +21,10 @@
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.io.WritableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.smartdata.hdfs.CompatibilityHelperLoader;
import org.smartdata.hdfs.HadoopUtil;
import org.smartdata.metastore.DBType;
import org.smartdata.metastore.MetaStore;
@@ -32,6 +34,8 @@
import org.smartdata.model.FileDiffType;
import org.smartdata.model.FileInfo;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -59,7 +63,7 @@ public void apply(List events) throws IOException, MetaStoreException {
List statements = new ArrayList<>();
for (Event event : events) {
List<String> gen = getSqlStatement(event);
- if (gen != null && !gen.isEmpty()){
+ if (gen != null && !gen.isEmpty()) {
for (String s : gen) {
if (s != null && s.length() > 0) {
statements.add(s);
@@ -234,6 +238,12 @@ private List getRenameSql(Event.RenameEvent renameEvent)
if (status == null) {
LOG.debug("Get rename dest status failed, {} -> {}", src, dest);
}
+ // The dest path which src is renamed to should be checked in the file table
+ // to avoid a duplicate record for the same path.
+ FileInfo destInfo = metaStore.getFile(dest);
+ if (destInfo != null) {
+ metaStore.deleteFileByPath(dest);
+ }
if (info == null) {
if (status != null) {
info = HadoopUtil.convertFileStatus(status, dest);
@@ -335,6 +345,7 @@ private String getMetaDataUpdateSql(Event.MetadataUpdateEvent metadataUpdateEven
"UPDATE file SET block_replication = %s WHERE path = '%s';",
metadataUpdateEvent.getReplication(), metadataUpdateEvent.getPath());
case XATTRS:
+ final String EC_POLICY = "hdfs.erasurecoding.policy";
//Todo
if (LOG.isDebugEnabled()) {
String message = "\n";
@@ -343,6 +354,24 @@ private String getMetaDataUpdateSql(Event.MetadataUpdateEvent metadataUpdateEven
}
LOG.debug(message);
}
+ // The following code should be executed only on HDFS 3.x.
+ for (XAttr xAttr : metadataUpdateEvent.getxAttrs()) {
+ if (xAttr.getName().equals(EC_POLICY)) {
+ try {
+ String ecPolicyName = WritableUtils.readString(
+ new DataInputStream(new ByteArrayInputStream(xAttr.getValue())));
+ byte ecPolicyId = CompatibilityHelperLoader.getHelper().
+ getErasureCodingPolicyByName(client, ecPolicyName);
+ if (ecPolicyId == (byte) -1) {
+ LOG.error("Unrecognized EC policy for updating!");
+ }
+ return String.format("UPDATE file SET ec_policy_id = %s WHERE path = '%s'",
+ ecPolicyId, metadataUpdateEvent.getPath());
+ } catch (IOException ex) {
+ LOG.error("Error occurred for updating ecPolicy!", ex);
+ }
+ }
+ }
break;
case ACLS:
return "";
diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java
index 7d194101f1c..79057040eb4 100644
--- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java
+++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/NamespaceFetcher.java
@@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;
+import org.smartdata.hdfs.CompatibilityHelperLoader;
import org.smartdata.metastore.MetaStoreException;
import org.smartdata.model.FileInfo;
import org.smartdata.metastore.MetaStore;
@@ -307,7 +308,8 @@ private FileInfo convertToFileInfo(HdfsFileStatus status, String parent) {
status.getPermission().toShort(),
status.getOwner(),
status.getGroup(),
- status.getStoragePolicy());
+ status.getStoragePolicy(),
+ CompatibilityHelperLoader.getHelper().getErasureCodingPolicy(status));
return fileInfo;
}
}
diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java
new file mode 100644
index 00000000000..06d3b673471
--- /dev/null
+++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.smartdata.hdfs.scheduler;
+
+import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartdata.SmartContext;
+import org.smartdata.conf.SmartConf;
+import org.smartdata.hdfs.action.*;
+import org.smartdata.metastore.MetaStore;
+import org.smartdata.metastore.MetaStoreException;
+import org.smartdata.model.ActionInfo;
+import org.smartdata.model.FileInfo;
+import org.smartdata.model.LaunchAction;
+import org.smartdata.model.action.ScheduleResult;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ErasureCodingScheduler extends ActionSchedulerService {
+ public static final Logger LOG = LoggerFactory.getLogger(ErasureCodingScheduler.class);
+ public static final String ecActionID = "ec";
+ public static final String unecActionID = "unec";
+ public static final String checkecActionID = "checkec";
+ public static final String listecActionID = "listec";
+ public static final List<String> actions =
+ Arrays.asList(ecActionID, unecActionID, checkecActionID, listecActionID);
+ public static final String EC_DIR = "/system/ssm/ec_tmp/";
+ public static final String EC_TMP = "-ecTmp";
+ private Set<String> fileLock;
+ private SmartConf conf;
+ private MetaStore metaStore;
+
+ public ErasureCodingScheduler(SmartContext context, MetaStore metaStore) {
+ super(context, metaStore);
+ this.conf = context.getConf();
+ this.metaStore = metaStore;
+ }
+
+ public List<String> getSupportedActions() {
+ return actions;
+ }
+
+ public void init() throws IOException {
+ fileLock = new HashSet<>();
+ }
+
+ @Override
+ public void start() throws IOException {
+
+ }
+
+ @Override
+ public void stop() throws IOException {
+
+ }
+
+ @Override
+ public boolean onSubmit(ActionInfo actionInfo) throws IOException {
+ if (!isECSupported()) {
+ throw new IOException(actionInfo.getActionName() +
+ " is not supported on " + VersionInfo.getVersion());
+ }
+ if (!actionInfo.getActionName().equals(listecActionID)) {
+ if (actionInfo.getArgs().get(HdfsAction.FILE_PATH) == null) {
+ throw new IOException("No src path is given!");
+ }
+ }
+ return true;
+ }
+
+ public static boolean isECSupported() {
+ String[] parts = VersionInfo.getVersion().split("\\.");
+ return Integer.parseInt(parts[0]) == 3;
+ }
+
+ @Override
+ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
+ if (!actions.contains(action.getActionType())) {
+ return ScheduleResult.SUCCESS;
+ }
+
+ if (actionInfo.getActionName().equals(listecActionID)) {
+ return ScheduleResult.SUCCESS;
+ }
+
+ String srcPath = action.getArgs().get(HdfsAction.FILE_PATH);
+ if (srcPath == null) {
+ actionInfo.appendLog("No file is given in this action!");
+ return ScheduleResult.FAIL;
+ }
+
+ if (actionInfo.getActionName().equals(checkecActionID)) {
+ return ScheduleResult.SUCCESS;
+ }
+
+ // check the file lock only for ec & unec actions
+ if (fileLock.contains(srcPath)) {
+ return ScheduleResult.FAIL;
+ }
+ try {
+ FileInfo srcInfo = metaStore.getFile(srcPath);
+ // guard against files not (yet) present in the metastore
+ if (srcInfo != null && !srcInfo.isdir()) {
+ // For ec or unec, add ecTmp argument
+ String tmpName = createTmpName(action);
+ action.getArgs().put(EC_TMP, EC_DIR + tmpName);
+ actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName);
+ }
+ } catch (MetaStoreException ex) {
+ LOG.error("Error occurred for getting file info", ex);
+ actionInfo.appendLog(ex.getMessage());
+ return ScheduleResult.FAIL;
+ }
+ // lock the file only if ec or unec action is scheduled
+ fileLock.add(srcPath);
+ return ScheduleResult.SUCCESS;
+ }
+
+ private String createTmpName(LaunchAction action) {
+ String path = action.getArgs().get(HdfsAction.FILE_PATH);
+ String fileName;
+ int index = path.lastIndexOf("/");
+ if (index == path.length() - 1) {
+ index = path.substring(0, path.length() - 1).indexOf("/");
+ fileName = path.substring(index + 1, path.length() - 1);
+ } else {
+ fileName = path.substring(index + 1, path.length());
+ }
+ // The dest tmp file is placed under EC_DIR and named by joining the
+ // source file name, "aid" plus the action id, and the current time in
+ // milliseconds with "_".
+ String tmpName = fileName + "_" + "aid" + action.getActionId() +
+ "_" + System.currentTimeMillis();
+ return tmpName;
+ }
+
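+ /**
+ * Releases the file lock once an ec or unec action has finished.
+ */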
+ @Override
+ public void onActionFinished(ActionInfo actionInfo) {
+ if (actionInfo.getActionName().equals(ecActionID) ||
+ actionInfo.getActionName().equals(unecActionID)) {
+ fileLock.remove(actionInfo.getArgs().get(HdfsAction.FILE_PATH));
+ }
+ }
+}
diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java
index b2285f36126..198c40c20ed 100644
--- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java
+++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/metric/fetcher/TestCachedListFetcher.java
@@ -105,10 +105,11 @@ private FileInfo createFileStatus(String pathString) {
String group = "admin";
long fileId = fid;
byte storagePolicy = 0;
+ byte erasureCodingPolicy = 0;
fid++;
return new FileInfo(pathString, fileId, length,
isDir, (short)blockReplication, blockSize, modTime, accessTime,
- (short) 1, owner, group, storagePolicy);
+ (short) 1, owner, group, storagePolicy, erasureCodingPolicy);
}
@Test
diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java b/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java
index 18a86717664..958bffaeb31 100644
--- a/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java
+++ b/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java
@@ -686,7 +686,7 @@ public List<DetailedFileAction> listFileActions(long rid,
// Add a mock fileInfo
fileInfo = new FileInfo(filePath, 0L, 0L, false,
(short) 0, 0L, 0L, 0L, (short) 0,
- "root", "root", (byte) 0);
+ "root", "root", (byte) 0, (byte) 0);
}
detailedFileAction.setFileLength(fileInfo.getLength());
detailedFileAction.setFilePath(filePath);
@@ -723,7 +723,7 @@ public List<DetailedFileAction> listFileActions(long rid, long start, long offse
// Add a mock fileInfo
fileInfo = new FileInfo(filePath, 0L, 0L, false,
(short) 0, 0L, 0L, 0L, (short) 0,
- "root", "root", (byte) 0);
+ "root", "root", (byte) 0, (byte) 0);
}
detailedFileAction.setFileLength(fileInfo.getLength());
detailedFileAction.setFilePath(filePath);
diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java
index 852d1c4ddb9..276b89070b7 100644
--- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java
+++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java
@@ -173,6 +173,7 @@ private Map<String, Object> toMap(FileInfo fileInfo) {
parameters
.put("owner_group", fileInfo.getGroup());
parameters.put("permission", fileInfo.getPermission());
+ parameters.put("ec_policy_id", fileInfo.getErasureCodingPolicy());
return parameters;
}
@@ -191,7 +192,8 @@ public FileInfo mapRow(ResultSet resultSet, int i)
resultSet.getShort("permission"),
resultSet.getString("owner"),
resultSet.getString("owner_group"),
- resultSet.getByte("sid")
+ resultSet.getByte("sid"),
+ resultSet.getByte("ec_policy_id")
);
return fileInfo;
}
diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java b/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java
index a4d43195199..fa3bf8d44f0 100644
--- a/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java
+++ b/smart-metastore/src/test/java/org/smartdata/metastore/TestMetaStore.java
@@ -173,6 +173,7 @@ public void testGetFiles() throws Exception {
String group = "admin";
long fileId = 56L;
byte storagePolicy = 0;
+ byte erasureCodingPolicy = 0;
FileInfo fileInfo =
new FileInfo(
pathString,
@@ -186,7 +187,8 @@ public void testGetFiles() throws Exception {
(short) 1,
owner,
group,
- storagePolicy);
+ storagePolicy,
+ erasureCodingPolicy);
metaStore.insertFile(fileInfo);
FileInfo dbFileInfo = metaStore.getFile(56);
Assert.assertTrue(dbFileInfo.equals(fileInfo));
@@ -292,6 +294,7 @@ public void testMoveSyncRules() throws Exception {
String group = "admin";
long fileId = 56L;
byte storagePolicy = 0;
+ byte erasureCodingPolicy = 0;
FileInfo fileInfo =
new FileInfo(
pathString,
@@ -305,7 +308,8 @@ public void testMoveSyncRules() throws Exception {
(short) 1,
owner,
group,
- storagePolicy);
+ storagePolicy,
+ erasureCodingPolicy);
metaStore.insertFile(fileInfo);
Map<String, String> args = new HashMap<>();
args.put("-file", "/src/1");
@@ -417,6 +421,7 @@ public void testInsetFiles() throws Exception {
String group = "admin";
long fileId = 312321L;
byte storagePolicy = 0;
+ byte erasureCodingPolicy = 0;
FileInfo[] files = {
new FileInfo(
pathString,
@@ -430,7 +435,8 @@ public void testInsetFiles() throws Exception {
(short) 1,
owner,
group,
- storagePolicy)
+ storagePolicy,
+ erasureCodingPolicy)
};
metaStore.insertFiles(files);
FileInfo dbFileInfo = metaStore.getFile("/tmp/testFile");
diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAccessCountTableManager.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAccessCountTableManager.java
index ce643d69269..491ac35a747 100644
--- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAccessCountTableManager.java
+++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAccessCountTableManager.java
@@ -172,6 +172,7 @@ private void prepareFiles(MetaStore metaStore) throws MetaStoreException {
(short) 1,
"root",
"admin",
+ (byte) 0,
(byte) 0));
}
metaStore.insertFiles(statusInternals.toArray(new FileInfo[0]));
@@ -190,6 +191,7 @@ private void insertNewFile(MetaStore metaStore, String file, Long fid)
(short) 1,
"root",
"admin",
+ (byte) 0,
(byte) 0);
metaStore.insertFile(finfo);
}
diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java
index 694b5d73c56..2980939f7f4 100644
--- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java
+++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestFileInfoDao.java
@@ -57,15 +57,16 @@ public void testInsetGetDeleteFiles() throws Exception {
String group = "admin";
long fileId = 312321L;
byte storagePolicy = 0;
- FileInfo fileInfo = new FileInfo(path, fileId, length, isDir, blockReplication,
- blockSize, modTime, accessTime, permission, owner, group, storagePolicy);
+ byte erasureCodingPolicy = 0;
+ FileInfo fileInfo = new FileInfo(path, fileId, length, isDir, blockReplication, blockSize,
+ modTime, accessTime, permission, owner, group, storagePolicy, erasureCodingPolicy);
fileInfoDao.insert(fileInfo);
FileInfo file1 = fileInfoDao.getByPath("/testFile");
Assert.assertTrue(fileInfo.equals(file1));
FileInfo file2 = fileInfoDao.getById(fileId);
Assert.assertTrue(fileInfo.equals(file2));
- FileInfo fileInfo1 = new FileInfo(path, fileId + 1, length, isDir, blockReplication,
- blockSize, modTime, accessTime, permission, owner, group, storagePolicy);
+ FileInfo fileInfo1 = new FileInfo(path, fileId + 1, length, isDir, blockReplication, blockSize,
+ modTime, accessTime, permission, owner, group, storagePolicy, erasureCodingPolicy);
fileInfoDao.insert(fileInfo1);
List<FileInfo> fileInfos = fileInfoDao.getFilesByPrefix("/testaaFile");
Assert.assertTrue(fileInfos.size() == 0);
@@ -93,12 +94,13 @@ public void testInsertUpdateFiles() throws Exception {
String group = "admin";
long fileId = 312321L;
byte storagePolicy = 0;
+ byte erasureCodingPolicy = 0;
Map<Integer, String> mapOwnerIdName = new HashMap<>();
mapOwnerIdName.put(1, "root");
Map<Integer, String> mapGroupIdName = new HashMap<>();
mapGroupIdName.put(1, "admin");
- FileInfo fileInfo = new FileInfo(path, fileId, length, isDir, blockReplication,
- blockSize, modTime, accessTime, permission, owner, group, storagePolicy);
+ FileInfo fileInfo = new FileInfo(path, fileId, length, isDir, blockReplication, blockSize,
+ modTime, accessTime, permission, owner, group, storagePolicy, erasureCodingPolicy);
fileInfoDao.insert(fileInfo);
fileInfoDao.update(path, 10);
FileInfo file = fileInfoDao.getById(fileId);
diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableAggregator.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableAggregator.java
index 9ea42113767..6e31aad6b7f 100644
--- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableAggregator.java
+++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableAggregator.java
@@ -109,6 +109,7 @@ private void prepareFiles(MetaStore metaStore) throws MetaStoreException {
(short) 1,
"root",
"admin",
+ (byte) 0,
(byte) 0));
}
metaStore.insertFiles(statusInternals.toArray(new FileInfo[0]));
diff --git a/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java b/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java
index 1b8c14691ed..7df4ed80423 100644
--- a/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java
+++ b/smart-server/src/test/java/org/smartdata/server/engine/rule/TestRuleManager.java
@@ -303,7 +303,7 @@ public void testMultiThreadChangeState() throws Exception {
long length = 100;
long fid = 10000;
FileInfo[] files = { new FileInfo("/tmp/testfile", fid, length, false, (short) 3,
- 1024, now, now, (short) 1, null, null, (byte) 3) };
+ 1024, now, now, (short) 1, null, null, (byte) 3, (byte) 0) };
metaStore.insertFiles(files);
long rid = ruleManager.submitRule(rule, RuleState.ACTIVE);