Commit

Fix exception when processing corrupted ARC files and empty files. (#272)

* Fix exception when processing corrupted ARC files
* Filter out empty archive files in loadArchives()
* Fix archive file path pattern
* Resolves #246
* Resolves #271 
* Resolves #258
borislin authored and ruebot committed Oct 4, 2018
1 parent c95a51d commit b8e57ec
Showing 2 changed files with 27 additions and 6 deletions.
src/main/java/io/archivesunleashed/data/ArcRecordUtils.java (7 additions, 2 deletions)

@@ -114,8 +114,13 @@ public static byte[] getContent(final ARCRecord record) throws IOException {
           + "URL IP-address Archive-date Content-type Archive-length";
     }
 
-    return copyToByteArray(record, (int) meta.getLength()
-        - versionEtc.length(), true);
+    try {
+      return copyToByteArray(record, (int) meta.getLength()
+          - versionEtc.length(), true);
+    } catch (Exception e) {
+      // Catch exceptions related to any corrupt archive files.
+      return new byte[0];
+    }
   }
 
   /**
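
With this change, a corrupt ARC record yields an empty body instead of an exception that aborts the whole Spark job. A minimal caller-side sketch (the path is hypothetical, and it assumes the getContentBytes accessor on ArchiveRecord) showing how such records can now be dropped explicitly:

import io.archivesunleashed._
import org.apache.spark.SparkContext

// `sc` is an existing SparkContext, e.g. from spark-shell.
// The input path below is hypothetical.
def readableRecords(sc: SparkContext) =
  RecordLoader.loadArchives("/data/crawl/*.arc.gz", sc)
    // Corrupt record bodies now come back as empty byte arrays,
    // so they can be filtered out before content-based analysis.
    .filter(r => r.getContentBytes.nonEmpty)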
src/main/scala/io/archivesunleashed/package.scala (20 additions, 4 deletions)

@@ -17,11 +17,12 @@
 
 package io
 
-import io.archivesunleashed.data.{ArchiveRecordWritable, ArchiveRecordInputFormat}
+import io.archivesunleashed.data.{ArchiveRecordInputFormat, ArchiveRecordWritable}
 import ArchiveRecordWritable.ArchiveFormat
-import io.archivesunleashed.matchbox.{DetectLanguage, ExtractDate, ExtractLinks, ExtractImageLinks, ExtractImageDetails, ExtractDomain, RemoveHTML, ComputeMD5}
+import io.archivesunleashed.matchbox.{ComputeMD5, DetectLanguage, ExtractDate, ExtractDomain, ExtractImageDetails, ExtractImageLinks, ExtractLinks, RemoveHTML}
 import io.archivesunleashed.matchbox.ImageDetails
 import io.archivesunleashed.matchbox.ExtractDate.DateComponent
+import org.apache.hadoop.fs.{FileSystem, Path}
 // scalastyle:off underscore.import
 import io.archivesunleashed.matchbox.ExtractDate.DateComponent._
 import org.apache.spark.sql._
@@ -41,17 +41,32 @@ import scala.util.matching.Regex
 package object archivesunleashed {
   /** Loads records from either WARCs, ARCs or Twitter API data (JSON). */
   object RecordLoader {
+    /** Gets all non-empty archive files.
+      *
+      * @param dir the path to the directory containing archive files
+      * @param fs the filesystem
+      * @return a comma-separated String of the paths of all non-empty archive files.
+      */
+    def getFiles(dir: Path, fs: FileSystem): String = {
+      val statuses = fs.globStatus(dir)
+      val files = statuses.filter(f => fs.getContentSummary(f.getPath).getLength > 0).map(f => f.getPath)
+      files.mkString(",")
+    }
+
     /** Creates an Archive Record RDD from a WARC or ARC file.
       *
       * @param path the path to the WARC(s) or ARC(s)
       * @param sc the apache spark context
       * @return an RDD of ArchiveRecords for mapping.
       */
-    def loadArchives(path: String, sc: SparkContext): RDD[ArchiveRecord] =
-      sc.newAPIHadoopFile(path, classOf[ArchiveRecordInputFormat], classOf[LongWritable], classOf[ArchiveRecordWritable])
+    def loadArchives(path: String, sc: SparkContext): RDD[ArchiveRecord] = {
+      val fs = FileSystem.get(sc.hadoopConfiguration)
+      val p = new Path(path)
+      sc.newAPIHadoopFile(getFiles(p, fs), classOf[ArchiveRecordInputFormat], classOf[LongWritable], classOf[ArchiveRecordWritable])
         .filter(r => (r._2.getFormat == ArchiveFormat.ARC) ||
           ((r._2.getFormat == ArchiveFormat.WARC) && r._2.getRecord.getHeader.getHeaderValue("WARC-Type").equals("response")))
         .map(r => new ArchiveRecordImpl(new SerializableWritable(r._2)))
+    }
 
     /** Creates an Archive Record RDD from tweets.
       *
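A short end-to-end sketch of the patched loader (the app name, master, and paths are hypothetical): loadArchives now expands the glob itself via getFiles and keeps only files with non-zero length, so a directory containing zero-byte archives loads cleanly:

import io.archivesunleashed._
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("aut-load").setMaster("local[*]"))

// Suppose /data/crawl (hypothetical) holds a mix of valid and zero-byte
// .arc.gz files; the zero-byte ones are dropped on the driver before any
// input splits are created, so Spark never tries to parse them.
val records = RecordLoader.loadArchives("/data/crawl/*.arc.gz", sc)
println(records.count())

Since getFiles joins the surviving paths with commas, newAPIHadoopFile receives an explicit list of concrete files rather than the raw pattern; Hadoop's FileInputFormat accepts such comma-separated path lists.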
