HadoopArchives.java polishing #29

Open · wants to merge 4 commits into trunk
@@ -76,15 +76,14 @@
import com.google.common.base.Charsets;

/**
* a archive creation utility.
* This is an archive creation utility.
* This class provides methods that can be used
* to create hadoop archives. For understanding of
* Hadoop archives look at {@link HarFileSystem}.
*/
public class HadoopArchives implements Tool {
public static final int VERSION = 3;
private static final Log LOG = LogFactory.getLog(HadoopArchives.class);

private static final String NAME = "har";
private static final String ARCHIVE_NAME = "archiveName";
private static final String REPLICATION = "r";
@@ -104,22 +103,23 @@ public class HadoopArchives implements Tool {
static final String HAR_REPLICATION_LABEL = NAME + ".replication.factor";
/** the size of the part files that will be created when archiving **/
static final String HAR_PARTSIZE_LABEL = NAME + ".partfile.size";

static final String TEST_HADOOP_ARCHIVES_JAR_PATH = "test.hadoop.archives.jar";
/** size of each part file **/
long partSize = 2 * 1024 * 1024 * 1024l;
/** size of blocks in hadoop archives **/
long blockSize = 512 * 1024 * 1024l;
/** the desired replication degree; default is 3 **/
short repl = 3;

private static final String usage = "archive"
+ " <-archiveName <NAME>.har> <-p <parent path>> [-r <replication factor>]" +
" <src>* <dest>" +
"\n";


private JobConf conf;

public HadoopArchives(Configuration conf) {
setConf(conf);
}

public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
this.conf = (JobConf) conf;
@@ -130,7 +130,7 @@ public void setConf(Configuration conf) {
// This is for test purposes: since MR2, and unlike Streaming, it is not
// possible here to add a JAR to the classpath that the tool will use when
// running the mapreduce job.
String testJar = System.getProperty(TEST_HADOOP_ARCHIVES_JAR_PATH, null);
String testJar = System.getProperty(TEST_HADOOP_ARCHIVES_JAR_PATH);
if (testJar != null) {
this.conf.setJar(testJar);
}
@@ -140,10 +140,6 @@ public Configuration getConf() {
return this.conf;
}

public HadoopArchives(Configuration conf) {
setConf(conf);
}

// check the src paths
private static void checkPaths(Configuration conf, List<Path> paths) throws
IOException {
@@ -158,11 +154,11 @@ private static void checkPaths(Configuration conf, List<Path> paths) throws
/**
* this assumes that there are only two types of files: file and dir
* @param fs the input filesystem
* @param fdir the filestatusdir of the path
* @param fdir the filestatusdir of the path
* @param out the list of paths output of recursive ls
* @throws IOException
*/
private void recursivels(FileSystem fs, FileStatusDir fdir, List<FileStatusDir> out)
private void recursiveLs(FileSystem fs, FileStatusDir fdir, List<FileStatusDir> out)
throws IOException {
if (fdir.getFileStatus().isFile()) {
out.add(fdir);
@@ -172,9 +168,9 @@ private void recursivels(FileSystem fs, FileStatusDir fdir, List<FileStatusDir>
out.add(fdir);
FileStatus[] listStatus = fs.listStatus(fdir.getFileStatus().getPath());
fdir.setChildren(listStatus);
for (FileStatus stat: listStatus) {
for (FileStatus stat : listStatus) {
FileStatusDir fstatDir = new FileStatusDir(stat, null);
recursivels(fs, fstatDir, out);
recursiveLs(fs, fstatDir, out);
}
}
}
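// Aside (illustrative, not part of this patch): recursiveLs flattens a tree into a
// pre-order list of FileStatusDir entries. For example, for /p/d1 containing f1 and
// d2/f2, the output order would be d1, f1, d2, f2 (subject to listStatus ordering);
// directories carry their children via setChildren, files carry none.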
@@ -233,8 +229,8 @@ static class HArchiveInputFormat implements InputFormat<LongWritable, HarEntry>
//generate input splits from the src file lists
public InputSplit[] getSplits(JobConf jconf, int numSplits)
throws IOException {
String srcfilelist = jconf.get(SRC_LIST_LABEL, "");
if ("".equals(srcfilelist)) {
String srcFileList = jconf.get(SRC_LIST_LABEL, "");
if ("".equals(srcFileList)) {
throw new IOException("Unable to get the " +
"src file for archive generation.");
}
@@ -243,10 +239,10 @@ public InputSplit[] getSplits(JobConf jconf, int numSplits)
throw new IOException("Invalid size of files to archive");
}
//we should be safe since this is set by our own code
Path src = new Path(srcfilelist);
Path src = new Path(srcFileList);
FileSystem fs = src.getFileSystem(jconf);
FileStatus fstatus = fs.getFileStatus(src);
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
ArrayList<FileSplit> splits = new ArrayList<>(numSplits);
LongWritable key = new LongWritable();
final HarEntry value = new HarEntry();
// the remaining bytes in the file split
@@ -262,7 +258,7 @@ public InputSplit[] getSplits(JobConf jconf, int numSplits)
// have equal sized data to read and write to.
try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, src, jconf)) {
while(reader.next(key, value)) {
if (currentCount + key.get() > targetSize && currentCount != 0){
if (currentCount != 0 && currentCount + key.get() > targetSize){
long size = lastPos - startPos;
splits.add(new FileSplit(src, startPos, size, (String[]) null));
remaining = remaining - size;
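// Aside (illustrative, not part of this patch): the rule above groups entries into a
// split until adding the next length would push the running total past targetSize;
// because the cut only happens when currentCount != 0, a single entry larger than
// targetSize still gets its own split. E.g. with targetSize = 100 and lengths
// 40, 50, 30, 120, the splits cover {40, 50}, {30}, and {120}.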
@@ -289,22 +285,18 @@ public RecordReader<LongWritable, HarEntry> getRecordReader(InputSplit split,
}

private boolean checkValidName(String name) {
Path tmp = new Path(name);
if (tmp.depth() != 1) {
return false;
if (name.endsWith(".har")) {
Path tmp = new Path(name);
if (tmp.depth() == 1) return true;
}
if (name.endsWith(".har"))
return true;
return false;
}
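// Aside (illustrative, not part of this patch): a minimal check of checkValidName's
// contract as implemented above; the names are made up, and this would normally live
// in a unit test rather than here.
private void checkValidNameExamples() {
assert checkValidName("foo.har");        // single path component ending in .har
assert !checkValidName("a/b/foo.har");   // depth != 1
assert !checkValidName("foo.tar");       // missing .har suffix
}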


private Path largestDepth(List<Path> paths) {
Path deepest = paths.get(0);
for (Path p: paths) {
if (p.depth() > deepest.depth()) {
deepest = p;
}
for (Path p : paths) {
if (p.depth() > deepest.depth()) deepest = p;
}
return deepest;
}
@@ -320,13 +312,15 @@ private Path relPathToRoot(Path fullPath, Path root) {
// rather than just using substring
// so that we do not break sometime later
final Path justRoot = new Path(Path.SEPARATOR);
if (fullPath.depth() == root.depth()) {
final int fullPathDepth = fullPath.depth();
final int rootDepth = root.depth();
if (fullPathDepth == rootDepth) {
return justRoot;
}
else if (fullPath.depth() > root.depth()) {
else if (fullPathDepth > rootDepth) {
Path retPath = new Path(fullPath.getName());
Path parent = fullPath.getParent();
for (int i=0; i < (fullPath.depth() - root.depth() -1); i++) {
for (int i = 0; i < (fullPathDepth - rootDepth - 1); i++) {
retPath = new Path(parent.getName(), retPath);
parent = parent.getParent();
}
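// Worked example (illustrative): with root = /user/alice and
// fullPath = /user/alice/logs/2024/app.log the depth difference is 3, so the loop
// runs twice: "app.log" -> "2024/app.log" -> "logs/2024/app.log", i.e. the path of
// fullPath relative to root.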
@@ -356,7 +350,7 @@ else if (fullPath.depth() > root.depth()) {
private void writeTopLevelDirs(SequenceFile.Writer srcWriter,
List<Path> paths, Path parentPath) throws IOException {
// extract paths from absolute URI's
List<Path> justPaths = new ArrayList<Path>();
List<Path> justPaths = new ArrayList<>();
for (Path p: paths) {
justPaths.add(new Path(p.toUri().getPath()));
}
@@ -365,38 +359,27 @@ private void writeTopLevelDirs(SequenceFile.Writer srcWriter,
* twice and also we need to only add valid children of a path that
* are specified by the user.
*/
TreeMap<String, HashSet<String>> allpaths = new TreeMap<String,
HashSet<String>>();
TreeMap<String, HashSet<String>> allPaths = new TreeMap<>();
/* the largest depth of paths. the max number of times
* we need to iterate
*/
Path deepest = largestDepth(paths);
Path root = new Path(Path.SEPARATOR);
for (int i = parentPath.depth(); i < deepest.depth(); i++) {
List<Path> parents = new ArrayList<Path>();
for (Path p: justPaths) {
if (p.compareTo(root) == 0){
//do nothing
}
else {
Path parent = p.getParent();
if (null != parent) {
if (allpaths.containsKey(parent.toString())) {
HashSet<String> children = allpaths.get(parent.toString());
children.add(p.getName());
}
else {
HashSet<String> children = new HashSet<String>();
children.add(p.getName());
allpaths.put(parent.toString(), children);
}
parents.add(parent);
}
List<Path> parents = new ArrayList<>();
for (Path p : justPaths) {
Path parent = p.getParent();
if (p.compareTo(root) != 0 && parent != null) {
final String parentAsString = parent.toString();
// merge with any children already recorded for this parent
allPaths.computeIfAbsent(parentAsString, k -> new HashSet<>())
.add(p.getName());
parents.add(parent);
}
}
justPaths = parents;
}
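// Aside (illustrative): after this loop, allPaths maps every intermediate directory
// to the names of its children that lie on the way to the user-specified paths.
// E.g. with parentPath = /user/alice and sources /user/alice/a/b and /user/alice/a/c,
// the map would contain /user/alice -> {a} and /user/alice/a -> {b, c}; this is why
// children for an existing parent must be merged rather than replaced.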
Set<Map.Entry<String, HashSet<String>>> keyVals = allpaths.entrySet();
Set<Map.Entry<String, HashSet<String>>> keyVals = allPaths.entrySet();
for (Map.Entry<String, HashSet<String>> entry : keyVals) {
final Path relPath = relPathToRoot(new Path(entry.getKey()), parentPath);
if (relPath != null) {
@@ -523,10 +506,10 @@ void archive(Path parentPath, List<Path> srcPaths,
// and then write them to the input file
// one at a time
for (Path src: srcPaths) {
ArrayList<FileStatusDir> allFiles = new ArrayList<FileStatusDir>();
FileStatus fstatus = fs.getFileStatus(src);
FileStatusDir fdir = new FileStatusDir(fstatus, null);
recursivels(fs, fdir, allFiles);
List<FileStatusDir> allFiles = new ArrayList<>();
recursiveLs(fs, fdir, allFiles);
for (FileStatusDir statDir: allFiles) {
FileStatus stat = statDir.getFileStatus();
long len = stat.isDirectory()? 0:stat.getLen();
@@ -869,7 +852,7 @@ public int run(String[] args) throws Exception {
}
// Remaining args
args = commandLine.getArgs();
List<Path> srcPaths = new ArrayList<Path>();
List<Path> srcPaths = new ArrayList<>();
Path destPath = null;
//read the rest of the paths
for (int i = 0; i < args.length; i++) {
Expand Down Expand Up @@ -899,7 +882,7 @@ public int run(String[] args) throws Exception {
srcPaths.add(parentPath);
}
// do a glob on the srcPaths and then pass it on
List<Path> globPaths = new ArrayList<Path>();
List<Path> globPaths = new ArrayList<>();
for (Path p: srcPaths) {
FileSystem fs = p.getFileSystem(getConf());
FileStatus[] statuses = fs.globStatus(p);
@@ -923,8 +906,6 @@ public int run(String[] args) throws Exception {
return 0;
}

static final String TEST_HADOOP_ARCHIVES_JAR_PATH = "test.hadoop.archives.jar";

/** the main function **/
public static void main(String[] args) {
JobConf job = new JobConf(HadoopArchives.class);