Remove RDD option in app; DataFrame only now. (#450)
- Resolves #449
- Updates and renames tests where applicable
- Update README to reflect updates
ruebot authored Apr 20, 2020
1 parent d5a0433 commit 17ac324
Showing 15 changed files with 52 additions and 419 deletions.
4 changes: 1 addition & 3 deletions README.md
@@ -67,8 +67,7 @@ $ mvn clean install

The Toolkit offers a variety of extraction jobs with
[`spark-submit`](https://spark.apache.org/docs/latest/submitting-applications.html)
. These extraction jobs have a few configuration options, and analysis can use
RDD or DataFrame in most cases.
. These extraction jobs have a few configuration options.

The extraction jobs have a basic outline of:

@@ -80,7 +79,6 @@ Additional flags include:

* `--output-format FORMAT` (Used only for the `DomainGraphExtractor`, and the
options are `TEXT` (default) or `GEXF`.)
* `--df` (The extractor will use a DataFrame to carry out analysis.)
* `--split` (The extractor will put results for each input file in its own
directory. Each directory name will be the name of the ARC/WARC file parsed.)
* `--partition N` (The extractor will partition RDD or DataFrame according to N
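For orientation, here is a minimal sketch of the DataFrame-only path these flags now drive, written for `spark-shell` with the Toolkit on the classpath. The WARC and output paths are placeholders, and the final `write.csv` step is an assumption for illustration, not the app's exact save logic.

```scala
import io.archivesunleashed._
import io.archivesunleashed.app._

// Load webpages as a DataFrame (the same loader the CLI extractors use).
val webpages = RecordLoader
  .loadArchives("/path/to/warcs/*.warc.gz", sc)
  .webpages()

// Domain frequency as (domain, count); comparable in spirit to
// `--extractor DomainFrequencyExtractor`.
val freq = DomainFrequencyExtractor(webpages)

// Assumed output step; the app itself handles --output, --split, and --partition.
freq.coalesce(1).write.csv("/path/to/output/domain-frequency")
```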
117 changes: 13 additions & 104 deletions src/main/scala/io/archivesunleashed/app/CommandLineApp.scala
@@ -20,7 +20,6 @@ import java.nio.file.{Files, Paths}

import io.archivesunleashed.{ArchiveRecord, RecordLoader}
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.{SparkConf, SparkContext}
import org.rogach.scallop.exceptions.ScallopException
@@ -46,13 +45,11 @@ import org.rogach.scallop.ScallopConf
* OUTPUT_DIRECTORY is the directory to put result in
*
* FORMAT is meant to work with DomainGraphExtractor
* Three supported options are TEXT (default), GEXF, or GRAPHML
*
* If --df is present, the program will use a DataFrame to carry out analysis
* Three supported options are CSV (default), GEXF, or GRAPHML
*
* If --split is present, the program will put results for each input file in its own folder. Otherwise they will be merged.
*
* If --partition N is present, the program will partition RDD or DataFrame according to N before writing results.
* If --partition N is present, the program will partition the DataFrame according to N before writing results.
* Otherwise, the partition is left as is.
*/

@@ -82,9 +79,8 @@ class CmdAppConf(args: Seq[String]) extends ScallopConf(args) {
val input = opt[List[String]](descr = "input file path", required = true)
val output = opt[String](descr = "output directory path", required = true)
val outputFormat = opt[String](descr =
"output format for DomainGraphExtractor, one of TEXT, GEXF, or GRAPHML")
"output format for DomainGraphExtractor, one of CSV, GEXF, or GRAPHML")
val split = opt[Boolean]()
val df = opt[Boolean]()
val partition = opt[Int]()
verify()
}
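To illustrate the surviving options now that `--df` is gone, a hypothetical construction of this configuration in code; the flag spellings assume Scallop's usual camelCase-to-hyphenated mapping (the README already documents `--output-format`), and the paths are placeholders.

```scala
// Same flags the app accepts on the command line, minus the removed --df.
val conf = new CmdAppConf(Seq(
  "--extractor", "DomainGraphExtractor",
  "--input", "/path/to/warcs/example.warc.gz",
  "--output", "/path/to/output",
  "--output-format", "GEXF",
  "--partition", "4"
))
val app = new CommandLineApp(conf)
```

The runner at the bottom of this file then attaches a `SparkContext` and calls `app.process()`.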
@@ -100,36 +96,6 @@ class CommandLineApp(conf: CmdAppConf) {
private var saveTarget = ""
private var sparkCtx : Option[SparkContext] = None

/** Maps extractor type string to RDD extractors.
*
* Each closure takes a RDD[ArchiveRecord] obtained from RecordLoader, performs the extraction, and
* saves results to file by calling save method of CommandLineApp class.
* Closures return nothing.
*/

private val rddExtractors = Map[String, RDD[ArchiveRecord] => Any](
"DomainFrequencyExtractor" ->
((rdd: RDD[ArchiveRecord]) => {
save(DomainFrequencyExtractor(rdd))
}),
"DomainGraphExtractor" ->
((rdd: RDD[ArchiveRecord]) => {
if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "GEXF") {
new File(saveTarget).mkdirs()
WriteGEXF(DomainGraphExtractor(rdd), Paths.get(saveTarget).toString + "/GEXF.gexf")
} else if (!configuration.outputFormat.isEmpty && configuration.outputFormat() == "GRAPHML") {
new File(saveTarget).mkdirs()
WriteGraphML(DomainGraphExtractor(rdd), Paths.get(saveTarget).toString + "/GRAPHML.graphml")
} else {
save(DomainGraphExtractor(rdd))
}
}),
"PlainTextExtractor" ->
((rdd: RDD[ArchiveRecord]) => {
save(PlainTextExtractor(rdd))
})
)

/** Maps extractor type string to DataFrame Extractors.
*
* Each closure takes a list of file names to be extracted, loads them using RecordLoader,
@@ -138,7 +104,7 @@ class CommandLineApp(conf: CmdAppConf) {
* Closures return nothing.
*/

private val dfExtractors = Map[String, List[String] => Any](
private val extractors = Map[String, List[String] => Any](
"DomainFrequencyExtractor" ->
((inputFiles: List[String]) => {
var df = RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).webpages()
@@ -208,21 +174,6 @@
}
}

/** Generic routine for saving RDD obtained from Map Reduce operation of extractors.
*
* @param r RDD obtained by applying RDD extractors to original RDD
* @tparam T template class name for RDD. Not used.
* @return Unit
*/

def save[T](r: RDD[T]): Unit = {
if (!configuration.partition.isEmpty) {
r.coalesce(configuration.partition()).saveAsTextFile(saveTarget)
} else {
r.saveAsTextFile(saveTarget)
}
}

/** Verify the validity of command line arguments regarding input and output files.
*
* All input files need to exist, and output files should not exist, for this to pass.
@@ -244,21 +195,21 @@
}
}

/** Prepare for invoking DataFrame implementation of extractors.
/** Prepare for invoking extractors.
*
* @return Any
*/

def dfHandler(): Any = {
if (!(dfExtractors contains configuration.extractor())) {
logger.error(configuration.extractor() + " not supported with DataFrame. " +
def handler(): Any = {
if (!(extractors contains configuration.extractor())) {
logger.error(configuration.extractor() + " not supported. " +
"The following extractors are supported: ")
dfExtractors foreach { tuple => logger.error(tuple._1) }
extractors foreach { tuple => logger.error(tuple._1) }
throw new IllegalArgumentException()
}

val extractFunction: List[String] => Any =
dfExtractors get configuration.extractor() match {
extractors get configuration.extractor() match {
case Some(func) => func
case None =>
throw new InternalError()
@@ -275,54 +226,12 @@
}
}

/** Prepare for invoking RDD implementation of extractors.
*
* @return Any
*/

def rddHandler(): Any = {
if (!(rddExtractors contains configuration.extractor())) {
logger.error(configuration.extractor() +
" not supported with RDD. The following extractors are supported: ")
rddExtractors foreach { tuple => logger.error(tuple._1) }
throw new IllegalArgumentException()
}

val extractFunction: RDD[ArchiveRecord] => Any =
rddExtractors get configuration.extractor() match {
case Some(func) => func
case None =>
throw new InternalError()
}

if (!configuration.split.isEmpty && configuration.split()) {
configuration.input() foreach { f =>
val archive = RecordLoader.loadArchives(f, sparkCtx.get)
saveTarget = Paths.get(configuration.output(), Paths.get(f).getFileName.toString).toString
extractFunction(archive)
}
} else {
var nameList = configuration.input()
var rdd = RecordLoader.loadArchives(nameList.head, sparkCtx.get)
nameList.tail foreach { f =>
rdd = rdd.union(RecordLoader.loadArchives(f, sparkCtx.get))
}
saveTarget = Paths.get(configuration.output()).toString
extractFunction(rdd)
}
}

/** Choose either DataFrame implementation or RDD implementation of extractors
* depending on the option specified in command line arguments.
/** Process the handler.
*
* @return Any
*/
def process(): Any = {
if (!configuration.df.isEmpty && configuration.df()) {
dfHandler()
} else {
rddHandler()
}
handler()
}

/** Set Spark context to be used.
Expand Down Expand Up @@ -350,7 +259,7 @@ object CommandLineAppRunner {
case x: Throwable => throw x
}

val conf = new SparkConf().setAppName("AUTCommandLineApp")
val conf = new SparkConf().setAppName("Archives Unleashed Toolkit")
conf.set("spark.driver.allowMultipleContexts", "true")
app.setSparkContext(new SparkContext(conf))
app.process()
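As orientation only (not the app's code), a minimal, self-contained mirror of the lookup-and-dispatch pattern `handler()` now uses; the `println` closures are placeholders for the real RecordLoader-based extractions.

```scala
object DispatchSketch {
  // Placeholder closures; the real map wires each name to a DataFrame extraction.
  val extractors = Map[String, List[String] => Any](
    "DomainFrequencyExtractor" -> ((files: List[String]) => println(s"frequency over ${files.size} file(s)")),
    "DomainGraphExtractor"     -> ((files: List[String]) => println(s"graph over ${files.size} file(s)")),
    "PlainTextExtractor"       -> ((files: List[String]) => println(s"text over ${files.size} file(s)"))
  )

  // Look the extractor up by name and run it, or fail loudly, as handler() does.
  def run(name: String, inputs: List[String]): Any =
    extractors.get(name) match {
      case Some(extract) => extract(inputs)
      case None =>
        throw new IllegalArgumentException(
          s"$name not supported. Supported extractors: ${extractors.keys.mkString(", ")}")
    }
}

// e.g. DispatchSketch.run("PlainTextExtractor", List("/path/to/example.warc.gz"))
```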
src/main/scala/io/archivesunleashed/app/DomainFrequencyExtractor.scala
@@ -17,25 +17,11 @@
package io.archivesunleashed.app

import io.archivesunleashed.df.{ExtractDomainDF}
import io.archivesunleashed.matchbox.{ExtractDomainRDD}
import io.archivesunleashed.{ArchiveRecord, DataFrameLoader, CountableRDD}
import org.apache.spark.rdd.RDD
import io.archivesunleashed.{ArchiveRecord, DataFrameLoader}
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DomainFrequencyExtractor {
/** Extract domain frequency from web archive using RDD.
*
* @param records RDD[ArchiveRecord] obtained from RecordLoader
* @return RDD[(String,Int))], which is (domain, count)
*/
def apply(records: RDD[ArchiveRecord]): RDD[(String, Int)] = {
records
.keepValidPages()
.map(r => ExtractDomainRDD(r.getUrl))
.countItems()
}

/** Extract domain frequency from web archive using DataFrame and Spark SQL.
*
* @param d DataFrame obtained from RecordLoader
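To make the retained path concrete, a hedged approximation of what the DataFrame implementation computes, reconstructed from the imports above (`ExtractDomainDF`, `desc`) and assuming a `url` column as produced by `webpages()`; the actual method body may differ.

```scala
import io.archivesunleashed.df.ExtractDomainDF
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.desc

// Group pages by extracted domain, count them, and list the most frequent first.
def domainFrequencySketch(d: DataFrame): DataFrame =
  d.groupBy(ExtractDomainDF(d("url")).as("domain"))
    .count()
    .sort(desc("count"))
```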
src/main/scala/io/archivesunleashed/app/DomainGraphExtractor.scala
@@ -17,33 +17,11 @@
package io.archivesunleashed.app

import io.archivesunleashed.df.{ExtractDomainDF, RemovePrefixWWWDF}
import io.archivesunleashed.matchbox.{ExtractDomainRDD, ExtractLinksRDD}
import io.archivesunleashed.{ArchiveRecord, DataFrameLoader, CountableRDD}
import org.apache.spark.rdd.RDD
import io.archivesunleashed.{ArchiveRecord, DataFrameLoader}
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DomainGraphExtractor {
/** Extract domain graph from web archive using RDD.
*
* @param records RDD[ArchiveRecord] obtained from RecordLoader
* @return RDD[(String, String, String), Int],
* which is ((CrawlDate, SourceDomain, DestinationDomain), Frequency)
*/
def apply(records: RDD[ArchiveRecord]): RDD[((String, String, String), Int)] = {
records
.keepValidPages()
.map(r => (r.getCrawlDate, ExtractLinksRDD(r.getUrl, r.getContentString)))
.flatMap(r => r._2.map(f =>
(r._1,
ExtractDomainRDD(f._1).replaceAll("^\\s*www\\.", ""),
ExtractDomainRDD(f._2).replaceAll("^\\s*www\\.", ""))
))
.filter(r => r._2 != "" && r._3 != "")
.countItems()
.filter(r => r._2 > 5)
}

/** Extract domain graph from web archive using DataFrame and Spark SQL.
*
* @param d DataFrame obtained from RecordLoader
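For context, a hypothetical `spark-shell` use of the remaining DataFrame entry point; the `webgraph()` loader and the paths are assumptions based on how the DataFrame extractors in this commit are wired up.

```scala
import io.archivesunleashed._
import io.archivesunleashed.app._

// Hyperlink graph extracted from the archives, as a DataFrame.
val webgraph = RecordLoader
  .loadArchives("/path/to/warcs/*.warc.gz", sc)
  .webgraph()

// Aggregate to a domain-to-domain graph with link frequencies.
val domainGraph = DomainGraphExtractor(webgraph)
domainGraph.show(10, truncate = false)
```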
15 changes: 0 additions & 15 deletions src/main/scala/io/archivesunleashed/app/PlainTextExtractor.scala
@@ -16,27 +16,12 @@

package io.archivesunleashed.app

import io.archivesunleashed.matchbox.{RemoveHTMLRDD, RemoveHTTPHeaderRDD}
import io.archivesunleashed.ArchiveRecord
import io.archivesunleashed.df.{ExtractDomainDF, RemoveHTMLDF,
RemoveHTTPHeaderDF}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object PlainTextExtractor {
/** Extract plain text from web archive using RDD.
*
* @param records RDD[ArchiveRecord] obtained from RecordLoader
* @return RDD[(String, String, String, String)], which is
* (crawl date, domain, url, text)
*/
def apply(records: RDD[ArchiveRecord]): RDD[(String, String, String, String)] = {
records
.keepValidPages()
.map(r => (r.getCrawlDate, r.getDomain, r.getUrl,
RemoveHTMLRDD(RemoveHTTPHeaderRDD(r.getContentString))))
}

/** Extract plain text from web archive using DataFrame and Spark SQL.
*
* @param d DataFrame obtained from RecordLoader
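Below is a hedged DataFrame analogue of the RDD logic removed above, not the file's actual body; the column names (`crawl_date`, `url`, `content`) are assumptions based on the `webpages()` schema used elsewhere in this commit.

```scala
import io.archivesunleashed.df.{ExtractDomainDF, RemoveHTMLDF, RemoveHTTPHeaderDF}
import org.apache.spark.sql.DataFrame

// (crawl date, domain, url, text) with HTTP headers and HTML markup stripped.
def plainTextSketch(d: DataFrame): DataFrame =
  d.select(
    d("crawl_date"),
    ExtractDomainDF(d("url")).as("domain"),
    d("url"),
    RemoveHTMLDF(RemoveHTTPHeaderDF(d("content"))).as("content"))
```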
src/main/scala/io/archivesunleashed/app/WebPagesExtractor.scala
@@ -16,11 +16,9 @@

package io.archivesunleashed.app

import io.archivesunleashed.matchbox.{RemoveHTMLRDD, RemoveHTTPHeaderRDD}
import io.archivesunleashed.ArchiveRecord
import io.archivesunleashed.df.{ExtractDomainDF, RemoveHTMLDF,
RemoveHTTPHeaderDF}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object WebPagesExtractor {